Importing Data into HBase with MapReduce
This walkthrough uses a MapReduce job to load a data file into an HBase table.
1. Writing the Code
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reference: org.apache.hadoop.hbase.mapreduce.ImportTsv
 *            org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
 * @author Hyman
 */
public class ImportEmp extends Configured implements Tool {

    public static final String COLUMN_FAMILY = "info";
    public static final String[] COLUMNS =
            new String[]{"rowkey", "name", "deptname", "leader", "joindate", "sal", "exp", "deptno"};

    // Sample record: 7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
    static class ImportEmpMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        private final ImmutableBytesWritable outkey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // TODO validate data
            String[] fields = new String[8];
            StringTokenizer token = new StringTokenizer(line);
            int i = 0;
            while (token.hasMoreTokens()) {
                fields[i++] = token.nextToken();
            }
            // The first field is the row key; the rest become columns in the "info" family
            outkey.set(Bytes.toBytes(fields[0]));
            Put put = new Put(Bytes.toBytes(fields[0]));
            for (int index = 1; index < 8; index++) {
                if (StringUtils.isNotEmpty(fields[index])) {
                    // Put.add was renamed addColumn in HBase 1.0+
                    put.add(Bytes.toBytes(COLUMN_FAMILY),
                            Bytes.toBytes(COLUMNS[index]),
                            Bytes.toBytes(fields[index]));
                }
            }
            context.write(outkey, put);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(ImportEmpMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));

        TableMapReduceUtil.initTableReducerJob(
                args[0], // output table
                null,    // reducer class
                job);
        job.setNumReduceTasks(0); // map-only: Puts are written straight to the table

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int exitcode = ToolRunner.run(conf, new ImportEmp(), args);
        System.exit(exitcode);
    }
}
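One caveat on the parsing above (this note and the demo class below are my addition, not part of the original job): StringTokenizer splits on runs of whitespace and silently drops empty fields, so any record missing a column (KING has no leader; several rows have no exp value) gets its remaining fields shifted left rather than left empty. The stock ImportTsv splits on an explicit tab separator partly for this reason. A minimal illustration of the shift:

import java.util.StringTokenizer;

// Hypothetical demo class, not part of the import job
public class TokenizerShiftDemo {
    public static void main(String[] args) {
        // KING's record has no leader column; the join date
        // ends up in fields[3] where the leader id was expected
        String line = "7839\tKING\tPRESIDENT\t\t1981-11-17\t5000.00\t\t10";
        String[] fields = new String[8];
        StringTokenizer token = new StringTokenizer(line);
        int i = 0;
        while (token.hasMoreTokens()) {
            fields[i++] = token.nextToken();
        }
        for (int j = 0; j < fields.length; j++) {
            // fields[3] prints "1981-11-17"; fields[6] and fields[7] stay null
            System.out.println("fields[" + j + "] = " + fields[j]);
        }
    }
}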
2. Packaging and Running
Create the emp table in HBase (create 'emp','info'), then export the project as a jar ($HADOOP_HOME/jars/mapred.jar).
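From the HBase shell, creating the table looks like this (the create statement is from the step above; describe is just a quick check):

$ hbase shell
hbase> create 'emp', 'info'
hbase> describe 'emp'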
Prepare the data: upload it to HDFS (/user/ehp/hbase/importtsv/emp/input); upload commands follow the data listing below.
Data file emp.txt:
7369 HEHE   CLERK     7902 1980-12-17  800.00          20
7499 ALLEN  SALESMAN  7698 1981-2-20  1600.00  300.00  30
7521 WARD   SALESMAN  7698 1981-2-22  1250.00  500.00  30
7566 JONES  MANAGER   7839 1981-4-2   2975.00          20
7654 MARTIN SALESMAN  7698 1981-9-28  1250.00 1400.00  30
7698 BLAKE  MANAGER   7839 1981-5-1   2850.00          30
7782 CLARK  MANAGER   7839 1981-6-9   2450.00          10
7788 SCOTT  ANALYST   7566 1987-4-19  3000.00          20
7839 KING   PRESIDENT      1981-11-17 5000.00          10
7844 TURNER SALESMAN  7698 1981-9-8   1500.00    0.00  30
7876 ADAMS  CLERK     7788 1987-5-23  1100.00          20
7900 JAMES  CLERK     7698 1981-12-3   950.00          30
7902 FORD   ANALYST   7566 1981-12-3  3000.00          20
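With emp.txt saved locally, upload it to the input path above using the standard HDFS commands:

$ hdfs dfs -mkdir -p /user/ehp/hbase/importtsv/emp/input
$ hdfs dfs -put emp.txt /user/ehp/hbase/importtsv/emp/input/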
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
yarn jar $HADOOP_HOME/jars/mapred.jar \
  com.hyman.ehp.mapred.hbase.ImportEmp \
  emp \
  /user/ehp/hbase/importtsv/emp/input
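Once the job finishes, a quick sanity check from the HBase shell confirms the rows landed (a suggested spot check, not part of the original walkthrough):

hbase> count 'emp'
hbase> scan 'emp', {LIMIT => 2}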
Related content
- HBase data import via BulkLoad