Writing data from Hadoop into HBase
Pasting the code here for future reference.
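The run(String[] args) override below looks like an implementation of org.apache.hadoop.util.Tool, with args[0] as the HDFS input path, args[1] as the target HBase table, and an optional args[2] overriding the ZooKeeper quorum. As a reminder of how such a driver is typically launched, here is a minimal, hypothetical main(); only the class name BookKpUnitToHbaseMr comes from the snippet, the rest is an assumed sketch and not part of the original code:

    // Assumes: import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner;
    // and that BookKpUnitToHbaseMr implements org.apache.hadoop.util.Tool.
    public static void main(String[] args) throws Exception {
        // args[0] = HDFS input path, args[1] = HBase output table, args[2] = ZooKeeper quorum (optional)
        int exitCode = ToolRunner.run(new Configuration(), new BookKpUnitToHbaseMr(), args);
        System.exit(exitCode);
    }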
@Override
    public int run(String[] args) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
       /* String whh = hbaseConf.get("hbase.zookeeper.quorum");
        System.out.print(whh);*/
        Config config = new Config();
        config.initJarFile("mr_hbase.properties");
        String numReduceTasksStr = config.getValue("numReduceTasks");
        Integer numReduceTasks = 3;
        if (NumberUtils.isDigits(numReduceTasksStr)) {
            numReduceTasks = Integer.valueOf(numReduceTasksStr);
        }
        String hbaseZookeeperQuorum = config.getValue("hbase.zookeeper.quorum");
        hbaseConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        String hbaseZookeeperPropertyClientPort = config.getValue("hbase.zookeeper.property.clientPort");
        hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
        if (args.length > 2) {
            hbaseConf.set("hbase.zookeeper.quorum", args[2]);
        }
        Job job = Job.getInstance(hbaseConf);
        job.setJarByClass(BookKpUnitToHbaseMr.class);
        job.setMapperClass(BookKpUnitToHbaseMr.BookKpUnitToHbaseMapper.class);
        // Use the first command-line argument as the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Use the second argument, an HBase table name, as the output target
        // When setting the reducer, use the classes from the org.apache.hadoop.hbase.mapreduce package
        TableMapReduceUtil.initTableReducerJob(args[1], BookKpUnitToHbaseMr.BookKpUnitToHbaseReducer.class, job);
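        // The TableMapReduceUtil helper above and the TableReducer used below come from that package
        // (assumed imports, adjust to your HBase version):
        //   import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
        //   import org.apache.hadoop.hbase.mapreduce.TableReducer;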
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StudentKpInfo.class);
        // Set the number of reduce tasks
        job.setNumReduceTasks(numReduceTasks);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // HBase's TableReducer is designed to take only the rowkey as its output key; the column family,
    // qualifier and value are set freely in the write code, so the class declares only ImmutableBytesWritable.
    public static class toHbaseReducer extends TableReducer<Text, StudentScoreInfo, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<StudentScoreInfo> values, Context context) throws IOException, InterruptedException {
            try {
                String rowkey = key.toString();
                Put put = new Put(rowkey.getBytes());
                for (StudentScoreInfo studentScoreInfo : values) {
                    put.addColumn(Bytes.toBytes("es"), Bytes.toBytes(studentScoreInfo.getStudentId()), Bytes.toBytes(studentScoreInfo.getStudentScoreValue()));  // 写入列,参数1分别为 es表示列簇    参数2表示列名  参数3表示列值 
                }
                context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);  // Write the rowkey and its columns to HBase
            } catch (Exception e) {
                logger.error("reduce error: ", e);
            }
        }
    }
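The mapper class BookKpUnitToHbaseMapper that run() registers is not part of the original paste. Purely to illustrate the shape such a mapper needs to have, here is a hypothetical sketch; the input line layout, the field order and the StudentScoreInfo setters are assumptions (StudentScoreInfo is also assumed to be a Hadoop Writable so it can be shuffled to the reducer), not the real implementation:

    // Assumes: import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text;
    // import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException;
    public static class BookKpUnitToHbaseMapper extends Mapper<LongWritable, Text, Text, StudentScoreInfo> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Hypothetical input line layout: rowkey \t studentId \t score
            String[] fields = value.toString().split("\t");
            if (fields.length < 3) {
                return; // skip malformed lines
            }
            StudentScoreInfo info = new StudentScoreInfo();
            info.setStudentId(fields[1]);          // assumed setter
            info.setStudentScoreValue(fields[2]);  // assumed setter
            // The map output key becomes the HBase rowkey in the reducer above
            context.write(new Text(fields[0]), info);
        }
    }

One thing to note before running: run() registers BookKpUnitToHbaseMr.BookKpUnitToHbaseReducer and sets the output value class to StudentKpInfo, while the reducer actually pasted is toHbaseReducer over StudentScoreInfo; the two snippets appear to come from slightly different versions of the job, so the class names and value types need to be aligned.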