将hdfs文件保存到hbase中
0.0 wlan.dat文件内容:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200 1363157985069 
13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
0 将上网日志数据传到hdfs中
hdfs dfs -put /opt/wlan.dat /zmdata/
1.在HBase中创建表wlan(列族为cf)
create 'wlan' ,'cf'
2.确定行键是什么
手机号码:时间戳
如果仅仅用手机号作为行键,那么同一手机号的多条记录行键相同,hbase中的数据会被覆盖掉, 上述文件中,手机号
13560439658出现了两次重复,这里组成主键的时间戳 应该用 yyyyMMddHHmmssSSSS 最好带上毫秒,
否则依旧会出现最后hbase输出主键重复下只输出21条的结果,测试过,出现过这种情况(hbase主键相同下 数据会被覆盖)
这是使用SSSS后的主键:
13560439658:201602280032110142
13560439658:201602280032110143
3.代码
注意:
1 mapper, reducer类都需要 static ,否则执行时候会报找不到对应类错误
2 FileInputFormat,TextInputFormat要引用org.apache.hadoop.mapreduce.lib.input下
3 因为写出数据到hbase 因此fileoutputformat不需要了
package hbase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that loads the tab-separated WLAN access log from HDFS into the HBase table
 * {@code wlan} (column family {@code cf}).
 *
 * <p>The mapper prefixes every input line with a row key of the form
 * {@code <phone>:<yyyyMMddHHmmssSSSS>}; the reducer turns every such line into one {@link Put}
 * holding the whole line plus one column per log field.
 */
public class HbaseMRImport {

    /** Counter group used to report records that were skipped because they were malformed. */
    private static final String COUNTER_GROUP = "HbaseMRImport";

    /**
     * Mapper: the first two type parameters are the input key/value types (k1, v1), the last two
     * are the output key/value types (k2, v2).
     */
    static class HbaseMRMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        /** Reused output value, avoids allocating one Text per record. */
        private final Text v2 = new Text();

        /** Row-key timestamp format; created once because building it per record is wasteful. */
        private final DateFormat rowKeyTimeFormat = new SimpleDateFormat("yyyyMMddHHmmssSSSS");

        /** Last timestamp handed out by this mapper, used to keep row keys unique. */
        private long lastMillis = Long.MIN_VALUE;

        /**
         * Emits {@code <phone>:<timestamp> TAB <original line>} keyed by the input offset.
         *
         * @param k1 byte offset of the line in the input file
         * @param v1 one tab-separated log line; the phone number is the second field
         * @param context task context used to emit the record
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            String line = v1.toString();
            String[] fields = line.split("\t");
            if (fields.length < 2) {
                // Blank or truncated line: there is no phone number to build a row key from.
                context.getCounter(COUNTER_GROUP, "MAPPER_MALFORMED_RECORDS").increment(1);
                return;
            }
            String phone = fields[1];

            // The row key is phone + wall-clock time. Two records of the same phone mapped within
            // the same millisecond would otherwise get the same key and overwrite each other in
            // HBase, so the timestamp is forced to be strictly increasing within this mapper.
            // NOTE: records handled by different mapper tasks can still collide.
            long now = System.currentTimeMillis();
            if (now <= lastMillis) {
                now = lastMillis + 1;
            }
            lastMillis = now;
            String dateStr = rowKeyTimeFormat.format(new Date(now));

            // Row key becomes the first field; the untouched original line follows it.
            v2.set(phone + ":" + dateStr + "\t" + line);
            context.write(k1, v2);
        }
    }

    /** Reducer: writes one {@link Put} per incoming record into the output table. */
    static class HbaseMRReduce extends TableReducer<LongWritable, Text, NullWritable> {

        /** Column qualifiers in the order the fields appear after the row key. */
        private static final String[] FIELD_QUALIFIERS = {
            "rePortTime", "msisdn", "apmac", "acmac", "host", "siteType",
            "upPackNum", "downPackNum", "upPayLoad", "downPayLoad", "httpStatus"
        };

        private static final byte[] FAMILY = utf8("cf");
        private static final byte[] RAW_QUALIFIER = utf8("raw");
        private static final byte[][] FIELD_QUALIFIER_BYTES = utf8(FIELD_QUALIFIERS);

        /** Row key plus one entry per qualifier. */
        private static final int EXPECTED_FIELDS = FIELD_QUALIFIERS.length + 1;

        /**
         * Stores every record both as a whole ({@code cf:raw}) and field by field, so readers can
         * fetch either the full line or individual columns.
         *
         * @param k2 input offset emitted by the mapper
         * @param v2s records for that offset, each {@code rowkey TAB field1 TAB ... TAB field11}
         * @param context task context used to emit the puts
         */
        @Override
        protected void reduce(LongWritable k2, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String record = v2.toString();
                // Limit -1 keeps trailing empty fields, so an empty last column is not dropped.
                String[] splited = record.split("\t", -1);
                if (splited.length < EXPECTED_FIELDS) {
                    // Too few columns: skip and count instead of failing the whole job.
                    context.getCounter(COUNTER_GROUP, "REDUCER_MALFORMED_RECORDS").increment(1);
                    continue;
                }

                Put put = new Put(utf8(splited[0]));
                put.add(FAMILY, RAW_QUALIFIER, utf8(record));
                for (int i = 0; i < FIELD_QUALIFIER_BYTES.length; i++) {
                    put.add(FAMILY, FIELD_QUALIFIER_BYTES[i], utf8(splited[i + 1]));
                }
                context.write(NullWritable.get(), put);
            }
        }

        /** Encodes text as UTF-8 so the stored bytes do not depend on the platform charset. */
        private static byte[] utf8(String text) {
            return text.getBytes(StandardCharsets.UTF_8);
        }

