Writing data from Hadoop into HBase
Pasting the code here for future reference.
@Override
public int run(String[] args) throws Exception {
    Configuration hbaseConf = HBaseConfiguration.create();
    /* String whh = hbaseConf.get("hbase.zookeeper.quorum");
       System.out.print(whh); */

    // Read job settings from the properties file
    Config config = new Config();
    config.initJarFile("mr_hbase.properties");

    String numReduceTasksStr = config.getValue("numReduceTasks");
    Integer numReduceTasks = 3;
    if (NumberUtils.isDigits(numReduceTasksStr)) {
        numReduceTasks = Integer.valueOf(numReduceTasksStr);
    }

    String hbaseZookeeperQuorum = config.getValue("hbase.zookeeper.quorum");
    hbaseConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    String hbaseZookeeperPropertyClientPort = config.getValue("hbase.zookeeper.property.clientPort");
    hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
    if (args.length > 2) {
        hbaseConf.set("hbase.zookeeper.quorum", args[2]);
    }

    Job job = Job.getInstance(hbaseConf);
    job.setJarByClass(BookKpUnitToHbaseMr.class);
    job.setMapperClass(BookKpUnitToHbaseMr.BookKpUnitToHbaseMapper.class);

    // The first argument is the HDFS input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // The second argument is the target HBase table; when wiring up the reducer,
    // use the utilities from org.apache.hadoop.hbase.mapreduce
    TableMapReduceUtil.initTableReducerJob(args[1], BookKpUnitToHbaseMr.BookKpUnitToHbaseReducer.class, job);

    // Key/value types emitted by the mapper
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(StudentKpInfo.class);

    // Number of reduce tasks
    job.setNumReduceTasks(numReduceTasks);

    return job.waitForCompletion(true) ? 0 : 1;
}
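The mapper class referenced in the job setup is not shown in the original post. Below is a minimal sketch of what that side might look like, assuming the input is a tab-separated text file whose first column becomes the HBase row key; the field layout and the setter names on the value object are assumptions for illustration only.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: parses each line and emits (row key, value object)
public static class toHbaseMapper extends Mapper<LongWritable, Text, Text, StudentScoreInfo> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length < 3) {
            return; // skip malformed lines
        }
        StudentScoreInfo info = new StudentScoreInfo();
        info.setStudentId(fields[1]);              // assumed setter
        info.setStudentScoreValue(fields[2]);      // assumed setter
        // fields[0] is the row key; it groups all records for one row in the reducer
        context.write(new Text(fields[0]), info);
    }
}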
public static class toHbaseReducer extends TableReducer<Text, StudentScoreInfo, ImmutableBytesWritable> {
    // TableReducer only fixes the row key as its output key type; the column family,
    // qualifier and value are set freely when building the Put, so the only output
    // type parameter here is ImmutableBytesWritable.

    @Override
    protected void reduce(Text key, Iterable<StudentScoreInfo> values, Context context)
            throws IOException, InterruptedException {
        try {
            String rowkey = key.toString();
            Put put = new Put(rowkey.getBytes());
            for (StudentScoreInfo studentScoreInfo : values) {
                // addColumn: arg 1 is the column family ("es"), arg 2 the qualifier, arg 3 the cell value
                put.addColumn(Bytes.toBytes("es"),
                        Bytes.toBytes(studentScoreInfo.getStudentId()),
                        Bytes.toBytes(studentScoreInfo.getStudentScoreValue()));
            }
            // Write the row key and the assembled Put into HBase
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
        } catch (Exception e) {
            logger.error("reduce error: ", e);
        }
    }
}
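The StudentScoreInfo value class is also not shown. Since any value shuffled between map and reduce must implement Writable, a minimal sketch could look like the following; the actual fields of the author's class are an assumption here, only the getters used by the reducer are taken from the code above.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical value class: two string fields, serialized with writeUTF/readUTF
public static class StudentScoreInfo implements Writable {
    private String studentId = "";
    private String studentScoreValue = "";

    public String getStudentId() { return studentId; }
    public void setStudentId(String studentId) { this.studentId = studentId; }
    public String getStudentScoreValue() { return studentScoreValue; }
    public void setStudentScoreValue(String studentScoreValue) { this.studentScoreValue = studentScoreValue; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(studentId);
        out.writeUTF(studentScoreValue);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        studentId = in.readUTF();
        studentScoreValue = in.readUTF();
    }
}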