8/3/13

HBASE - A POC

I have been exploring HBase for its speedy retrieval and ingestion. Although I had problems with the bulk import/export feature, I was able to do a few things like creating tables, selecting records, etc.
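One note before the main listing: the bulk-load code at the bottom writes into a table called "crudtable", which must already exist in HBase. Below is a minimal sketch of a one-off helper to create it up front, reusing the same 0.94-era admin API as the rest of the post (the class name CreateCrudTable is mine; the table, family, and ZooKeeper quorum values are the ones the code below assumes):

CreateCrudTable.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateCrudTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.96");

        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            // "crudtable"/"family1" are the names the bulk-load job expects.
            if (!admin.tableExists("crudtable")) {
                HTableDescriptor desc = new HTableDescriptor("crudtable".getBytes());
                desc.addFamily(new HColumnDescriptor("family1".getBytes()));
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}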

Main.java

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

public class Main {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.96");
        //conf.set("hbase.zookeeper.property.clientPort", "2181");

        bulk(conf);
        //createTbl(conf);
    }

    private static void createTbl(Configuration conf) throws Throwable {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            HTableDescriptor desc = new HTableDescriptor("workingtable".getBytes());
            desc.addFamily(new HColumnDescriptor("family1".getBytes()));
            desc.addFamily(new HColumnDescriptor("family2".getBytes()));
            admin.createTable(desc);

            HTableFactory factory = new HTableFactory();
            HTableInterface table = factory.createHTableInterface(conf, "workingtable".getBytes());
            createRecord(conf, table, admin);
            scan(admin, table);
            get(admin, table, conf);
            filters(admin, table);
            deleteRecord(admin, table);
            deleteTable(admin);
        } catch (Exception er) {
            er.printStackTrace();
        }
    }

    private static void createRecord(Configuration conf, HTableInterface table, HBaseAdmin admin) {
        try {
            // Row1 => family1:qualifier1, family1:qualifier2
            Put p = new Put("Row1".getBytes());
            p.add("family1".getBytes(), "qualifier1".getBytes(), "raghuram".getBytes());
            p.add("family1".getBytes(), "qualifier2".getBytes(), "Gopal".getBytes());
            table.put(p);

            // Row2 => family1:qualifier1, family1:qualifier2
            p = new Put("Row2".getBytes());
            p.add("family1".getBytes(), "qualifier1".getBytes(), "sreeram".getBytes());
            p.add("family1".getBytes(), "qualifier2".getBytes(), "Heman".getBytes());
            table.put(p);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void scan(HBaseAdmin admin, HTableInterface table) {
        System.out.println("Scan method................");
        try {
            Scan scan = new Scan();
            scan.addColumn("family1".getBytes(), "qualifier1".getBytes());

            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    System.out.println("result row---->\n" + result);
                }
            } finally {
                scanner.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void get(HBaseAdmin admin, HTableInterface table, Configuration conf) throws Throwable {

        System.out.println("Get Method...............");
        try {
            Get g = new Get("Row1".getBytes());
            Result r = table.get(g);
            byte[] value = r.getValue("family1".getBytes(), "qualifier1".getBytes());

            System.out.println("Fetched value: " + Bytes.toString(value));

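            // Note: rowCount() requires the AggregateImplementation coprocessor
            // to be registered on the region servers (hbase.coprocessor.region.classes
            // in hbase-site.xml); without it this call fails.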
            AggregationClient aggregationClient = new AggregationClient(conf);
            Scan scan = new Scan();
            scan.addFamily("family1".getBytes());
            long rowCount = aggregationClient.rowCount("workingtable".getBytes(), null, scan);
            System.out.println("row count is " + rowCount);

        } catch (Exception e) {
            e.printStackTrace();

        }
    }

    private static void filters(HBaseAdmin admin, HTableInterface table) throws IOException {
        System.out.println("\n*** FILTERS ~ scanning with filters to fetch a row of which key is larget than \"Row1\"~ ***");
        try {
            Filter filter1 = new PrefixFilter("Row2".getBytes());
            Filter filter2 = new QualifierFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator("qualifier1".getBytes()));

            List<Filter> filters = Arrays.asList(filter1, filter2);
            Filter filter3 = new FilterList(Operator.MUST_PASS_ALL, filters);

            Scan scan = new Scan();
            scan.setFilter(filter3);

            ResultScanner scanner = table.getScanner(scan);
            try {
                int i = 0;
                for (Result result : scanner) {
                    System.out.println("Filter " + scan.getFilter() + " matched row: " + result);
                    i++;
                }
                assert i == 1 : "This filtering sample should return 1 row but was " + i + ".";
            } finally {
                scanner.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void deleteRecord(HBaseAdmin admin, HTableInterface table) {
        System.out.println("inside delte record logic");
        try {
            Delete delete = new Delete("Row1".getBytes());
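            // deleteColumn removes only the most recent version of this cell;
            // deleteColumns would remove every version.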
            delete.deleteColumn("family1".getBytes(), "qualifier1".getBytes());
            table.delete(delete);

            Get get = new Get("Row1".getBytes());
            Result result = table.get(get);
            byte[] value = result.getValue("family1".getBytes(), "qualifier1".getBytes());
            System.out.println("Fetch the data after delete: " + Bytes.toString(value));
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static void deleteTable(HBaseAdmin admin) throws IOException {
        System.out.println("inside delete table method.............");
        try {
            if (admin.tableExists("workingtable")) {
                admin.disableTable("workingtable");
                admin.deleteTable("workingtable");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

//    /*
//     * Earlier bulk-load draft, kept for reference. mapperKey, mapperValue,
//     * HFileoutputPath, caller, mapper, config and tableNAME are placeholders
//     * that must be filled in before this compiles.
//     */
//    private static void bulkload(Configuration conf) throws Exception {
//        conf.set(TableOutputFormat.OUTPUT_TABLE, "crudtable");
//        Path inputPath = new Path("/share/tsvtest.tsv");
//
//        Job job = new Job(conf, "Sample job");
//        job.setMapOutputKeyClass(mapperKey);
//        job.setMapOutputValueClass(mapperValue);
//
//        FileInputFormat.setInputPaths(job, inputPath);
//        job.setInputFormatClass(KeyValueTextInputFormat.class);
//        // Directory on HDFS where the HFiles are placed before bulk loading.
//        FileOutputFormat.setOutputPath(job, new Path(HFileoutputPath));
//        job.setOutputFormatClass(HFileOutputFormat.class);
//
//        job.setJarByClass(caller);
//        job.setMapperClass(mapper);
//
//        // tableNAME must name a table that already exists in HBase.
//        HTable hTable = new HTable(config, tableNAME);
//        HFileOutputFormat.configureIncrementalLoad(job, hTable);
//
//        job.waitForCompletion(true);
//
//        // After the job completes, move the generated HFiles into the table.
//        LoadIncrementalHFiles lihf = new LoadIncrementalHFiles(config);
//        lihf.doBulkLoad(new Path(HFileoutputPath), hTable);
//    }
    private static void bulk(Configuration conf) throws Exception {
        Job job = new Job(conf, "bulk-load");

        job.setJarByClass(Main.class);
        job.setMapperClass(BulkLoadHBase_1Mapper.class);

        // Map output classes must be set before configureIncrementalLoad,
        // which picks PutSortReducer based on the Put value class.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);

        FileInputFormat.addInputPath(job,
                new Path("/share/raghu/bulkLoad.csv"));
        Path hfileOutput = new Path("/share/raghu/HBASE_BulkOutput/");
        HFileOutputFormat.setOutputPath(job, hfileOutput);

        Configuration hConf = HBaseConfiguration.create(conf);
        hConf.set("hbase.zookeeper.quorum", "192.168.1.96");
        //hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

        // "crudtable" must already exist in HBase.
        HTable hTable = new HTable(hConf, "crudtable");

        // Wires up HFileOutputFormat, PutSortReducer and TotalOrderPartitioner
        // against the table's region boundaries, so they need not be set by hand.
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }

        // The job only writes HFiles; they still have to be loaded into the table.
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
        loader.doBulkLoad(hfileOutput, hTable);
    }

    public static class BulkLoadHBase_1Mapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // With TextInputFormat the key is only the byte offset, so the
            // row key has to come from the line itself: "rowkey,value".
            String line = value.toString();
            int comma = line.indexOf(',');
            if (comma < 0) {
                return; // skip malformed lines
            }
            String rowKey = line.substring(0, comma);
            String val = line.substring(comma + 1);

            ImmutableBytesWritable ibw =
                    new ImmutableBytesWritable(Bytes.toBytes(rowKey));
            Put p = new Put(Bytes.toBytes(rowKey));
            p.add(Bytes.toBytes("family1"), Bytes.toBytes("qualifier1"), Bytes.toBytes(val));

            context.write(ibw, p);
        }
    }
}
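Once the job has run, a quick way to confirm the rows landed is to scan crudtable with the same client API used above (a sketch, assuming the load succeeded):

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.96");
        HTable table = new HTable(conf, "crudtable");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result result : scanner) {
                System.out.println(result);
            }
        } finally {
            scanner.close();
            table.close();
        }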
 

For more on write performance and bulk loading, see the HBase book: http://hbase.apache.org/book/perf.writing.html
 
