Mapreduce导入数据到HBase中

MapReuce导入数据文件到HBASE表中

1. 代码编写

/**
 * 参考 org.apache.hadoop.hbase.mapreduce.ImportTsv
 * org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
 * @author Hyman
 */
public class ImportEmp  extends Configured implements Tool{
	
	public static final String COLUMN_FAMILY = "info";
	public static final String[] COLUMNS = new String[]{"rowkey","name","deptname","leader","joindate","sal","exp","deptno"};
	
	//7499    ALLEN   SALESMAN        7698    1981-2-20       1600.00 300.00  30
	static class ImportEmpMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
		
		ImmutableBytesWritable outkey = new ImmutableBytesWritable();

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
						throws IOException, InterruptedException {
			
			String line = value.toString();
			
			//TODO validate data
			// .... 
			
			String[] fields = new String[8];
				StringTokenizer token = new StringTokenizer(line);
			int i = 0;
			while (token.hasMoreTokens()){
				fields[i++] = token.nextToken();
			}
			
			outkey.set(Bytes.toBytes(fields[0]));
			Put put = new Put(Bytes.toBytes(fields[0]));
			
			for(int index=1;index<8 ;index++){
				if(StringUtils.isNotEmpty(fields[index]))
					put.add(Bytes.toBytes(COLUMN_FAMILY),Bytes.toBytes(COLUMNS[index]), Bytes.toBytes(fields[index]));
			}
			
			context.write(outkey,put);
		}
		
	}

	public int run(String[] args) throws Exception {
		
		Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

		
        job.setMapperClass(ImportEmpMapper.class);
        
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        
        FileInputFormat.addInputPath(job, new Path(args[1]));
        
		TableMapReduceUtil.initTableReducerJob(
		  args[0],        // output table
		  null,    // reducer class
		  job//
		);
		
		job.setNumReduceTasks(0);   // at least one, adjust as required
		
        int exitcode = job.waitForCompletion(true) ? 0 : 1;
		return exitcode;
	}
	
	public static void main(String[] args) throws Exception{
		
		Configuration conf = HBaseConfiguration.create();
		
		int exitcode = ToolRunner.run(//
				conf, //
				new ImportEmp(), //
				args//
			);
        System.exit(exitcode);
	}
}

 2.打包运行

在hbase中创建emp表(create 'emp','info')。export  jar包($HADOOP_HOME/jars/mapred.jar)

准备数据:上传到HDFS文件系统中(/user/ehp/hbase/importtsv/emp/input)

数据文件 emp.txt

7369	HEHE	CLERK	7902	1980-12-17	800.00		20
7499	ALLEN	SALESMAN	7698	1981-2-20	1600.00	300.00	30
7521	WARD	SALESMAN	7698	1981-2-22	1250.00	500.00	30
7566	JONES	MANAGER	7839	1981-4-2	2975.00		20
7654	MARTIN	SALESMAN	7698	1981-9-28	1250.00	1400.00	30
7698	BLAKE	MANAGER	7839	1981-5-1	2850.00		30
7782	CLARK	MANAGER	7839	1981-6-9	2450.00		10
7788	SCOTT	ANALYST	7566	1987-4-19	3000.00		20
7839	KING	PRESIDENT		1981-11-17	5000.00		10
7844	TURNER	SALESMAN	7698	1981-9-8	1500.00	0.00	30
7876	ADAMS	CLERK	7788	1987-5-23	1100.00		20
7900	JAMES	CLERK	7698	1981-12-3	950.00		30
7902	FORD	ANALYST	7566	1981-12-3	3000.00		20
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
yarn jar $HADOOP_HOME/jars/mapred.jar \
com.hyman.ehp.mapred.hbase.ImportEmp \
emp \
/user/ehp/hbase/importtsv/emp/input

 相关内容

  • hbase 数据导入 bulkload 方式

相关推荐