将hdfs文件保存到hbase中

 0.0  wlan.dat文件内容:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985079 	13823070001	20-7C-8F-70-68-1F:CMCC	120.196.100.99			6	3	360	180	200
1363157985069 	13600217502	00-1F-64-E2-E8-B1:CMCC	120.196.100.55			18	138	1080	186852	200

0 将上网日志数据传到hdfs中

    hdfs dfs -put /opt/wlan.dat  /zmdata/

1.在HBase中创建表wlan(列族为cf)
   create 'wlan' ,'cf'

2.确定行键是什么
  手机号码:时间戳      

如果仅仅用手机号作为行键,那么同一手机号的多条记录行键相同,hbase中的数据会被覆盖掉。上述文件中,手机号

13560439658出现了两次,因此组成行键的时间戳应该用 yyyyMMddHHmmssSSSS,最好带上毫秒,

否则最后hbase中依旧会因为行键重复而只输出21条记录(测试时出现过这种情况:hbase行键相同时数据会被覆盖)

这是使用SSSS后的主键:

13560439658:201602280032110142

13560439658:201602280032110143

3.代码

注意: 

1 mapper, reducer类都需要 static ,否则执行时候会报找不到对应类错误

2 FileInputFormat,TextInputFormat要引用org.apache.hadoop.mapreduce.lib.input下

3 因为写出数据到hbase 因此fileoutputformat不需要了

package hbase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that reads the tab-separated WLAN access log from HDFS and writes one
 * HBase row per log line into table {@code wlan}, column family {@code cf}.
 *
 * <p>The row key is {@code <phone>:<yyyyMMddHHmmssSSSS>}, where the time part is the import
 * time assigned by the mapper. Each row stores the whole tagged line in column {@code raw}
 * plus one column per log field.
 */
public class HbaseMRImport {

	/** Default input file, used when no path is given on the command line. */
	private static final String DEFAULT_INPUT = "hdfs://hadoop3:9000/zmdata/wlan.dat";

	/** Counter group used to report lines that were skipped because they were malformed. */
	private static final String COUNTER_GROUP = "HbaseMRImport";

	/** Column family that receives every cell written by this job. */
	private static final byte[] FAMILY = "cf".getBytes(StandardCharsets.UTF_8);

	/** Qualifier of the column that stores the whole value handed to the reducer. */
	private static final byte[] RAW_QUALIFIER = "raw".getBytes(StandardCharsets.UTF_8);

	/** Qualifiers of the per-field columns, in the order the fields appear in a log line. */
	private static final String[] FIELD_QUALIFIERS = {
		"rePortTime", "msisdn", "apmac", "acmac", "host", "siteType",
		"upPackNum", "downPackNum", "upPayLoad", "downPayLoad", "httpStatus"
	};

	/**
	 * Prefixes every log line with its row key and forwards it to the reducer.
	 *
	 * <p>Generic parameters: the first two are the input key/value types (k1, v1), the last
	 * two are the output key/value types (k2, v2).
	 */
	static class HbaseMRMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
		private final Text v2 = new Text();

		// Created once per mapper instead of once per record; used only by this mapper instance.
		private final DateFormat rowKeyTimeFormat = new SimpleDateFormat("yyyyMMddHHmmssSSSS");

		// Last timestamp handed out by this mapper; used to keep the time part strictly increasing.
		private long lastStamp = Long.MIN_VALUE;

		/**
		 * Emits {@code <phone>:<timestamp>\t<original line>} under the original input key.
		 * Lines with fewer fields than a complete log record are counted and skipped.
		 */
		@Override
		protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
			String line = v1.toString();
			// limit -1 keeps trailing empty fields so the field count stays stable
			String[] fields = line.split("\t", -1);
			if (fields.length < FIELD_QUALIFIERS.length) {
				context.getCounter(COUNTER_GROUP, "malformedMapInput").increment(1);
				return;
			}
			String phone = fields[1]; // second field is the phone number