        /** Encodes every element of {@code texts} as UTF-8. */
        private static byte[][] utf8(String[] texts) {
            byte[][] encoded = new byte[texts.length][];
            for (int i = 0; i < texts.length; i++) {
                encoded[i] = utf8(texts[i]);
            }
            return encoded;
        }
    }

    /**
     * Configures and runs the import job; exits with status 1 when the job fails.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // 0. Build the configuration.
        Configuration conf = new Configuration();
        // HBase runs as a single node here without an external ZooKeeper; the author observed the
        // job also works without the next two settings.
        conf.set("hbase.zookeeper.quorum", "hadoop3");
        conf.set("hbase.rootdir", "hdfs://hadoop3:9000/hbase");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan");

        // 0.1 Define the job.
        Job job = Job.getInstance(conf, HbaseMRImport.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HbaseMRImport.class);

        // 1. Custom mapper and reducer.
        job.setMapperClass(HbaseMRMapper.class);
        job.setReducerClass(HbaseMRReduce.class);

        // 2. Mapper output types (k2, v2). The reducer output types (k3, v3) are not set because
        //    the output goes to a table.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 3. Input and output formats.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        // 4. Input source. No FileOutputFormat.setOutputPath(...) is needed: that is only
        //    required when writing to HDFS.
        FileInputFormat.setInputPaths(job, "hdfs://hadoop3:9000/zmdata/wlan.dat");

        // 5. Submit and propagate failure through the exit status.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
4.使用ant发到远程linux服务器运行,把HBase的相关jar包放到HADOOP_CLASSPATH中
find / -name hadoop-env.sh 查找文件
在hadoop-env.sh文件中增加如下代码:
export HADOOP_CLASSPATH=/usr/local/hbase-0.98.8-hadoop2/lib/*
或者将hbase的lib都拷贝到hadoop节点某一个文件夹下 然后再引入
可以参看 hbase java操作代码简介和NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
ant脚本:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Ant build: compiles the sources, packages them as dist/jar.jar, copies the jar to the remote
  Hadoop host over SCP and runs it there with "hadoop jar".
  Default target: sshexec (build, upload and run).
-->
<project name="项目名称" basedir="." default="sshexec">
    <description>本配置文件供ANT编译项目、自动进行单元测试、打包并部署之用。</description>
    <description>默认操作(输入命令:ant)为编译源程序并发布运行。</description>

    <!-- Properties. build.properties is loaded first, so any value defined there wins over
         the defaults below (Ant properties are immutable once set). -->
    <property environment="env" />
    <property file="build.properties" />

    <property name="src.dir" value="${basedir}/src" />
    <property name="java.lib.dir" value="${env.JAVA_HOME}/lib" />
    <property name="classes.dir" value="${basedir}/classes" />
    <property name="dist.dir" value="${basedir}/dist" />
    <property name="third.lib.dir" value="${basedir}/lib" />
    <property name="localpath.dir" value="${basedir}" />
    <property name="remote.host" value="hadoop3"/>
    <property name="remote.username" value="root"/>
    <property name="remote.password" value="123456"/>
    <property name="remote.home" value="~"/>
    <!-- Main class to run on the remote host; change it for every job you want to launch.
         It must name the job class of this project, otherwise "hadoop jar" fails. -->
    <property name="main.class" value="hbase.HbaseMRImport"/>

    <!-- Compile classpath -->
    <path id="compile.classpath">
        <fileset dir="${java.lib.dir}">
            <include name="tools.jar" />
        </fileset>
        <fileset dir="${third.lib.dir}">
            <include name="*.jar"/>
        </fileset>
    </path>

    <!-- Runtime classpath -->
    <path id="run.classpath">
        <path refid="compile.classpath" />
        <pathelement location="${classes.dir}" />
    </path>

    <!-- Clean: delete the temporary directories -->
    <target name="clean" description="清理,删除临时目录">
        <!--delete dir="${build.dir}" /-->
        <delete dir="${dist.dir}" />
        <delete dir="${classes.dir}" />
        <echo level="info">清理完毕</echo>
    </target>

    <!-- Init: create the output directories -->
    <target name="init" depends="clean" description="初始化,建立目录,复制文件">
        <mkdir dir="${classes.dir}" />
        <mkdir dir="${dist.dir}" />
    </target>

    <!-- Compile the sources -->
    <target name="compile" depends="init" description="编译源文件">
        <javac srcdir="${src.dir}" destdir="${classes.dir}" source="1.7" target="1.7"
               includeAntRuntime="false" debug="false" verbose="false">
            <compilerarg line="-encoding UTF-8 "/>
            <classpath refid="compile.classpath" />
        </javac>
    </target>

    <!-- Package the class files -->
    <target name="jar" depends="compile" description="打包类文件">
        <jar jarfile="${dist.dir}/jar.jar">
            <fileset dir="${classes.dir}" includes="**/*.*" />
        </jar>
    </target>

    <!-- Upload to the server.
         Requires jsch-0.1.51 from the lib directory to be copied into $ANT_HOME/lib; when using
         the Ant runtime inside Eclipse, add jsch-0.1.51 under
         Window->Preferences->Ant->Runtime->Classpath instead. -->
    <target name="ssh" depends="jar">
        <scp file="${dist.dir}/jar.jar"
             todir="${remote.username}@${remote.host}:${remote.home}"
             password="${remote.password}"
             trust="true"/>
    </target>

    <!-- Run the uploaded jar on the server -->
    <target name="sshexec" depends="ssh">
        <sshexec host="${remote.host}"
                 username="${remote.username}"
                 password="${remote.password}"
                 trust="true"
                 command="source /etc/profile;hadoop jar ${remote.home}/jar.jar ${main.class}"/>
    </target>
</project>
5 手机上网日志字段介绍: