Hadoop系列(三)MapReduce Job的几种提交运行模式

Job执行可以分为本地执行或者集群执行。hadoop集群安装部署在远程centos系统中。使用经典的WordCount代码为例。

1. 本地执行模式(本地为MacOS环境),无需启动远程的hadoop集群,本地job会提交给本地执行器LocalJobRunner去执行。

1)输入输出数据存放在本地路径下:

首先,MapReduce代码如下:

  • Mapper
package com.nasuf.hadoop.mr;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        String line = value.toString();
        String[] words = StringUtils.split(line, " ");
        
        for (String word: words) {
            context.write(new Text(word), new LongWritable(1));
        }
        
    }

}
  • Reducer
package com.nasuf.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) 
            throws IOException, InterruptedException {
        
        long count = 0;
        for (LongWritable value: values) {
            count += value.get();
        }
        
        context.write(key, new LongWritable(count));
        
    }

}
  • Runner
package com.nasuf.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WCRunner {
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        // 设置整个job所用的类在哪个jar包
        job.setJarByClass(WCRunner.class);
        
        // 本job实用的mapper和reducer的类
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        
        // 指定reducer的输出数据kv类型(若不指定下面mapper的输出类型,此处可以同时表明mapper和reducer的输出类型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        // 指定mapper的输出数据kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        // 指定原始数据存放位置
        FileInputFormat.setInputPaths(job, new Path("/Users/nasuf/Desktop/wc/srcdata"));
        
        // 处理结果的输出数据存放路径
        FileOutputFormat.setOutputPath(job, new Path("/Users/nasuf/Desktop/wc/output"));
        
        // 将job提交给集群运行
        job.waitForCompletion(true);
        
    }

}

在本地模式中,可以将测试数据存放在"/Users/nasuf/Desktop/wc/srcdata"路径下,注意输出路径不能是已经存在的路径,不然会抛出异常。
2) 输入输出数据存放在hdfs中,需要启动远程的hdfs(无需启动yarn)
修改Runner代码如下:

// 指定原始数据存放位置
        FileInputFormat.setInputPaths(job, new Path("hdfs://hdcluster01:9000/wc/srcdata"));
        
        // 处理结果的输出数据存放路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hdcluster01:9000/wc/output1"));

如果出现如下错误:

org.apache.hadoop.security.AccessControlException: Permission denied: user=nasuf, access=WRITE, inode="/wc":parallels:supergroup:drwxr-xr-x

显然是权限问题。hadoop的用户目录是parallels,权限是rwxr-xr-x,而本地操作使用的用户是nasuf。解决方法如下:在vm启动参数中加入如下参数:-DHADOOP_USER_NAME=parallels即可。

2. 集群执行模式(首先需要启动yarn,job会提交到yarn框架中去执行。访问http://hdcluster01:8088可以查看job执行状态。)

1)使用命令直接执行jar

hadoop jar wc.jar com.nasuf.hadoop.mr.WCRunner

查看http://hdcluster01:8088中job执行状态

Hadoop系列(三)MapReduce Job的几种提交运行模式

2) 通过main方法直接在本地提交job到yarn集群中执行
将$HADOOP_HOME/etc/hadoop/mapred-site.xml 和 yarn-site.xml拷贝到工程的classpath下,直接执行上述代码,即可提交job到yarn集群中执行。
或者直接在代码中配置如下参数,与拷贝上述两个配置文件相同的作用:

conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hdcluster01");
conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");

如果出现如下错误信息:

2018-08-26 10:25:37,544 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_1535213323614_0010 failed with state FAILED due to: Application application_1535213323614_0010 failed 2 times due to AM Container for appattempt_1535213323614_0010_000002 exited with  exitCode: -1000 due to: File file:/tmp/hadoop-yarn/staging/nasuf/.staging/job_1535213323614_0010/job.jar does not exist
.Failing this attempt.. Failing the application.

可以将core-site.xml配置文件同时拷贝到classpath中,或者同样配置如下参数:

conf.set("hadoop.tmp.dir", "/home/parallels/app/hadoop-2.4.1/data/");

即可解决问题。

相关推荐