map函数处理的<k1,v1>从HDFS文件中获取流程源码解析
0 引子:
mapreduce在执行任务的时候,是如何将外部文件进行切分,并将这些文件转换成<k1,v1>键值对方式的
(还记得 map-reduce基本概念和wordcount解析 文章中提到的<k1,v1>概念吗?)
一些总结性的话:
a) recordreader + inputsplit是数据输入处理阶段非常重要的两个概念。
b) inputsplit: 对原始输入数据的封装,封装原始数据源,这个数据源可以是hdfs文件系统,也可是DB,或者数据流
c) recordreader: 把inputsplit的数据转换成<k1,v1>
d) inputsplit和hdfs的block之间的关系是什么??
在hdfs看来,mapreduce就是hdfs的客户端,
mapreduce默认inputsplit大小和hdfs的一个block相等,
但是并不说
mapreduce的一个inputsplit就对应hdfs的一个block,
mapreduce是看不到hdfs体内的一个个的block的, 因此默认情况下,被处理目标文件有多少block,就会产生
多少inputsplit,也就会对应多少个map任务,即block和inputsplit只有这种对照关系。
为什么设计成一个hdfs block 对应一个 inputsplit:
核心就是数据本地化,在当前机器上计算的数据尽量不要传输
1个inputsplit>1个block时,比如1个inputsplit使用2个block时,hdfs的这两个block不一定都存储在同一个节点上,那么必然会产生网络传输,将需要的另一个block 传输到计算的这个节点上.
inputsplit<1个block时,那么一个block的数据中就必然有一部分没有被处理,必然就会被别的mapper处理,必然增大网络传输的概率.
看下图:
1 FileInputFormat代码简析, 回答如何将源文件逻辑分割
package org.apache.hadoop.mapreduce.lib.input 读取hdfs目录下的文件,后将文件便利,在执行getSplits方法时会发现,for循环内代码中, 每一个文件不论是1K还是1G 都会被处理封装成对应个数的inputsplit ,即一个文件对应一个或者多个split 而每一个split都会对应一个map进程 如果小文件太多,那么开启map进程太多 势必浪费资源,操作系统开启,执行,挂起一个进程是很消耗资源的 /** * Generate the list of files and make them into FileSplits. */ public List<InputSplit> getSplits(JobContext job ) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); // hdfs//master:9000/hello 则files为hello hdfs//master:9000/ 则files为目录下的所有文件 List<FileStatus>files = listStatus(job); // 读取job中的输入路径,进入 for (FileStatus file: files) {// 对目标路径下的文件/文件数 进行截取成 split操作, 每个文件至少对应一个切片,一个切片对应一个map //如果每个文件都是1M,那么这个文件就在切片时对应一个split 也就对应一个map操作,1T的文件就1T个map,这种计算要消费多大的资源,这就是小文件不适合hdfs的原因, //而如果64个1m文件合并后在处理,对应map就缩小了63倍,这样对程序性能影响有多少 可想而知 Path path = file.getPath();// 获取文件路径 FileSystem fs = path.getFileSystem(job.getConfiguration());// 获取hdfs操作 long length = file.getLen(); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); if ((length != 0) && isSplitable(job, path)) { // 当目标文件长度>0并且是可split时 long blockSize = file.getBlockSize();// 文件块大小 默认是64M long splitSize = computeSplitSize(blockSize, minSize, maxSize);// 计算切片尺寸,进入此方法,可见结果为64M, 参数minSize值为1,maxSize为long的最大值 如果想调整切片大小,需要重写方法 getMaxSplitSize public static long getMinSplitSize( 两个方法 long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else if (length != 0) { // 文件不可拆分 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); } else { // 文件长度=0 //Create empty hosts array for zero length files splits.add(new FileSplit(path, 0, length, new String[0])); } } // Save the number of input files in the job-conf job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; } 如下: protected List<FileStatus> listStatus( .... Path[] dirs = getInputPaths(job); // 进入 大概看下 ) 仅仅是对1T数据按照64M一个个的做标记, 是逻辑切分,不是物理切分,仅仅记录原始数据位置和要处理的长度 类比于高速公路上的 37公里处, 70公里处的路标, 仅仅是个标志而已,实际上高速公路还是连在一起的 看下面的数据结构,如果是物理切分的话,其数据结构必然也会要有 private bytes[] buf; 这个属性 public class FileSplit extends InputSplit implements Writable { private Path file; private long start; private long length; private String[] hosts; 。。。。 }
2 RecordReader 代码简析,回答如何将split转换成键值对:
package org.apache.hadoop.mapreduce.lib.input TextInputFormat. @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { return new LineRecordReader(); // 进入 } } 如下: /** * Treats keys as offset in file and value as line. */ LineRecordReader.java nextKeyValue() 调用一次 则当前 key value 就被赋值到, 相当于迭代器 public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); // 第一次调用时,key = 0 eg: hello.txt (大小10KB) } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end) { newSize = in.readLine(value, maxLineLength, //读取第一行时,将第一行内容写入到value内,并返回第一行读完后的位置 Math.max(maxBytesToConsume(pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; // 记录pos现在的位置,为读取第二行时, key.set(pos);做准备工作 if (newSize < maxLineLength) { break; } // 这样赋值对全局变量 key value赋值完毕后,通过getCurrentKey getCurrentValue 获取现在的值 ....} 看map如何调用写法: Mapper.java: public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) {// 不断context.nextKeyValue()进入方法后实际就是RecordReader.nextKeyValue()来不断将目标文件的行,行内容放在<k,v> map(context.getCurrentKey(), context.getCurrentValue(), context); //不断取值 } cleanup(context); }
总结如下:
系统回答如下:
问:从源代码的角度分析map函数处理的<k1,v1>是如何从HDFS文件中获取的? 答: 1.从TextInputFormat入手分析,找到父类FileInputFormat,找到父类InputFormat。 在InputFormat中找到2个方法,分别是getSplits(...)和createRecordReader(...)。 通过注释知道getSplits(...)作用是把输入文件集合中的所有内容解析成一个个的InputSplits,每一个InputSplit对应一个mapper task。 createRecordReader(...)作用是创建一个RecordReader的实现类。RecordReader作用是解析InputSplit产生一个个的<k,v>。 2.在FileInputFormat中找到getSplits(...)的实现。 通过实现,获知 (1)每个SplitSize的大小和默认的block大小一致,好处是满足数据本地性。 (2)每个输入文件都会产生一个InputSplit,即使是空白文件,也会产生InputSPlit; 如果一个文件非常大,那么会按照InputSplit大小,切分产生多个InputSplit。 3.在TextInputFormat中找到createRecordReader(...)的实现,在方法中找到了LineRecordReader。 接下来分析LineRecordReader类。 在RecordReader类中,通过查看多个方法,知晓key、value作为类的属性存在的,且知道了nextKeyValue()方法的用法。 在LineRecordReader类中,重点分析了nextKeyValue(...)方法。在这个方法中,重点分析了newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); 在in.readLine(...)中,第一个形参存储被读取的行文本内容,返回值表示被读取内容的字节数。 通过以上代码,分析了InputSplit中的内容是如何转化为一个个的<k,v>。 4.从Mapper类中进行分析,发现了setup()、cleanup()、map()、run()。 在run()方法中,通过while,调用context.nextKeyValue(...)。 进一步分析Context的接口类是org.apache.hadoop.mapreduce.lib.map.WrappedMapper.MapContext,MapContext调用了nextKeyValue(...)。最终找到了MapContext的实现了MapContextImpl类org.apache.hadoop.mapreduce.task.MapContextImpl。 在这个类的构造方法中,发现传入了RecordReader的实现类。