MapReduce编程实践-基于IDEA/Maven实现单词统计分析
在开始MapReduce编程之前,需要做好如下准备工作。
(1)搭建好单机版本或者伪分布式Hadoop环境;
CentOS 7 单机安装最新版Hadoop v3.1.2以及配置和简单测试
Hadoop v3.1.2 伪分布式安装(Pseudo-Distributed Operation)
(2)在HDFS中创建好input文件夹,并上传文本文件到HDFS中的input文件夹中;
创建input文件夹
bin/hadoop fs -mkdir input
上传本地input文件夹中文件到HDFS中的input文件夹中
bin/hadoop fs -put input/*.txt input
查看HDFS 目录(-R 是ls命令的递归选项)
bin/hadoop fs -ls -R
如果output 目录已经存在,则删除output 文件夹(重新运行应用时,也需要首先将HDFS中的output文件夹删除,然后在运行)
bin/hadoop fs -rm -r output
再次查看HDFS目录
bin/hadoop fs -ls -R
(3)删除output 文件夹,MapReduce应用运行时会将结果存放在该目录中;
编写MapReduce应用,主要包括如下几个步骤:
(1)编写Map处理逻辑;
(2)编写Reduce处理逻辑;
(3)编写main 方法;
(4)编译打包代码,以及运行应用程序
一,编写Map处理逻辑
首先,通过IDEA创建一个Maven项目,并添加对hadoop-client的引用。
添加对hadoop-client jar包的引用:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.2</version>
</dependency>
通过继承Mapper类来实现Map处理逻辑。
package com.rickie.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
@Override
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
Map过程需要继承org.apache.hadoop.mapreduce包中 Mapper 类,并重写其map方法。
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
其中的模板参数:第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型。
作为map方法输入的键值对,其value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有 MapReduce框架处理。
StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分—默认情况下使用空格作为分隔符进行分割。
二,编写Reduce处理逻辑
在Map运行结束得到中间结果后,接下来进入Shuffle阶段,在这个阶段中Hadoop自动将Map的输出结果进行分区、排序、合并,然后分发给对应的Reduce任务去处理。
下面是Reduce处理逻辑的具体代码:
package com.rickie.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {}
@Override
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i = values.iterator(); i.hasNext(); sum +=val.get()){
val = (IntWritable)i.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
继承Hadoop提供的类(Reducer),并override其方法(reduce)。
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
其中模板参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型。
public void reduce(Text key, Iterable<IntWritable> values, Context context)
reduce 方法的输入参数 key 为单个单词,而 values 是由各Mapper上对应单词的计数值所组成的列表(一个实现了 Iterable 接口的变量,可以理解成 values 里包含若干个 IntWritable 整数,可以通过迭代的方式遍历所有的值),所以只要遍历 values 并求和,即可得到某个单词出现的总次数。
当Reduce过程结束时,就可以得到最终需要的数据了。
三,编写main方法
为了让前面的Map/Reduce处理类能够协同工作,需要在main方法中通过Job 类设置Hadoop应用程序运行时的环境变量。
package com.rickie;
import com.rickie.wordcount.IntSumReducer;
import com.rickie.wordcount.TokenizerMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
/**
* Word Count
*
*/
public class WordCount
{
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {
System.out.println( "Hello World!" );
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
}
// 设置环境参数
Job job = Job.getInstance(conf, "word count");
// 设置整个应用的类名
job.setJarByClass(WordCount.class);
// 添加Mapper 类
job.setMapperClass(TokenizerMapper.class);
// 添加Reducer 类
job.setReducerClass(IntSumReducer.class);
// 设置key 输出类型
job.setOutputKeyClass(Text.class);
// 设置value 输出类型
job.setOutputValueClass(IntWritable.class);
for(int i=0; i<otherArgs.length - 1; ++i) {
// 设置输入文件
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// 设置输出文件
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length -1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
在代码的开始部分,通过类 Configuration 获取程序运行时的参数情况,并存储到 otherArgs 变量中。
然后,通过类 Job 设置环境参数。
最后,根据之前获得的程序运行时参数,设置输入/输出文件路径。
完成相应任务的参数设定后,即可调用 job.waitForCompletion() 方法执行任务。
四,编译打包代码,以及运行应用程序
完整Maven项目,如图所示。
通过maven命令,mvn clean package,打包wordcount.jar 包文件,并复制到hadoop 存储节点。
根据MapReduce设计理念:计算向数据靠拢。将jar 包复制到hadoop 存储节点。
在运行之前,先检查一下input / output目录。
bin/hadoop fs -ls -R
显示input 目录中有2个文件,output目录不存在。
运行 wordcount.jar 应用
bin/hadoop jar wordcount.jar com.rickie.WordCount input output
查看输出结果
bin/hdfs dfs -cat output/*
或者
bin/hadoop fs -cat output/*