使用Hadoop做K-Means计算的总结
以K均值聚类算法为实验对象。
通过调整各项Hadoop参数,已经不能再进一步缩短K均值迭代的时间,在计算过程中,CPU User态的使用率始终维持在95%左右。
尝试过的配置项有:
mapred.min.split.size
io.sort.mb
io.sort.spill.percent
io.sort.factor
min.num.spill.for.combine
mapred.child.java.opts
mapred.reduce.tasks
mapred.compress.map.output
dfs.data.dir 这里设置多个分布在不同存储设备上的目录,可以明显的提高IO效率。
在试验中,5个计算节点,一个MapTask的执行时间为130s,输入是512M的数据,包含约2,800,000条记录。
在相同的机器上进行单机实验,使用一个包含3,000,000条记录的512M的文件作为输入,边读文件边计算,耗时41s; 不计算读文件时间,运算时间仅耗时13s。
由此推断,一个MapTask中,只有三分之一的时间消耗在读文件和运算上。看来Map端的shuffle时间比较长。
下面是一个比较详细的MapTask运行机理,其中对shuffle阶段的描述比较详细:
1. 在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。在WordCount例子里,假设map的输入数据都是像“aaa”这样的字符串。
2. 在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。前面我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,是需要现在决定的。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
在我们的例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
整个内存缓冲区就是一个字节数组,它的字节索引及key/value存储结构我没有研究过。如果有朋友对它有研究,那么请大致描述下它的细节吧。