Sunday, January 23, 2011

HBase 0.89 Bulk Import capability

I recently posted about my experience tuning HBase for mass inserts. When I learned that HBase actually supports a way to *avoid* the insert altogether, I got really excited. As the Joshua Program says, "the only winning move is not to play"! What better way to speed up inserts than not to do them at all? You can't get much faster than zero. So how do you get data into HBase if you don't do inserts? Aha! You can actually generate HFileOutputFormat files that essentially *are* database tables. And believe me, compared to running Puts from your reducer it is fast, and you don't have the "attack my database and maybe crash it" issue. To give you an idea of how fast it is, I had a job whose reducers ran for 45 minutes doing Puts. Generating the HFiles from the reducer took only a couple of minutes, and the bulk loader took 19 seconds to load them, so the whole bulk load process, end to end, was maybe two minutes.

The main "gotchas" along the way: The documentation isn't entirely clear about the fact that if you use HFileOutputFormat.configureIncrementalLoad, that your reducer is silently replaced by an internal HBase class that insures total ordering. Secondly, your mapper must output either Put or KeyValue. This is not a usable way to actually write mapreduce jobs (since you lose the ability to pass domain objects between map and reduce). Therefore, I found it best to alter my existing mapreduce jobs to produce a SequenceFileOutputFormat, in which I wrote Put from my reducer as the output class (since it doesn't interfere with your job logic/classes at all to change only the *final* output class to a Put). In fact, the class I wrote to generate the HFileOutputFormat will actually accept sequence files as input if they contain either a Put, or a single column value. I included support for a single column value just 'cause I had a couple jobs lying around that produced only a single column, and would have been a hassle to convert them to producing Puts. Then my generic bulk loader mapreduce job can be run on any of my output sequence files.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoader extends Configured implements Tool{


    public static class BulkLoaderMapper extends Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {

        protected long version;
        protected String column;
        protected String family;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.version = context.getConfiguration().getLong("version", System.currentTimeMillis());
            this.column = context.getConfiguration().get("column");
            this.family = context.getConfiguration().get("family");
        }


        private byte[] marshal(Writable o) throws IOException{
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            o.write(dos);
            return bos.toByteArray();
        }

        protected byte[] getHBaseRowKeyBytes(Writable k) throws IOException{
            return marshal(k);
        }

        protected byte[] getHBaseFamilyNameBytes(){
            return Bytes.toBytes(family);
        }

        protected byte[] getHBaseColumnNameBytes(){
            return Bytes.toBytes(column);
        }

        protected byte[] getHBaseValueBytes(Writable v) throws IOException{
            return marshal(v);
        }

        @Override
        protected void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException {
            //KeyValue kv = new KeyValue(getHBaseRowKeyBytes(key), getHBaseFamilyNameBytes(), getHBaseColumnNameBytes(), version, getHBaseValueBytes(val));
            if(context.getConfiguration().get("input-format", "column").equals("put")){ //the serialized file actually is full of Puts
                System.out.println(key.toString()); //debug output
                Put p = (Put) val;
                //I am baffled as to why I cannot use getHBaseRowKeyBytes(key) here; it seems to not produce the correct key.
                //(Likely because marshal() emits the key's full Writable serialization, e.g. a length prefix, rather than the raw row bytes.)
                //Therefore I'm using p.getRow() to get the row key bytes.
                context.write(new ImmutableBytesWritable(p.getRow()), p);
            }else{
                Put put = new Put(getHBaseRowKeyBytes(key));
                put.add(getHBaseFamilyNameBytes(), getHBaseColumnNameBytes(), version, getHBaseValueBytes(val));
                context.write(new ImmutableBytesWritable(getHBaseRowKeyBytes(key)), put);
            }
        }
    }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new BulkLoader(), args);
        System.exit(exitCode);

    }

    @Override
    public int run(String[] otherArgs) throws Exception {

        Configuration config = getConf();
        Job job = new Job(config);

        //job.setOutputKeyClass(ImmutableBytesWritable.class);
        //job.setOutputValueClass(Put.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setMapperClass(BulkLoaderMapper.class);

        job.setJarByClass(BulkLoader.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        System.out.println("sequencefile-input: " + config.get("sequencefile-input"));
        System.out.println("output-path: " + config.get("output-path"));
        System.out.println("bulk-load-table: " + config.get("bulk-load-table"));
        System.out.println("family: " + config.get("family"));
        System.out.println("column: " + config.get("column"));
        System.out.println("input-format: " + config.get("input-format", "column"));
        SequenceFileInputFormat.addInputPath(job, new Path(config.get("sequencefile-input")));
        job.setOutputFormatClass(HFileOutputFormat.class);
        Configuration hConfig = HBaseConfiguration.create(config);
        hConfig.setLong("version", System.currentTimeMillis());
        hConfig.set("hbase.zookeeper.quorum", config.get("zk", "zk1.x.com, zk2.x.xom"));
        job.setJobName("Bulk Loading table: " + hConfig.get("bulk-load-table","YOU HAVE NOT SET bulk-load-table PARAMETER"));
        HFileOutputFormat.setOutputPath(job, new Path(config.get("output-path")));
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(hConfig, config.get("bulk-load-table")));


        return job.waitForCompletion(true) ? 0 : 1; //exit code reflects job success or failure

    }
}

Finally, the 0.89 jar seemed to have some packaging problems. I had to create a "lib" directory (used by hadoop in job jars) and rejar the 0.89 jar with guava.jar and perhaps a couple of other missing dependencies. With that done I was able to run the bulk loader with the completebulkload option via an ant command.
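For reference, the generic shape of that load step (this is the standard completebulkload usage with placeholders, not the post's actual ant target, which isn't shown) is along these lines:

    hadoop jar <path-to-hbase-jar> completebulkload <hdfs-output-path> <table-name>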

Saturday, January 08, 2011

Facebook announces websearch to rival Google's



Admittedly I've made up the headline "Facebook announces websearch to rival Google's", but is it really such an outlandish proposition? It's certainly attention-grabbing, and why not? Google's websearch has basically remained the same product for 10 years. Any engineer at Google would debate this, but I doubt anyone would debate that the engineers at Facebook have achieved a stunning mastery over big data sets and the management of the social network.
Based on the considerable "Big Data" and real-time data expertise at Facebook, would anyone really be surprised if such a headline surfaced? I'd find it quite a believable story: "Facebook announces intention to best Google at web search." It's hard to overstate the earthquake that such an announcement would send rumbling through Silicon Valley, and the entire world. In fact, what better way for Facebook to spend their $500 million in Goldman Sachs cash than acquiring promising engineers and technologies to directly attack Google's biggest Web cash cows: search and maps? Bloodying the nose of the competition by attacking their core value has always been Google's way: if an operating system is of value, Google builds an OS. If mobile computing is of value, Google builds a mobile OS and its own phone hardware. The day is going to come when a Google rival will compete with Google on Google's home turf: web search.

Tuesday, January 04, 2011

Minhashing is reaaally cool

What is the Jaccard Coefficient? Answer: one of the most important concepts in information retrieval and set theory. The Jaccard Coefficient is very simple: it is the measure of the fractional similarity of two sets. Let's take a simple example: {A,B,C,D,E,F} vs. {B,E,F}. The fractional similarity of these sets is 3/6. You can easily see that 3 elements are shared (the intersection is {B,E,F}), whereas the total number of items across both sets (the union) is 6 elements ({A,B,C,D,E,F}).

So without any mumbo-jumbo, we can see the Jaccard Coefficient (JC) is just the ratio of sameness (the intersection) over the total amount of stuff present. Technically it's the cardinality (count) of elements in the intersection over the cardinality of elements in the union. So while the JC between {A,B,C,D,E,F} and {B,E,F} is 50%, the JC between {A,B,C,D,E,F,G,H,I} and {B,E,F} is only 33.33%. This illustrates the importance of the total amount of stuff present: the more irrelevant (non-intersection) stuff is present, the lower the Jaccard Coefficient.
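As a quick illustration (a minimal sketch, not from the original post), here is the coefficient computed directly from the two example sets in Java:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class Jaccard {
        // |A intersect B| / |A union B|
        static double jaccard(Set<String> a, Set<String> b) {
            Set<String> intersection = new HashSet<String>(a);
            intersection.retainAll(b);
            Set<String> union = new HashSet<String>(a);
            union.addAll(b);
            return (double) intersection.size() / union.size();
        }

        public static void main(String[] args) {
            Set<String> s1 = new HashSet<String>(Arrays.asList("A","B","C","D","E","F"));
            Set<String> s2 = new HashSet<String>(Arrays.asList("B","E","F"));
            System.out.println(jaccard(s1, s2)); //prints 0.5
        }
    }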


Now pretend we have two bags. Bag 1 contains Set 1, and Bag 2 contains Set 2. I tell you the JC between these two sets is 95%. I reach into Bag 1 and I pull out the element "M". What is the probability that the element "M" is also in Bag 2? Roughly 95% (at least 95%, in fact). That's what a high Jaccard Coefficient tells me. But now here is something quite amazing. What is the probability that the lowest symbol (lowest according to some ordering, like a hashcode) in Bag 1 is the *same* letter as the lowest letter in Bag 2? Again, it's 95%. Why? Consider the single lowest symbol across *both* bags, i.e. the lowest element of the union. The two bags have the same minimum exactly when that element sits in both bags, in other words in the intersection, and the probability of that is intersection over union: the Jaccard Coefficient, 95%.
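If that argument feels too slick, a tiny simulation makes it concrete (a sketch, not from the original post): assign every symbol a fresh random ordering many times over and count how often the two bags' minimums agree. With the sets from above, the agreement rate converges to their Jaccard Coefficient of 0.5.

    import java.util.*;

    public class MinhashCheck {
        public static void main(String[] args) {
            List<String> set1 = Arrays.asList("A","B","C","D","E","F");
            List<String> set2 = Arrays.asList("B","E","F");
            Random rnd = new Random(42);
            int agree = 0, trials = 100000;
            for (int t = 0; t < trials; t++) {
                //assign every symbol a fresh random "hash" (ordering) for this trial
                Map<String, Integer> order = new HashMap<String, Integer>();
                for (String s : set1) order.put(s, rnd.nextInt());
                for (String s : set2) if (!order.containsKey(s)) order.put(s, rnd.nextInt());
                if (min(set1, order).equals(min(set2, order))) agree++;
            }
            //prints a value close to 0.5, the Jaccard Coefficient of the two sets
            System.out.println((double) agree / trials);
        }

        static String min(List<String> set, Map<String, Integer> order) {
            String best = null;
            for (String s : set)
                if (best == null || order.get(s) < order.get(best)) best = s;
            return best;
        }
    }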



Now it really gets interesting. If I group sets by a key, where the key is the lowest element of the set, then the probability that two sets get grouped by the same key is again the Jaccard Coefficient. That's cool. That's BADASS. Why? It's an almost shockingly simple way to create clusters of similar sets.

I find a lot of convoluted explanations of Minhashing, but you really don't need to know more than this guy's explanation. Here's a short piece of code that, given a sentence, splits it into words, then builds N-grams out of the words (the N-grams are the set elements). Surrounding a word with ^ and $ is a well-known way of demarcating N-grams that are the beginnings and endings of words. A SHA-1 cryptographic hash function is a nice way to assign a random but consistent ordering to any possible N-gram we might come up with. As suggested by this guy, rather than clustering on the *single* lowest element, we can tighten the clusters up by making the cluster key the numMins smallest elements, concatenated. I use this in a mapreduce job, where I write the minhash out as the key from the mapper, and the reducer collects all the sentences with the same minHash key, thereby clustering similar sentences. And lo and behold...it works.

    //requires java.security.MessageDigest, java.util.PriorityQueue, and java.nio.ByteBuffer
    protected String minHash(String sentence, int nGramLength, int numMins) throws Exception{
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        PriorityQueue<Long> queue = new PriorityQueue<Long>();
        for(String part : sentence.split("\\s")){
            for(String gram : getNGrams("^"+part+"$", nGramLength)){
                md.reset();
                //use the first 8 bytes of the SHA-1 digest as a random but consistent ordering of the N-gram
                queue.add(ByteBuffer.wrap(md.digest(gram.getBytes("UTF-8"))).getLong());
            }
        }
        //concatenate the numMins smallest hashes to form the cluster key
        StringBuilder minHash = new StringBuilder();
        while(null != queue.peek() && numMins > 0){
            minHash.append(queue.remove());
            --numMins;
        }
        return minHash.toString();
    }
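The post doesn't show getNGrams; here is one possible version, a hypothetical helper matching the way it's called above, that slides a window of the given length over the word:

    //Hypothetical helper (not from the original post): all character substrings of length nGramLength.
    protected List<String> getNGrams(String word, int nGramLength){
        List<String> grams = new ArrayList<String>();
        for(int i = 0; i + nGramLength <= word.length(); i++){
            grams.add(word.substring(i, i + nGramLength));
        }
        return grams;
    }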