java · hadoop · mapreduce · hbase · apache-crunch

WordCount with Apache Crunch into HBase Standalone


Currently I'm evaluating Apache Crunch. I followed a simple WordCount MapReduce job example, and afterwards I tried to save the results into a standalone HBase. HBase is running (checked with jps and the HBase shell), as described here: http://hbase.apache.org/book/quickstart.html

Now I have adapted the example to write into HBase:

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
// noStopWords: the tokenized lines with stop words filtered out (not shown here)
PTable<String, Long> counts = noStopWords.count();
pipeline.write(counts, new HBaseTarget("wordCountOutTable"));
PipelineResult result = pipeline.done();

I get an exception: "java.lang.IllegalArgumentException: HBaseTarget only supports Put and Delete"

Any clues as to what went wrong?


Solution

  • A PTable is a PCollection, but HBaseTarget can only handle Put or Delete objects. So you have to convert the PTable into a PCollection where every element is either a Put or a Delete. Have a look at the Crunch examples, where this is done.

    An example conversion could look like this:

     import org.apache.crunch.DoFn;
     import org.apache.crunch.Emitter;
     import org.apache.crunch.PCollection;
     import org.apache.crunch.PTable;
     import org.apache.crunch.Pair;
     import org.apache.crunch.types.writable.Writables;
     import org.apache.hadoop.hbase.client.Put;
     import org.apache.hadoop.hbase.util.Bytes;

     // COLUMN_FAMILY_TARGET and COLUMN_QUALIFIER_TARGET_TEXT are byte[] constants
     // describing where the value is stored in the target table.
     public PCollection<Put> createPut(final PTable<String, String> counts) {
       return counts.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
         @Override
         public void process(final Pair<String, String> input, final Emitter<Put> emitter) {
           // input.first() is used as the row key
           final Put put = new Put(Bytes.toBytes(input.first()));
           // the value (input.second()) is added under the target family and qualifier
           put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
           emitter.emit(put);
         }
       }, Writables.writables(Put.class));
     }