I have been exploring HBase for its fast reads and writes. Although I ran into problems with the bulk import/export feature, I was able to get a few things working, such as creating tables, selecting records, and so on.
Main.java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class Main {
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws IOException, Throwable {
// TODO code application logic here
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.1.96");
//conf.set("hbase.zookeeper.property.clientPort", "2181");
// // HBaseConfiguration conf = new HBaseConfiguration();
// // conf.addResource(new Path("/mnt/hbase-0.94.5/hbase-0.94.5/conf/hbase-site.xml"));
// HTable table = new HTable(conf, "test_table");
// Put put = new Put(Bytes.toBytes("test-key"));
// put.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("columnqualifier"), Bytes.toBytes("sample value"));
// table.put(put);
//// Get g = new Get(Bytes.toBytes("test-key"));
//// Result r = table.get(g);
//// byte[] value = r.getValue(Bytes.toBytes("columnfamily"), Bytes.toBytes("columnqualifier"));
////// If we convert the value bytes, we should get back 'Some Value', the
////// value we inserted at this location.
//// String valueStr = Bytes.toString(value);
//// System.out.println("GET: " + valueStr);
//
bulk(conf);
//createTbl(conf);
// Sample sample = new Sample();
//sample.run(conf);
}
private static void createTbl(Configuration conf) throws Throwable {
try {
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor("workingtable".getBytes());
desc.addFamily(new HColumnDescriptor("family1".getBytes()));
desc.addFamily(new HColumnDescriptor("family2".getBytes()));
admin.createTable(desc);
HTableFactory factory = new HTableFactory();
HTableInterface table = factory.createHTableInterface(conf, "workingtable".getBytes());
createRecord(conf, table, admin);
scan(admin, table);
get(admin, table, conf);
filters(admin, table);
deleteRecord(admin, table);
deleteTable(admin);
} catch (Exception er) {
er.printStackTrace();
}
}
private static void createRecord(Configuration conf, HTableInterface table, HBaseAdmin admin) {
try {
Put p = new Put("Row1".getBytes());
p.add("family1".getBytes(), "qualifier1".getBytes(), "raghuram".getBytes());
p.add("family1".getBytes(), "qualifier2".getBytes(), "Gopal".getBytes());
table.put(p);
// Row2 => family1:qualifier1, family1:qualifier2
p = new Put("Row2".getBytes());
p.add("family1".getBytes(), "qualifier1".getBytes(), "sreeram".getBytes());
p.add("family1".getBytes(), "qualifier2".getBytes(), "Heman".getBytes());
table.put(p);
//
// // Row3 => Family1:Qualifier1, Family2:Qualifier3
// p = new Put(row3);
// p.add(family1, qualifier1, cellData);
// p.add(family2, qualifier3, cellData);
// table.put(p);
// admin.disableTable("workingtable");
// HColumnDescriptor desc = new HColumnDescriptor("Row1");
// admin.addColumn("workingtable", desc);
// admin.enableTable("workingtable");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void scan(HBaseAdmin admin, HTableInterface table) {
System.out.println("Scan method................");
try {
Scan scan = new Scan();
scan.addColumn("family1".getBytes(), "qualifier1".getBytes());
ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
System.out.println("result row---->\n" + result);
}
} finally {
scanner.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void get(HBaseAdmin admin, HTableInterface table, Configuration conf) throws Throwable {
System.out.println("Get Method...............");
try {
Get g = new Get("Row1".getBytes());
Result r = table.get(g);
byte[] value = r.getValue("family1".getBytes(), "qualifier1".getBytes());
System.out.println("Fetched value: " + Bytes.toString(value));
AggregationClient aggregationClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily("family1".getBytes());
long rowCount = aggregationClient.rowCount("workingtable".getBytes(), null, scan);
System.out.println("row count is " + rowCount);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void filters(HBaseAdmin admin, HTableInterface table) throws IOException {
System.out.println("\n*** FILTERS ~ scanning with filters to fetch a row of which key is larget than \"Row1\"~ ***");
try {
Filter filter1 = new PrefixFilter("Row2".getBytes());
Filter filter2 = new QualifierFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator("qualifier1".getBytes()));
List<Filter> filters = Arrays.asList(filter1, filter2);
Filter filter3 = new FilterList(Operator.MUST_PASS_ALL, filters);
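// MUST_PASS_ALL ANDs the filters together: only rows whose key starts with
// "Row2" are returned, and within them only columns whose qualifier compares
// >= "qualifier1".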
Scan scan = new Scan();
scan.setFilter(filter3);
ResultScanner scanner = table.getScanner(scan);
try {
int i = 0;
for (Result result : scanner) {
System.out.println("Filter " + scan.getFilter() + " matched row: " + result);
i++;
}
assert i == 1 : "This filtering sample should return 1 row but was " + i + ".";
} finally {
scanner.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void deleteRecord(HBaseAdmin admin, HTableInterface table) {
System.out.println("inside delte record logic");
try {
Delete delete = new Delete("Row1".getBytes());
delete.deleteColumn("family1".getBytes(), "qualifier1".getBytes());
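// NOTE: deleteColumn() removes only the most recent version of the cell;
// use deleteColumns() to remove all versions of family1:qualifier1.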
table.delete(delete);
Get get = new Get("Row1".getBytes());
Result result = table.get(get);
byte[] value = result.getValue("family1".getBytes(), "qualifier1".getBytes());
System.out.println("Fetch the data after delete: " + Bytes.toString(value));
} catch (Exception e) {
e.printStackTrace();
}
}
private static void deleteTable(HBaseAdmin admin) throws IOException {
System.out.println("inside delte table method.............");
try {
if (admin.tableExists("workingtable")) {
admin.disableTable("workingtable");
admin.deleteTable("workingtable");
}
} catch (Exception e) {
e.printStackTrace();
}
}
// private static void bulkload(Configuration conf) {
// try {
// conf.set(TableOutputFormat.OUTPUT_TABLE, "crudtable");
// Path inputPath = new Path("/share/tsvtest.tsv");
//
// Job job = new Job(conf, "Sample job");
//
// job.setMapOutputKeyClass(mapperKey);
// job.setMapOutputValueClass(mapperValue);
//
// FileInputFormat.setInputPaths(job, inputPath);
// job.setInputFormatClass(KeyValueTextInputFormat.class);
// FileOutputFormat.setOutputPath(job, new Path(HFileoutputPath));
////directory at HDFS where HFiles will be placed
////before bulk loading
//
// job.setOutputFormatClass(HFileOutputFormat.class);
//
// job.setJarByClass(caller);
// job.setMapperClass(mapper);
//
// HTable hTable = new HTable(config, tableNAME); //tableNAME is a String representing a table which has to already exist in HBase
// HFileOutputFormat.configureIncrementalLoad(job, hTable);
////check respective API for the complete functionality of this function
//
//job.waitForCompletion(true);
//
// /* after the job's completion, we have to write the HFiles
// * into HBase's specified table */
// LoadIncrementalHFiles lihf = new LoadIncrementalHFiles(config);
// lihf.doBulkLoad(new Path(HFileoutputPath), hTable);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
private static void bulk(Configuration conf) throws Exception {
// conf.set(TableOutputFormat.OUTPUT_TABLE, "crudtable");
Job job = new Job(conf, "bulk-load");
job.setJarByClass(Main.class);
job.setMapperClass(BulkLoadHBase_1Mapper.class);
job.setReducerClass(PutSortReducer.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setInputFormatClass(TextInputFormat.class);
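// NOTE: TextInputFormat feeds the mapper LongWritable keys (byte offsets),
// but BulkLoadHBase_1Mapper below is declared as Mapper<Text, Text, ...>,
// so this combination fails at runtime with a ClassCastException. Using an
// input format that emits Text/Text pairs (e.g. KeyValueTextInputFormat for
// tab-separated lines), or giving the mapper a LongWritable key and parsing
// the CSV line itself, are two possible fixes.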
job.setOutputFormatClass(HFileOutputFormat.class);
FileInputFormat.addInputPath(job,
new Path("/share/raghu/bulkLoad.csv"));
HFileOutputFormat.setOutputPath(job,
new Path("/share/raghu/HBASE_BulkOutput/"));
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("hbase.zookeeper.quorum", "192.168.1.96");
//hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);
HTable hTable = new HTable(hConf, "crudtable");
HFileOutputFormat.configureIncrementalLoad(job, hTable);
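// configureIncrementalLoad() inspects the table's current region boundaries
// and configures the job accordingly: it installs TotalOrderPartitioner with
// a partition file derived from the region start keys, picks PutSortReducer
// because the map output value class is Put, and sets the number of reduce
// tasks to the number of regions. The explicit reducer and partitioner
// settings above are therefore redundant; this call sets them again.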
// return job;
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class BulkLoadHBase_1Mapper
extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("KEY " + key.toString());
System.out.println("VALUES : " + value);
System.out.println("Context : " + context);
ImmutableBytesWritable ibw =
new ImmutableBytesWritable(Bytes.toBytes(key.toString()));
String val = value.toString();
Put p = new Put(Bytes.toBytes(key.toString()));
p.add(Bytes.toBytes("family1"), Bytes.toBytes("qualifier1"), Bytes.toBytes(val));
context.write(ibw, p);
}
}
}
http://hbase.apache.org/book/perf.writing.html
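The bulk() method above only writes HFiles into /share/raghu/HBASE_BulkOutput/; nothing shows up in crudtable until those files are handed over to the region servers. Below is a minimal sketch of that final step, along the lines of the commented-out bulkload() method, using the HBase 0.94-era LoadIncrementalHFiles API. The class name CompleteBulkLoad is mine, and I am assuming the same output path, table name, and ZooKeeper quorum as in the code above.
CompleteBulkLoad.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class CompleteBulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.96"); // same test cluster as above

        // The target table must already exist with the column family the
        // HFiles were written for (family1 in the bulk() job).
        HTable hTable = new HTable(conf, "crudtable");

        // Moves the HFiles produced by the MapReduce job into the table's regions.
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("/share/raghu/HBASE_BulkOutput/"), hTable);

        hTable.close();
    }
}
The same step can also be run from the command line: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /share/raghu/HBASE_BulkOutput/ crudtable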