Running MapReduce Jobs with HBase

Generally, there are three different ways of interacting with HBase from a MapReduce application: HBase can be used as a data source at the beginning of a job, as a data sink at the end of a job, or as a shared resource.

  • HBase as a data source:  The following example uses HBase as a MapReduce source in a read-only manner. Specifically, there is a Mapper but no Reducer, and nothing is emitted from the Mapper.
    package hbaseinaction;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * HBase as a data source example, you can write your own code in map(...) to read
     * data from the HBase table specified during job initialization.
     * In this case, the table is your_hbase_table_name.
     * <p/>
     * User: George Sun
     * Date: 7/21/13
     * Time: 12:42 AM
     */
    public class HBaseAsDataSource extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            Job job = new Job(config, "ExampleRead");
            job.setJarByClass(HBaseAsDataSource.class);     
    
            Scan scan = new Scan();
            // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCaching(500);
            // don't set to true for MR jobs
            scan.setCacheBlocks(false);
            // set other scan attrs here...
    
            TableMapReduceUtil.initTableMapperJob(
                    // input HBase table name
                    "your_hbase_table_name",
                    // Scan instance to control column family and attribute selection
                    scan,
                    MyMapper.class,   // mapper
                    null,             // mapper output key
                    null,             // mapper output value
                    job);
            // because we aren't emitting anything from mapper
            job.setOutputFormatClass(NullOutputFormat.class);
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new HBaseAsDataSource(), args);
            System.exit(exitCode);
        }
    
    
        public static class MyMapper extends TableMapper<Text, Text> {
    
            public void map(ImmutableBytesWritable row, Result result, Context context)
                    throws InterruptedException, IOException {
                // process data for the row from the Result instance.
                // For example, read data from HBase table, then populate it into HDFS.
            }
        }
    }
     
  • HBase as a data sink:   Writing to an HBase table from MapReduce as a data sink is similar, in terms of implementation, to reading from one. Of course, you can use HBase as a data source and as a data sink at the same time. The following example uses HBase as both a source and a sink: it simply copies data from one table to another.
    package hbaseinaction;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * HBase is used as data source as well as data sink. This MapReduce job will try to copy data from
     * the source table to the target table. Note that no reduce task needed.
     * <p/>
     * User: George Sun
     * Date: 7/21/13
     * Time: 12:55 AM
     */
    public class HBaseAsDataSourceSink extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            Job job = new Job(config, "ExampleReadWrite");
            job.setJarByClass(HBaseAsDataSourceSink.class);
    
            Scan scan = new Scan();
            // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCaching(500);
            // don't set to true for MR jobs
            scan.setCacheBlocks(false);
            // set other scan attrs
    
            TableMapReduceUtil.initTableMapperJob(
                    // input table
                    "your_hbase_source_table",
                    // Scan instance to control CF and attribute selection
                    scan,
                    // mapper class
                    MyMapper.class,
                    // mapper output key
                    null,
                    // mapper output value
                    null,
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    // output table
                    "your_hbase_target_table",
                    // reducer class
                    null,
                    job);
            job.setNumReduceTasks(0); // no reducer is actually needed here
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new HBaseAsDataSourceSink(), args);
            System.exit(exitCode);
        }
    
        public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    
            public void map(ImmutableBytesWritable row, Result value, Context context)
                    throws IOException, InterruptedException {
    
                // this example is just copying the data from the source table...
                context.write(row, resultToPut(row, value));
            }
    
            private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
                Put put = new Put(key.get());
                for (KeyValue kv : result.raw()) {
                    put.add(kv);
                }
                return put;
            }
        }
    }
     
  • HBase used as a shared resource to do a map-side join:    As we know, HBase can be thought of as a giant hash table, which makes it a natural candidate for the lookup side of a map-side join, as the sketch below shows.
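    A minimal sketch of such a join (not a complete job), assuming a lookup table named your_lookup_table with column family cf and qualifier q, joined against tab-separated text input; all of these names are placeholders. Each map task opens the shared table once in setup() and probes it with a Get per record:
    package hbaseinaction;
    
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Map-side join sketch: HBase acts as a shared lookup table probed from
     * every map task. Table, family and qualifier names are placeholders.
     */
    public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        private HTable lookupTable;
    
        @Override
        protected void setup(Context context) throws IOException {
            // One table handle per map task, reused across all map() calls.
            lookupTable = new HTable(HBaseConfiguration.create(), "your_lookup_table");
        }
    
        @Override
        public void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Assume the join key is the first field of a tab-separated line.
            String joinKey = line.toString().split("\t")[0];
            Result result = lookupTable.get(new Get(Bytes.toBytes(joinKey)));
            if (!result.isEmpty()) {
                byte[] joined = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
                // Emit the input record joined with the HBase-side value.
                context.write(new Text(joinKey), new Text(Bytes.toString(joined)));
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException {
            lookupTable.close();
        }
    }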
  • HBase MapReduce read/write with multi-table input/output:  Leverage the MultiTableInputFormat and MultiTableOutputFormat classes shipped with HBase. Take MultiTableInputFormat as an example.
    List<Scan> scans = new ArrayList<Scan>();
      
    Scan scan1 = new Scan();
    scan1.setStartRow(firstRow1);
    scan1.setStopRow(lastRow1);
    scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
    scans.add(scan1);
    
    Scan scan2 = new Scan();
    scan2.setStartRow(firstRow2);
    scan2.setStopRow(lastRow2);
    scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
    scans.add(scan2);
    
    // Initialized with more than one scan to read data from multiple tables.
    // Note: pass your concrete TableMapper subclass here; TableMapper itself
    // is abstract and cannot be instantiated by the framework.
    TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class, Text.class,
         IntWritable.class, job);
    A minimal sketch of the MultiTableOutputFormat side follows.
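    This is not a complete job: with MultiTableOutputFormat, the key of each record you write names the destination table for that mutation. Here your_table_a, cf, q, rowKey, and value are all placeholders for your own data.
    // Job setup: MultiTableOutputFormat routes each mutation by table name.
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    
    // Inside your mapper or reducer: the output key names the target table.
    ImmutableBytesWritable tableA = new ImmutableBytesWritable(Bytes.toBytes("your_table_a"));
    Put put = new Put(rowKey);
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), value);
    context.write(tableA, put);
    // A subsequent record could be keyed with a different table name
    // to write into another table from the same task.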
  • There are also a few predefined MapReduce jobs shipped with HBase under the package org.apache.hadoop.hbase.mapreduce, such as Export to export data from HBase into HDFS, Import to import data from HDFS into HBase, and CopyTable to copy data from one HBase table to another. You can explore their source code for more examples of using HBase from MapReduce.

Important note from HBase in Action: Hadoop MapReduce assumes your map and reduce tasks are idempotent. This means the map and reduce tasks can be run any number of times with the same input and produce the same output. This allows MapReduce to provide fault tolerance in job execution and also take maximum advantage of cluster processing power. You must take care when performing stateful operations. HBase's Increment is an example of such a stateful operation.

So instead of incrementing a counter in the mapper, a better approach is to emit ["counter", 1] pairs from each mapper. Failed tasks are recovered, and their output isn't double-counted. Sum the pairs in the reducer and write out a single value from there, as the sketch below shows. This also avoids placing an unduly high burden on the single machine hosting the incremented cell.
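A minimal sketch of that pattern (not from the book), assuming the sum is written to a cell cf:count of a results table wired up with initTableReducerJob; the table, family, and qualifier names are placeholders:

    // Mapper: emit one ("counter", 1) pair per row instead of calling
    // Increment against HBase directly, so re-running a task is harmless.
    public static class CountMapper extends TableMapper<Text, LongWritable> {
        private static final Text COUNTER = new Text("counter");
        private static final LongWritable ONE = new LongWritable(1);
    
        @Override
        public void map(ImmutableBytesWritable row, Result result, Context context)
                throws IOException, InterruptedException {
            context.write(COUNTER, ONE);
        }
    }
    
    // Reducer: sum the pairs and write a single value, so failed and
    // re-executed tasks can never double-count.
    public static class CountReducer
            extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {
    
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            context.write(null, put);
        }
    }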
