HDAOOP SIMPLIZE TOOLKIT hadoop mapreduce简化开发包
https://github.com/jonenine/HST
虽然大数据的发展已经将近10个年头了,hadoop技术仍然没有过时,特别是一些低成本,入门级的小项目,使用hadoop还是蛮不错的。而且,也不是每一个公司都有能力招聘和培养自己的spark人才。
我本人对于hadoop mapreduce是有一些意见的。hadoop mapreduce技术对于开发人员的友好度不高,程序难写,调试困难,对于复杂的业务逻辑远没有spark得心应手。
2016年的春节前接到一个任务,要在一个没有spark的平台实现电力系统的一些统计分析算法,可选的技术只有hadoop mapreduce。受了这个刺激之后产生了一些奇思妙想,然后做了一些试验,并最终形成HST---hadoop simplize toolkit,还真是无心载柳柳成荫啊。
HST基本优点如下:
屏蔽了hadoop数据类型,取消了driver,将mapper和reducer转化为transformer和joiner,业务逻辑更接近sql。相当程度的减少了代码量,极大的降低了大数据编程的门槛,让基层程序员通过简单的学习即可掌握大数据的开发。
克服了hadoop mapreduce数据源单一的情况,比如在一个job内,input可以同时读文件和来自不同集群的hbase。
远程日志系统,让mapper和reducer的日志集中到driver的控制台,极大减轻了并行多进程程序的调试难度。
克服了hadoop mapreduce编写业务逻辑时,不容易区分数据来自哪个数据源的困难。接近了spark(或者sql)的水平。
天生的多线程执行,即在mapper和reducer端都默认使用多线程来执行业务逻辑。
对于多次迭代的任务,相连的两个任务可以建立关联,下一个任务直接引用上一个任务的结果,使多次迭代任务的代码结构变得清晰优美。
以下会逐条说明
基本概念的小变化:
Source类代替了hadoop Input体系(format,split和reader)
Transformer代替了mapper
Joiner代替了Reducer
去掉了饱受诟病的Driver,改为内置的实现,现在完全不用操心了。
<!--[if !supportLists]-->1. <!--[endif]-->基本上,屏蔽了hadoop的数据类型,使用纯java类型
在原生的hadoop mapreduce开发中,使用org.apache.hadoop.io包下的各种hadoop数据类型,比如hadoop的Text类型,算法的编写中一些转换非常不方便。而在HST中一律使用java基本类型,完全屏蔽了hadoop类型体系。
比如在hbase作为source(Input)的时候,再也不用直接使用ImmutableBytesWritable和Result了,HST为你做了自动的转换。
现在的mapper(改名叫Transformer了)风格是这样的
publicstaticclass TransformerForHBase0 extends HBaseTransformer<Long>
…
现在map方法叫flatmap,看到没,已经帮你自动转成了string和map
publicvoid flatMap(String key, Map<String, String> row,
Collector<Long> collector)
可阅读xs.hadoop.iterated.IteratedUtil类中关于类型自动转换的部分
<!--[if !supportLists]-->2. <!--[endif]-->克服了hadoop mapreduce数据源单一的情况。比如在一个job内,数据源同时读文件和hbase,这在原生的hadoop mapreduce是不可能做到的
以前访问hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,现在完全改为与spark相似的方式。
现在的风格是这样的:
Configuration conf0 = HBaseConfiguration.create();
conf0.set("hbase.zookeeper.property.clientPort", "2181");
conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");
conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");
conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");
conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");
conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");
...其他hbase的Configuration,可以来自不同集群。
IteratedJob<Long> iJob = scheduler.createJob("testJob")
.from(Source.hBase(conf0), TransformerForHBase0.class)
.from(Source.hBase(conf1), TransformerForHBase1.class)
.from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)
.join(JoinerHBase.class)
Hadoop中的input,现在完全由source类来代替。通过内置的机制转化为inputformat,inputsplit和reader。在HST的框架下,其实可以很容易的写出诸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一个hadoop job中,你终于可以将任意数据源,例如来自不同集群的HBASE和来自数据库的source进行join了,这是多么happy的事情啊!
<!--[if !supportLists]-->3. <!--[endif]-->远程日志系统。让mapper和reducer的日志集中在driver进行显示,极大减轻了了并行多进程程序的调试难度
各位都体验过,job fail后到控制台页面,甚至ssh到计算节点去查看日志的痛苦了吧。对,hadoop原生的开发,调试很痛苦的呢!
现在好了,有远程日志系统,可以在调试时将mapper和reducer的日志集中在driver上,错误和各种counter也会自动发送到driver上,并实时显示在你的控制台上。如果在eclipse中调试程序,就可以实现点击console中的错误,直接跳到错误代码行的功能喽!
Ps:有人可能会问,如何在集群外使用eclipse调试一个job,却可以以集群方式运行呢?这里不再赘述了,网上有很多答案的哦
<!--[if !supportLists]-->4. <!--[endif]-->克服了hadoop mapreduce在join上,区分数据来自哪个数据源的困难,接近spark(或者sql)的水平
在上面给出示例中,大家都看到了,现在的mapper可以绑定input喽!,也就是每个input都有自己独立的mapper。正因为此,现在的input和mapper改名叫Source和Transformer。
那么,大家又要问了,在mapper中,我已经可以轻松根据不同的数据输入写出不同的mapper了,那reducer中怎么办,spark和sql都是很容易实现的哦?比如看人家sql
Select a.id,b.name from A a,B b where a.id = b.id
多么轻松愉悦啊!
在原生hadoop mapreduce中,在reducer中找出哪个数据对应来自哪个input可是一个令人抓狂的问题呢!
现在这个问题已经被轻松解决喽!看下面这个joiner,对应原生的reducer
publicstaticclass Joiner0 extends Joiner<Long, String, String>
…
Reduce方法改名叫join方法,是不是更贴近sql的概念呢?
publicvoid join(Long key,RowHandler handler,Collector collector) throws Exception{
List<Object> row = handler.getSingleFieldRows(0);//对应索引为0的source
List<Object> row2 = handler.getSingleFieldRows(1);//对应第二个定义的source
注意上面两句,可以按照数据源定义的索引来取出来自不同数据源join后的数据了,以后有时间可能会改成按照别名来取出,大家看源码的时候,会发现别名这个部分的接口都写好了,要不你来帮助实现了吧。
<!--[if !supportLists]-->5. <!--[endif]-->天生的多线程执行,即在mapper和reducer端都默认使用多线程来执行业务逻辑。
看看源码吧,HST框架是并发调用flatMap和join方法的,同时又不能改变系统调用reduce方法的顺序(否则hadoop的辛苦排序可就白瞎了),这可不是一件容易的事呢!
看到这里,有的同学说了。你这个HST好是好,但你搞的自动转换类型这个机制可能会把性能拉下来的。这个吗,不得不承认,可能是会有一点影响。但在生产环境做的比对可以证明,影响太小了,基本忽略不计。
笔者在生产环境做了做了多次试验,mapper改成多线程后性能并未有提高,特别是对一些业务简单的job,增加Transformer中的并发级别效率可能还会下降。
很多同学喜欢在mapper中做所谓“mapper端的join”。这种方式,相信在HST中通过提高mapper的并发级别后会有更好的表现。
Reducer中的性能相对原生提升的空间还是蛮大的。大部分的mapreduce项目,都是mapper简单而reducer复杂,HST采用并发执行join的方式对提升reducer性能是超好的。
<!--[if !supportLists]-->6. <!--[endif]-->对于多次迭代的任务,相连的两个任务可以建立关联,在流程上的下一个job直接引用上一个job的结果,使多次迭代任务的代码结构变得清晰优美
虽然在最后才提到这一点,但这却是我一开始想要写HST原因。多次迭代的任务太麻烦了,上一个任务要写在hdfs做存储,下一个任务再取出使用,麻烦不麻烦。如果都由程序自动完成,岂不美哉!
在上一个任务里format一下
IteratedJob<Long> iJob = scheduler.createJob("testJob")
...//各种source定义
.format("f1","f2")
在第二个任务中,直接引用
IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")
.fromPrevious(iJob, Transformer2_0.class);
//Transformer2_0.class
publicstaticclass Transformer2_0 extends PreviousResultTransformer<Long>
...
publicvoid flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {
String f1 = getFiledValue(inputValues, "f1");
String f2 = getFiledValue(inputValues, "f2");
看到没,就是这么简单。
在最开始的计划中,我还设计了使用redis队列来缓冲前面job的结果,供后面的job作为输入。这样本来必须严格串行的job可以在一定程度上并发。另外还设计了子任务的并发调度,这都留给以后去实现吧。
<!--[if !supportLists]-->7. <!--[endif]-->便捷的自定义参数传递。
有时候,在业务中需要作一些“开关变量”,在运行时动态传入不同的值以实现不同的业务逻辑。这个问题HST框架其实也为你考虑到了。
Driver中的自定义参数,source中的自定义参数都会以内置的方式传到transformer或joiner中去,方便程序员书写业务。
查看transformer或joiner的源码就会发现:
getSourceParam(name)和getDriverParam(pIndex)方法,在计算节点轻松的得到在driver和source中设置的各层次级别的自定义参数,爽吧!
<!--[if !supportLists]-->8. <!--[endif]-->其他工具
HST提供的方便还不止以上这些,比如在工具类中还提供了两行数据(map类型)直接join的方法。这些都留给你自己去发现并实践吧!