
I currently have a MapReduce job that uses MultipleOutputs to send data to several HDFS locations. After that completes, I am using HBase client calls (outside of MR) to add some of the same elements to a few HBase tables. It would be nice to add the HBase outputs as just additional MultipleOutputs, using TableOutputFormat. In that way, I would distribute my HBase processing.

Problem is, I cannot get this to work. Has anyone ever used TableOutputFormat in MultipleOutputs...? With multiple HBase outputs?

Basically, I am setting up my collectors like this:

OutputCollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter);
OutputCollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter);

Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(new ImmutableBytesWritable(mykey.getBytes()), put);

put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(new ImmutableBytesWritable(mykey.getBytes()), put);

This seems to follow the general idea of HBase writing, I think.

Part of the issue, as I type this, might be more in the job definition. It looks like MR (and HBase) want a global parameter set, like this:

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

to provide the table name. Trouble is, I have two tables....
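For reference, the driver-side wiring I am attempting looks roughly like this, using org.apache.hadoop.mapred.lib.MultipleOutputs and org.apache.hadoop.hbase.mapred.TableOutputFormat (a sketch only; MyDriver is a stand-in for my actual driver class):

JobConf conf = new JobConf(MyDriver.class);

// Both named outputs use TableOutputFormat...
MultipleOutputs.addNamedOutput(conf, "hbase1",
    TableOutputFormat.class, ImmutableBytesWritable.class, Put.class);
MultipleOutputs.addNamedOutput(conf, "hbase2",
    TableOutputFormat.class, ImmutableBytesWritable.class, Put.class);

// ...but there is only this one global key to hold a table name,
// so the second table has nowhere to go.
conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");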

Any ideas?

Thanks

Wanderer
  • I just changed the code above to use ImmutableBytesWritable, instead of NullWritable. That gave me the ability to write to one HBase table. Still trying to figure out how to write to more than one. – Wanderer May 12 '11 at 19:58

3 Answers


I've put data into HBase 3 different ways. The most efficient (and distributed) was using the HFileOutputFormat class.

I set up the job as follows (please note this is edited from the actual code, but the core content remains):

cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);

// Point the HBase client at the cluster, then let HFileOutputFormat set up
// the reduce side (partitioning, sorting) to match the table's regions.
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);
HTable hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);

As you can see, my Mapper class is called HiveToHBaseMapper - nice and original. :) Here's the (again, rough) definition of it:

public class HiveToHBaseMapper extends
    Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {

    @Override
    public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String family = config.get("FAMILY");
        String column = config.get("COLUMN");     // placeholder: the real code derives the column name elsewhere
        String sValue = val.toString();           // placeholder: the real code parses the measure out of the input record
        Double value = Double.parseDouble(sValue);
        String sKey = generateKey(config);        // helper from the full code that builds the row key
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
            ? Bytes.toBytes(Double.MIN_VALUE)
            : Bytes.toBytes(value));
        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);
    }
}

I don't know if you can fit this into MultipleOutputs or if you need to create a new MR job, but this is the best way I've come across to get data into HBase. :)
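One step the snippet above leaves out: HFileOutputFormat only writes HFiles to HDFS, so once the job finishes you still have to load them into the table, e.g. with org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles. A rough sketch, reusing the hConf, cubeOutputPath, and tableName placeholders from above:

// Run the job, then move the generated HFiles into the table's regions.
if (cubeBuilderETLJob.waitForCompletion(true)) {
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
    loader.doBulkLoad(cubeOutputPath, new HTable(hConf, tableName));
}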

Hopefully this will point you in the right direction toward a solution.

QuinnG
  • Thanks... I will take a look... but still need (or would like) to figure out if it is possible to get the MultipleOutputs working. – Wanderer May 12 '11 at 20:20
  • Maybe, like you indicated... I just need to create a new MR job... I just have all the data right were I need it.. and just want to give it to HBase (in several tables). – Wanderer May 12 '11 at 20:21
  • @John Galt... who: In my case, I only have 1 table per MR. I don't know if more is possible or not. – QuinnG May 12 '11 at 20:53
  • Can I still use this when my input data (rowKey) arrives in random order? – raj Apr 26 '12 at 10:45

In my experience, the best approach is to just put the data into the HBase table as soon as you have it available (unless you are bulk loading data). If you have the data available in your map task, that is the best time to push it to HBase. If you don't have the data until a reduce task, then add the push to HBase there. Until you know that HBase is your bottleneck, let HBase worry about the caching issues.
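As a rough illustration of what that looks like (the table, family, and column names here are placeholders, and the input types are arbitrary), a reducer that pushes Puts directly might be something like:

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DirectHBaseReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
    private HTable table;

    @Override
    protected void setup(Context context) throws IOException {
        // One HTable per task; turning autoflush off lets the client batch puts.
        table = new HTable(HBaseConfiguration.create(context.getConfiguration()), "mytable");
        table.setAutoFlush(false);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("family"), Bytes.toBytes("column"), Bytes.toBytes(value.toString()));
            table.put(put);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        // Flush any buffered puts before the task exits.
        table.flushCommits();
        table.close();
    }
}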

David
  • Actually, that is what I am doing now. I am using HBase client calls (Puts) in my reduce job to write to my tables. It is working fine, but I was told that using OutputFormats would be much better for getting data to HBase, hence TableOutputFormat. While this works, apparently it does not for MultipleOutputs in the mapred package. – Wanderer May 13 '11 at 15:45

So, apparently, this is not possible with the old mapred packages. There is a new OutputFormat in the mapreduce package set, but I don't want to convert to that right now, so I will have to write multiple MR jobs.
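For anyone who can move to the new API: as I understand it, the org.apache.hadoop.hbase.mapreduce package also has a MultiTableOutputFormat, where the output key names the target table, so a single job can write Puts to several tables. A rough sketch (table, family, column, and input types here are made up):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Driver side would use: job.setOutputFormatClass(MultiTableOutputFormat.class);
public class TwoTableMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        byte[] row = Bytes.toBytes(value.toString());

        // The output key wraps the *table name*; MultiTableOutputFormat
        // routes each Put to the table named in its key.
        Put put1 = new Put(row);
        put1.add(Bytes.toBytes("family"), Bytes.toBytes("column"), Bytes.toBytes("somedata1"));
        context.write(new ImmutableBytesWritable(Bytes.toBytes("articles")), put1);

        Put put2 = new Put(row);
        put2.add(Bytes.toBytes("family"), Bytes.toBytes("column"), Bytes.toBytes("somedata2"));
        context.write(new ImmutableBytesWritable(Bytes.toBytes("other_table")), put2);
    }
}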

Wanderer