Sunday, January 23, 2011

HBase 0.89 Bulk Import capability

I recently posted about my experience tuning HBase for mass inserts. When I learned that HBase actually supports a way to *avoid* the inserts altogether, I got really excited. As the Joshua program says, "the only winning move is not to play"! What better way to speed up inserts than not to do them at all? You can't get much faster than zero. So how do you get data into HBase if you don't do inserts? Aha! You can use HFileOutputFormat to generate HFiles, which essentially *are* the table's on-disk data files, and then load them directly. And believe me, compared to running Puts from your reducer it is fast, and you don't have the "attack my database and maybe crash it" issue. To give you an idea of how fast it is: I had a job whose reducers ran for 45 minutes doing Puts. Generating the files from the reducer took only a couple of minutes, and the bulk loader took 19 seconds to load them, so the whole bulk load process, end to end, was maybe two minutes.

The main "gotchas" along the way: The documentation isn't entirely clear about the fact that if you use HFileOutputFormat.configureIncrementalLoad, that your reducer is silently replaced by an internal HBase class that insures total ordering. Secondly, your mapper must output either Put or KeyValue. This is not a usable way to actually write mapreduce jobs (since you lose the ability to pass domain objects between map and reduce). Therefore, I found it best to alter my existing mapreduce jobs to produce a SequenceFileOutputFormat, in which I wrote Put from my reducer as the output class (since it doesn't interfere with your job logic/classes at all to change only the *final* output class to a Put). In fact, the class I wrote to generate the HFileOutputFormat will actually accept sequence files as input if they contain either a Put, or a single column value. I included support for a single column value just 'cause I had a couple jobs lying around that produced only a single column, and would have been a hassle to convert them to producing Puts. Then my generic bulk loader mapreduce job can be run on any of my output sequence files.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoader extends Configured implements Tool {

    public static class BulkLoaderMapper extends Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {

        protected long version;
        protected String column;
        protected String family;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Read the cell timestamp and the column family/qualifier names from the job configuration.
            this.version = context.getConfiguration().getLong("version", System.currentTimeMillis());
            this.column = context.getConfiguration().get("column");
            this.family = context.getConfiguration().get("family");
        }


        // Serialize a Writable to raw bytes. Note that this returns whatever the Writable's
        // write() method emits (e.g. Text prepends a vint length), not just the payload bytes.
        private byte[] marshal(Writable o) throws IOException{
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            o.write(dos);
            return bos.toByteArray();
        }

        protected byte[] getHBaseRowKeyBytes(Writable k) throws IOException{
            return marshal(k);
        }

        protected byte[] getHBaseFamilyNameBytes(){
            return Bytes.toBytes(family);
        }

        protected byte[] getHBaseColumnNameBytes(){
            return Bytes.toBytes(column);
        }

        protected byte[] getHBaseValueBytes(Writable v) throws IOException{
            return marshal(v);
        }

        @Override
        protected void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException {
            if(context.getConfiguration().get("input-format", "column").equals("put")){ //the sequence file already contains Puts
                System.out.println(key.toString());
                Put p = (Put) val;
                //Note: getHBaseRowKeyBytes(key) is not usable here because Writable serialization
                //(e.g. Text) includes bookkeeping bytes such as a length prefix, so it is not the raw
                //rowkey HBase expects. p.getRow() returns the rowkey bytes the Put was built with.
                context.write(new ImmutableBytesWritable(p.getRow()), p);
            }else{ //single column value: wrap the key/value pair in a Put ourselves
                Put put = new Put(getHBaseRowKeyBytes(key));
                put.add(getHBaseFamilyNameBytes(), getHBaseColumnNameBytes(), version, getHBaseValueBytes(val));
                context.write(new ImmutableBytesWritable(getHBaseRowKeyBytes(key)), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new BulkLoader(), args);
        System.exit(exitCode);

    }

    @Override
    public int run(String[] otherArgs) throws Exception {

        Configuration config = getConf();
        Job job = new Job(config);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setMapperClass(BulkLoaderMapper.class);

        job.setJarByClass(BulkLoader.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        System.out.println("sequencefile-input: " + config.get("sequencefile-input"));
        System.out.println("output-path: " + config.get("output-path"));
        System.out.println("bulk-load-table: " + config.get("bulk-load-table"));
        System.out.println("family: " + config.get("family"));
        System.out.println("column: " + config.get("column"));
        System.out.println("input-format: " + config.get("input-format", "column"));
        SequenceFileInputFormat.addInputPath(job, new Path(config.get("sequencefile-input")));
        job.setOutputFormatClass(HFileOutputFormat.class);
        Configuration hConfig = HBaseConfiguration.create(config);
        hConfig.set("hbase.zookeeper.quorum", config.get("zk", "zk1.x.com,zk2.x.com"));
        // Set the cell timestamp on the job's own configuration so the mappers actually see it;
        // setting it only on hConfig would not reach the tasks.
        job.getConfiguration().setLong("version", System.currentTimeMillis());
        job.setJobName("Bulk Loading table: " + hConfig.get("bulk-load-table","YOU HAVE NOT SET bulk-load-table PARAMETER"));
        HFileOutputFormat.setOutputPath(job, new Path(config.get("output-path")));
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(hConfig, config.get("bulk-load-table")));


        return job.waitForCompletion(true) ? 0 : 1; //propagate the job outcome as the exit code

    }
}
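
The post doesn't show how this job is invoked. Since it runs through ToolRunner, the configuration values it reads can be supplied as -D properties on the command line; a hypothetical invocation (jar name, paths, and table are placeholders, not taken from the post; family and column are only consulted when input-format is "column") might look like:

hadoop jar bulkloader.jar BulkLoader \
    -D sequencefile-input=/user/me/sequencefile-output \
    -D output-path=/user/me/hfile-output \
    -D bulk-load-table=mytable \
    -D input-format=put \
    -D zk=zk1.x.com,zk2.x.com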

Finally, the 0.89 jar seemed to have some packaging problems. I had to create a "lib" directory (used by hadoop inside job jars) and re-jar the 0.89 jar with guava.jar and perhaps a couple of other missing dependencies. With that done I was able to run the bulk loader with the completebulkload option via an ant command.
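
The ant wrapper itself isn't shown; for reference, the standard command-line form of completebulkload from the HBase docs of that era looks roughly like this (the jar version, HFile directory, and table name are placeholders):

hadoop jar hbase-0.89.jar completebulkload /user/me/hfile-output mytable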

5 comments:

  1. What values do you have for your config file?

  2. Can you post your reduce code for outputting the Put objects into a SequenceFileOutputFormat file? And how many nodes do you have in your hbase cluster, and how many records are you dealing with? Your results are really impressive. Thanks

    Replies
    1. 20 node cluster, 3 billion records. By the way, about the "I'm baffled" comment in my code: I quickly realized that it's just because the key I output from the job that produces the Puts is a Hadoop Writable, which of course contains some bookkeeping bytes (it's not just a byte array representing the value, as HBase expects for a rowkey). The job that produces the Puts is really simple; the output key is a Text and the value is a Put.
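
      To make that concrete, here is a tiny standalone illustration (not from the original thread) of the difference between a Text key's Writable serialization and the raw rowkey bytes HBase expects:

      import java.io.ByteArrayOutputStream;
      import java.io.DataOutputStream;

      import org.apache.hadoop.hbase.util.Bytes;
      import org.apache.hadoop.io.Text;

      public class RowKeyBytesDemo {
          public static void main(String[] args) throws Exception {
              Text key = new Text("row-1");

              // Writable serialization: a vint length prefix followed by the UTF-8 payload.
              ByteArrayOutputStream bos = new ByteArrayOutputStream();
              key.write(new DataOutputStream(bos));
              System.out.println(bos.size());                     // 6 (1 length byte + 5 payload bytes)

              // The raw bytes HBase expects as a rowkey.
              System.out.println(Bytes.toBytes("row-1").length);  // 5
          }
      }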

    2. Anonymous (8:48 PM)

      I have run the job successfully, but when I count the rows with the hbase shell ("count 'table'") the row count is 0.

  3. Hello Geffrey, thanks for the update. We finally got the bulk loading into hbase working, but our loading rate is not that great: we only have 4 nodes and 14M files, and each file creates on average 10 records in hbase, so we are loading about 140M records. It takes almost 1 hour to convert those files into HFiles and 40 seconds to load them. Can you share some detailed job configuration settings, such as how many map and reduce tasks you have and what the maximum number of map/reduce tasks per node is? We plan to increase the number of nodes in hbase and see how the loading performance changes. BTW: what is your record format? Is it a plain text file?
    Thanks