			// Two records handled within the same millisecond would otherwise get the same
			// time part, and records of the same phone would overwrite each other in HBase.
			// Bumping the clock value keeps keys distinct within this mapper instance.
			long stamp = System.currentTimeMillis();
			if (stamp <= lastStamp) {
				stamp = lastStamp + 1;
			}
			lastStamp = stamp;

			String dateStr = rowKeyTimeFormat.format(new Date(stamp));
			// row key first, then the untouched original line
			v2.set(phone + ":" + dateStr + "\t" + line);
			context.write(k1, v2);
		}
	}

	/**
	 * Turns every tagged line produced by the mapper into one HBase {@link Put}.
	 */
	static class HbaseMRReduce extends TableReducer<LongWritable, Text, NullWritable> {

		/**
		 * Writes one row per value: column {@code raw} holds the whole value, and each log
		 * field is additionally stored in its own column so it can be read individually.
		 * Values with too few fields are counted and skipped.
		 */
		@Override
		protected void reduce(LongWritable k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
			for (Text v2 : v2s) {
				String value = v2.toString();
				// index 0 is the row key, indexes 1..11 are the log fields
				String[] splited = value.split("\t", -1);
				if (splited.length < FIELD_QUALIFIERS.length + 1) {
					context.getCounter(COUNTER_GROUP, "malformedReduceInput").increment(1);
					continue;
				}

				// Explicit UTF-8 so the stored bytes do not depend on the platform charset.
				Put put = new Put(splited[0].getBytes(StandardCharsets.UTF_8));
				put.add(FAMILY, RAW_QUALIFIER, value.getBytes(StandardCharsets.UTF_8));
				for (int i = 0; i < FIELD_QUALIFIERS.length; i++) {
					put.add(FAMILY,
							FIELD_QUALIFIERS[i].getBytes(StandardCharsets.UTF_8),
							splited[i + 1].getBytes(StandardCharsets.UTF_8));
				}

				context.write(NullWritable.get(), put);
			}
		}
	}

	/**
	 * Configures and runs the import job, then exits with status 1 if the job failed.
	 *
	 * @param args optional; {@code args[0]} overrides the default HDFS input path
	 * @throws Exception if the job cannot be set up or submitted
	 */
	public static void main(String[] args) throws Exception {

		// 0 initialise the configuration
		Configuration conf = new Configuration();
		// HBase runs on a single node here without an external ZooKeeper; per the original
		// author's test the job also worked without this line and the next one.
		conf.set("hbase.zookeeper.quorum", "hadoop3");
		conf.set("hbase.rootdir", "hdfs://hadoop3:9000/hbase");
		conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan");

		// 0.1 define the job (Job.getInstance replaces the deprecated Job constructor)
		Job job = Job.getInstance(conf, HbaseMRImport.class.getSimpleName());
		TableMapReduceUtil.addDependencyJars(job);
		job.setJarByClass(HbaseMRImport.class);

		// 1 custom mapper and reducer
		job.setMapperClass(HbaseMRMapper.class);
		job.setReducerClass(HbaseMRReduce.class);

		// 2 mapper output types (k2, v2)
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);

		// Reducer output types (k3, v3) are not set because the output goes to an HBase table.

		// 3 input and output formats
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TableOutputFormat.class);

		// 4 input source; no FileOutputFormat path is needed because nothing is written to HDFS
		String input = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
		FileInputFormat.setInputPaths(job, input);

		// 5 submit and report failure through the exit status
		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}
}

