Hadoop MapReduce Application Development
1. Development workflow
1) Write the map and reduce functions, ideally with unit tests to confirm that they behave as expected.
2) Write a driver program to run the job.
3) Test the driver program by running it on a small, representative dataset.
2. The configuration API
1) Configuration
An instance of the Configuration class represents a collection of configuration properties and their values.
Each property is named by a String, and its value can be read as one of several types (String, int, boolean, and so on).
Configuration reads property values from XML resource files; the common ones are core-site.xml, hdfs-site.xml, and mapred-site.xml. e.g.
configuration-1.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>color</name>
    <value>yellow</value>
    <description>Color</description>
  </property>
  <property>
    <name>size</name>
    <value>10</value>
    <description>Size</description>
  </property>
  <property>
    <name>sizew</name>
    <value>10w</value>
    <description>Size</description>
  </property>
  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>
    <description>Weight</description>
  </property>
  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>
    <description>Size and weight</description>
  </property>
</configuration>
Accessing the properties:
public void testLoad() {
    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    assertEquals(conf.get("color"), "yellow");
    assertEquals(conf.getInt("size", 0), 10);
    // "10w" cannot be parsed as an int, so the supplied default 0 is returned
    assertEquals(conf.getInt("sizew", 0), 0);
    assertEquals(conf.get("weight"), "heavy");
    assertEquals(conf.get("size-weight"), "10,heavy");
}
-- A property's type is determined by the accessor method used to read it (get, getInt, getBoolean, and so on); if the value cannot be parsed as the requested type, the supplied default is returned.
-- A property value can be expanded with other properties using ${...} variable expansion, as size-weight shows.
Combining multiple configuration resources
configuration-2.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>size</name>
    <value>12</value>
  </property>
  <property>
    <name>weight</name>
    <value>light</value>
  </property>
</configuration>
Accessing the combined properties:
public void testMerge() {
    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    conf.addResource("configuration-2.xml");
    assertEquals(conf.getInt("size", 0), 12);
    assertEquals(conf.get("weight"), "heavy");
}
-- Properties defined in resources that are added later override earlier definitions (so size becomes 12).
-- Properties marked as final cannot be overridden by later definitions; such an attempt is ignored and a WARN message is logged (so weight stays heavy).
2) System properties
System properties are set with System.setProperty(...) or with the JVM argument -Dproperty=value.
Configuration properties can be expanded with system properties, and during ${...} expansion system properties take priority over values defined in the configuration files. Note, however, that a property set only as a system property is not visible through the Configuration API unless a configuration property references or redefines it.
public void testSystemProperty() {
    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    conf.addResource("configuration-2.xml");
    assertEquals(conf.getInt("size", 0), 12);
    assertEquals(conf.get("weight"), "heavy");
    System.setProperty("size", "14");
    // the system property wins during ${size} expansion
    assertEquals(conf.get("size-weight"), "14,heavy");
    // fails: system properties cannot be read directly through the Configuration
    assertEquals(conf.getInt("size", 0), 14);
}
3. Helper classes: GenericOptionsParser, Tool, and ToolRunner
GenericOptionsParser is a class that interprets the standard Hadoop command-line options. Rather than using it directly, it is usually more convenient to implement the Tool interface and run the application with ToolRunner, which calls GenericOptionsParser internally. e.g.
package com.siyuan.hadoop.test.dev;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // the non-standard options are passed through to run()
        for (String arg : args) {
            System.out.println(arg);
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ConfigurationPrinter cfgPrinter = new ConfigurationPrinter();
        // false: do not load the default resources, only configuration-1.xml
        Configuration loaded = new Configuration(false);
        loaded.addResource("configuration-1.xml");
        cfgPrinter.setConf(loaded);
        int exitCode = ToolRunner.run(cfgPrinter, args);
        Configuration cfg = cfgPrinter.getConf();
        for (Map.Entry<String, String> property : cfg) {
            System.out.printf("Property: %s=%s\n", property.getKey(), property.getValue());
        }
        System.exit(exitCode);
    }
}
Run it:
hadoop jar /home/hadoop/task/hadooptest.jar com.siyuan.hadoop.test.dev.ConfigurationPrinter -conf configuration-2.xml arg1 arg2
Output:
arg1
arg2
Property: weight=heavy
Property: sizew=10w
Property: color=yellow
Property: size-weight=${size},${weight}
Property: mapred.used.genericoptionsparser=true
Property: size=12
ToolRunner's static run() method uses GenericOptionsParser to pick up the standard Hadoop options given on the hadoop command line and set them on the Configuration instance; the remaining (non-standard) arguments are passed on to the Tool's run() method.
1) The -D option handled by GenericOptionsParser is not the same thing as a JVM system property set with -D.
2) Properties set with -D take priority over properties defined in the configuration files (see the example command after this list).
3) Not all properties can be changed with -D.
4) The options must appear before the program arguments: the -conf configuration-2.xml above has to come before arg1 and arg2, otherwise it is treated as a program argument.
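For example, a single property can be overridden on the command line with -D. This is only a sketch reusing the ConfigurationPrinter jar from above; the overridden property (color) is chosen purely for illustration:
hadoop jar /home/hadoop/task/hadooptest.jar com.siyuan.hadoop.test.dev.ConfigurationPrinter -conf configuration-2.xml -D color=blue arg1 arg2
The output should then contain Property: color=blue instead of the value yellow loaded from configuration-1.xml, because the -D setting is applied to the Configuration after the resource files.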
4. Writing the program
1) Mapper
MaxTemperatureMapper
package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            // parseInt does not accept a leading plus sign
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
MaxTemperatureMapperTest
package com.siyuan.hadoop.test.dev;

import static org.mockito.Mockito.*;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import junit.framework.TestCase;

public class MaxTemperatureMapperTest extends TestCase {

    private MaxTemperatureMapper mapper;
    private Mapper<LongWritable, Text, Text, IntWritable>.Context ctxt;

    @Override
    protected void setUp() throws Exception {
        mapper = new MaxTemperatureMapper();
        ctxt = mock(Mapper.Context.class);
    }

    public void testMap() throws IOException, InterruptedException {
        Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0"
                + "202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999");
        mapper.map(null, value, ctxt);
        verify(ctxt).write(new Text("1901"), new IntWritable(-78));
    }

    public void testMapMissing() throws IOException, InterruptedException {
        Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0"
                + "202701N015919999999N0000001N9+99991+99999102001ADDGF108991999999999999999999");
        mapper.map(null, value, ctxt);
        verify(ctxt, never()).write(any(Text.class), any(IntWritable.class));
    }
}
2) Reducer
MaxTemperatureReducer
package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctxt)
            throws IOException, InterruptedException {
        int maxAirTemperature = Integer.MIN_VALUE;
        for (IntWritable airTemperature : values) {
            maxAirTemperature = Math.max(maxAirTemperature, airTemperature.get());
        }
        ctxt.write(new Text(key), new IntWritable(maxAirTemperature));
    }
}
MaxTemperatureReducerTest
package com.siyuan.hadoop.test.dev;

import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import junit.framework.TestCase;

public class MaxTemperatureReducerTest extends TestCase {

    private MaxTemperatureReducer reducer;
    private Reducer<Text, IntWritable, Text, IntWritable>.Context ctxt;

    @Override
    protected void setUp() throws Exception {
        reducer = new MaxTemperatureReducer();
        ctxt = mock(Reducer.Context.class);
    }

    public void testReduce() throws IOException, InterruptedException {
        Text key = new Text("1901");
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(100));
        values.add(new IntWritable(50));
        values.add(new IntWritable(0));
        reducer.reduce(key, values, ctxt);
        verify(ctxt).write(new Text("1901"), new IntWritable(100));
    }
}
3) Job driver
package com.siyuan.hadoop.test.dev;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureJob <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job(getConf(), "Max Temperature Job");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // default: job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        // bug: this call was meant to be setOutputValueClass, so the job's output
        // key class ends up as IntWritable -- see the error and the fix below
        job.setOutputKeyClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxTemperatureJob(), args));
    }
}
Running it fails with:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.Text
Fix: the map output key/value classes default to the job's output classes, and the duplicated setOutputKeyClass call left the expected map output key type as IntWritable while the mapper emits Text keys. Setting the map output classes explicitly resolves the mismatch:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
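With the fix applied, the job can be submitted in the same way as the ConfigurationPrinter example; the jar path and the input/output paths below are hypothetical:
hadoop jar /home/hadoop/task/hadooptest.jar com.siyuan.hadoop.test.dev.MaxTemperatureJob input/ncdc output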
5. The MapReduce web UI
Hadoop provides a web UI for browsing job information. It is useful for following a job's progress while it is running, and for finding job statistics and logs after the job has completed. The default port is 50030.
mapred-site.xml
<property>
  <name>mapred.job.tracker.http.address</name>
  <value>0.0.0.0:50030</value>
  <description>
    The job tracker http server address and port the server will listen on.
    If the port is 0 then the server will start on a free port.
  </description>
</property>
1) Job, task, and task attempt IDs
-- Job ID: composed of the time the jobtracker (not the job) started, plus an incrementing counter maintained by the jobtracker that uniquely identifies the job.
e.g. job_201403281304_0006
Note: the counter starts at 0001 and is not reset when it reaches 10000; the IDs simply grow longer.
-- Task ID: created when the job is initialized, so its order does not necessarily reflect the order in which tasks are executed. The format is the job ID with the job prefix replaced by task, followed by _m_ or _r_ for the task type and a task counter starting at 000000.
e.g. task_201403281304_0006_m_000000, task_201403281304_0006_r_000000
-- Task attempt ID
Because of failures or speculative execution, a task may be executed more than once; the different running instances are distinguished by task attempt IDs. The format is the task ID with the task prefix replaced by attempt, followed by an attempt counter starting at 0. Task attempts are allocated as needed while the job runs, so their order reflects the order in which the tasktrackers created and ran them.
e.g. attempt_201403281304_0006_m_000000_0, attempt_201403281304_0006_r_000000_0
Note: if a job is restarted after the jobtracker has been restarted and has recovered its running jobs, the attempt counter starts from 1000.
2) Parts of the web UI
-- The jobtracker page
The main page shows at most 100 completed jobs per user before they are moved to the job history page; the job history itself is stored permanently.
mapred-site.xml
<property>
  <name>mapred.jobtracker.completeuserjobs.maximum</name>
  <value>100</value>
  <description>The maximum number of complete jobs per user to keep around
    before delegating them to the job history.</description>
</property>
The location where job history files are stored; they are kept for 30 days and then deleted automatically.
mapred-site.xml
<property>
  <name>hadoop.job.history.location</name>
  <value></value>
  <description>
    The location where jobtracker history files are stored. The value for this
    key is treated as a URI, meaning that the files can be stored either on
    HDFS or the local file system. If no value is set here, the location
    defaults to the local file system, at file:///${hadoop.log.dir}/history.
    If the URI is missing a scheme, fs.default.name is used for the file
    system.
  </description>
</property>
A second copy is stored for the user in the _logs/history subdirectory of the job's output directory; these files are not deleted by the system.
mapred-site.xml
<property>
  <name>hadoop.job.history.user.location</name>
  <value></value>
  <description>
    User can specify a location to store the history files of a particular job.
    If nothing is specified, the logs are stored in output directory. The files
    are stored in "_logs/history/" in the directory. User can stop logging by
    giving the value "none".
  </description>
</property>
-- The job page
-- The task page
The Actions column contains links for killing a task attempt; it is disabled by default.
core-site.xml
<property>
  <name>webinterface.private.actions</name>
  <value>false</value>
  <description>
    If set to true, the web interfaces of JT and NN may contain actions, such
    as kill job, delete file, etc., that should not be exposed to public.
    Enable this option if the interfaces are only reachable by those who have
    the right authorization.
  </description>
</property>
6. Miscellaneous
1) Retrieving the results
Each reducer produces one output file in the output directory.
-- The -getmerge option of the hadoop fs command takes all the files matching a source pattern or directory and merges them into a single file on the local filesystem. e.g.
hadoop fs -getmerge output output-local.txt
2) Debugging a job
-- Print debugging information to standard error.
-- Send a message that updates the task's status, as a prompt to go and look at the error logs.
-- Create a custom counter that counts the total number of malformed records.
All of this information can be inspected in the web UI (and the counter can also be read from the command line, as shown after the code below).
package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    enum RECORD_FORMAT { ERROR }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        } else {
            // print debugging information to standard error
            System.err.println("Wrong format record has been found:" + line);
            // update the task status
            context.setStatus("Wrong format record has been found.");
            // increment a custom counter that counts malformed records
            context.getCounter(RECORD_FORMAT.ERROR).increment(1);
        }
    }
}
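Counter values can also be retrieved from the command line once the job has finished. This is a sketch: the job ID is taken from the earlier ID examples, and the group name for an enum counter is assumed to be the fully qualified name of the enum class:
hadoop job -counter job_201403281304_0006 'com.siyuan.hadoop.test.dev.MaxTemperatureMapper$RECORD_FORMAT' ERROR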
3) Hadoop user logs
Hadoop produces logs in different places for different audiences. The main kinds are:
-- System daemon logs: written by each Hadoop daemon under the directory set by HADOOP_LOG_DIR; mainly of interest to administrators.
-- HDFS audit logs: a record of all HDFS requests; disabled by default.
-- MapReduce job history logs: a record of the events that occur while a job runs, kept centrally by the jobtracker and in the job's output directory (_logs/history), as described above.
-- MapReduce task logs: each task child process writes syslog, stdout, and stderr files under the userlogs subdirectory of the tasktracker's log directory.
4) Job tuning
5) MapReduce workflows
--The JobControl class (a minimal sketch of chaining two jobs with it follows below)
--Oozie
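A minimal sketch of a linear workflow built with JobControl. It assumes the new-API classes in org.apache.hadoop.mapreduce.lib.jobcontrol (older releases ship an equivalent under org.apache.hadoop.mapred.jobcontrol); the helper method and its two Job arguments are made up for illustration:

package com.siyuan.hadoop.test.dev;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class WorkflowSketch {

    // Runs 'second' only after 'first' has completed successfully.
    public static void runChain(Job first, Job second) throws Exception {
        ControlledJob cFirst = new ControlledJob(first, null);
        ControlledJob cSecond = new ControlledJob(second, null);
        cSecond.addDependingJob(cFirst);

        JobControl control = new JobControl("max-temperature-workflow");
        control.addJob(cFirst);
        control.addJob(cSecond);

        // JobControl implements Runnable: run it in its own thread and poll for completion.
        Thread workflow = new Thread(control);
        workflow.start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();

        if (!control.getFailedJobList().isEmpty()) {
            System.err.println("Failed jobs: " + control.getFailedJobList());
        }
    }
}

For anything more complicated than a linear chain, Oozie is the better fit, since it manages workflows as a service rather than from a driver program.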