4.使用ant发到远程linux服务器运行,把HBase的相关jar包放到HADOOP_CLASSPATH中
  find / -name hadoop-env.sh  查找文件
  在hadoop-env.sh文件中增加如下代码:
  export HADOOP_CLASSPATH=/usr/local/hbase-0.98.8-hadoop2/lib/*

 或者将hbase的lib都拷贝到hadoop节点的某一个文件夹下,然后再引入

 可以参看《hbase java操作代码简介和NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration》

 ant脚本:

<?xml version="1.0" encoding="UTF-8"?>

<!--
	Ant build for this example: compiles the sources, packs them into dist/jar.jar,
	copies the jar to the remote Hadoop host with scp and runs it there with "hadoop jar".
	The default target is "sshexec", which depends on every other step.
-->
<project name="项目名称" basedir="." default="sshexec">
	<description>本配置文件供ANT编译项目、自动进行单元测试、打包并部署之用。</description>
	<description>默认操作(输入命令:ant)为编译源程序并发布运行。</description>

	<!-- Property settings -->
	<property environment="env" />
	<!-- Loaded first: Ant properties are immutable once set, so values from
	     build.properties take precedence over the defaults below. -->
	<property file="build.properties" />
	<property name="src.dir" value="${basedir}/src" />
	<property name="java.lib.dir" value="${env.JAVA_HOME}/lib" />
	<property name="classes.dir" value="${basedir}/classes" />
	<property name="dist.dir" value="${basedir}/dist" />
	<property name="third.lib.dir" value="${basedir}/lib" />
	<property name="localpath.dir" value="${basedir}" />
	<property name="remote.host" value="hadoop3"/>
	<property name="remote.username" value="root"/>
	<!-- NOTE(review): demo credential stored in plain text; supply the real value via build.properties. -->
	<property name="remote.password" value="123456"/>
	<property name="remote.home" value="~"/>
	<!-- Main class to run on the remote host; set it to the class being tested.
	     It must name the job class of this example, otherwise sshexec launches a different program. -->
	<property name="main.class" value="hbase.HbaseMRImport"/>

	<!-- Compile classpath: the JDK tools.jar plus every jar in the project's lib directory -->
	<path id="compile.classpath">
		<fileset dir="${java.lib.dir}">
			<include name="tools.jar" />
		</fileset>
		<fileset dir="${third.lib.dir}">
			<include name="*.jar"/>
		</fileset>		
	</path>
	
	<!-- Runtime classpath: compile classpath plus the compiled classes -->
	<path id="run.classpath">
		<path refid="compile.classpath" />
		<pathelement location="${classes.dir}" />
	</path>
	<!-- Clean: delete the temporary output directories -->
	<target name="clean" description="清理,删除临时目录">
		<!--delete dir="${build.dir}" /-->
		<delete dir="${dist.dir}" />
		<delete dir="${classes.dir}" />
		<echo level="info">清理完毕</echo>
	</target>
	<!-- Init: recreate the output directories -->
	<target name="init" depends="clean" description="初始化,建立目录,复制文件">
		<mkdir dir="${classes.dir}" />
		<mkdir dir="${dist.dir}" />
	</target>
	<!-- Compile the source files -->
	<target name="compile" depends="init" description="编译源文件">
		<javac srcdir="${src.dir}" destdir="${classes.dir}" source="1.7" target="1.7"  includeAntRuntime="false" debug="false" verbose="false">
		    <compilerarg line="-encoding UTF-8 "/> 
			<classpath refid="compile.classpath" />
		</javac>
	</target>
	<!-- Package the compiled classes into a jar -->
	<target name="jar" depends="compile" description="打包类文件">
		<jar jarfile="${dist.dir}/jar.jar">
			<fileset dir="${classes.dir}" includes="**/*.*" />
		</jar>
	</target>
	
	<!-- Upload the jar to the server.
	     Requires jsch-0.1.51 from the lib directory to be copied to $ANT_HOME/lib; when using
	     the Ant bundled with Eclipse, add jsch-0.1.51 under
	     Window->Preferences->Ant->Runtime->Classpath instead.
	-->
	<target name="ssh" depends="jar">
		<scp file="${dist.dir}/jar.jar" todir="${remote.username}@${remote.host}:${remote.home}" password="${remote.password}" trust="true"/>
	</target>
	
	<!-- Run the uploaded jar on the remote host with the configured main class -->
	<target name="sshexec" depends="ssh">
	      <sshexec host="${remote.host}" username="${remote.username}"  password="${remote.password}" trust="true" command="source /etc/profile;hadoop jar ${remote.home}/jar.jar ${main.class}"/>
	</target>
</project>

5   手机上网日志字段介绍:
将hdfs文件保存到hbase中
 

相关推荐