Hadoop浅度学习指南(HDFS、YARN、MapReduce)

大数据

  1. 概念:big data
  2. 5V特征:

    1. Volume:量大
    2. Value:价值高,价值密度低
    3. Variety:多样性
    4. Velocity:速度快
    5. Veracity:准确性

hadoop

主要组成

GFS --> HDFS
MapReduce --> MapReduce
BigTable -- > HBase

模块

  • Hadoop Common: The common utilities that support the other Hadoop modules.支持hadoop其他模块的一般工具
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.高吞吐分布式文件系统
  • Hadoop YARN: A framework for job scheduling and cluster resource management. 资源调度和任务管理
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.基于yarn的大数据并行处理系统

HDFS

组成

  • namenode:管理元数据,处理来自客户端的请求
    元数据:描述数据属性的数据、描述数据的数据
  • secondarynamenode:元数据的合并
  • datanode:具体数据的读写
  • client:文件读写请求的发起

HDFS机制

namenode

  • 负责元数据的管理,DataNode负责处理文件内容的读写请求
  • 处理client的读写的请求,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
  • 副本存放在哪些DataNode上由 NameNode来控制,根据全局情况做出块放置决定,读取文件时NameNode尽量让用户先读取最近的副本,降低带块消耗和读取时延
  • 全权管理数据块的复制、它周期性地从集群中的每个Datanode接收心跳信号和块状态报告(Blockreport),块状态报告包含了一个该Datanode上所有数据块的列表

datanode

  • 一个数据块在DataNode以文件存储在磁盘上,包括数据块本身、数据块的元数据(数据块的长度,块数据的校验和,以及时间戳)
  • DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息
  • 心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode 的心跳,则认为该节点不可用。

文件

  • block 默认128M,每个块有多个副本存储在不同的机器上
  • NameNode 是主节点,存储文件的元数据如文件名,文件目录结构,文件属性(生成时间,副本数,文件权限),以及每个文件的块列表以及块所在的DataNode等等
  • DataNode 在本地文件系统存储文件块数据,以及块数据的校验和
  • 可以创建、删除、移动或重命名文件,当文件创建、写入和关闭之后不能修改文件内容

namenode从datanode接受心跳和块报告

  • namenode启动后,datanode向namenode进行注册
  • 心跳

    心跳是每3秒一次,

    心跳返回结果带有NameNode给该DataNode的命令如删除块,
    复制块等

    如果超过10分钟没有收到某个DataNode 的心跳,则认为该
    节点不可用

  • 块报告

    DataNode启动后向NameNode注册,

    通过后,周期性(1小时)的向NameNode上报所有的块信息

  • 块损坏

    当DataNode读取block的时候,重新计算checksum,和创建
    时的对比

    DataNode 在其文件创建后三周验证其checksum

  • HDFS有哪些进程

    NameNode

    DataNode

    NodeManager

    ResourceManager

NameNode启动过程

  • NameNode元数据/命名空间持久化fsimage与edits
  • NameNode格式化,具体做什么事

    创建fsimage文件,存储fsimage信息

    创建edits文件

  • NameNode 启动过程

    加载fsimage和edits文件

    生成新的fsimage和edits文件

    等待DataNode注册与发送Block Report

  • DataNode 启动过程

    向NameNode注册、发送Block Report

  • NameNode SafeMode 安全模式

    namenode启动时会进入安全模式,此时只可读不可写


  1. Name启动的时候首先将fsimage(镜像)载入内存,并执行(replay)编辑日志editlog的的各项操作;
  2. 一旦在内存中建立文件系统元数据映射,则创建一个新的fsimage文件(这个过程不需SecondaryNameNode) 和一个空的editlog;
  3. 在安全模式下,各个datanode会向namenode发送块列表的最新情况;
  4. 此刻namenode运行在安全模式。即NameNode的文件系统对于客服端来说是只读的。(显示目录,显示文件内容等。写、删除、重命名都会失败);
  5. NameNode开始监听RPC和HTTP请求
    解释RPC:RPC(Remote Procedure Call Protocol)——远程过程通过协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议;
  6. 系统中数据块的位置并不是由namenode维护的,而是以块列表形式存储在datanode中;
  7. 在系统的正常操作期间,namenode会在内存中保留所有块信息的映射信息。

HDFS启动流程及元数据的同步

  • 元数据的同步
    流程图:
    触发的阈值(hdfs-default.xml)
    dfs.namenode.checkpoint.period 3600
    dfs.namenode.checkpoint.txns 1百万个事务
  • NameNode 启动过程

    1. 加载fsimage和edits文件
    2. 合并生成新的fsimage,并生成edits文件
    3. 等待DataNode注册与发送心跳和Block Report
    4. NameNode 启动过程中会进入SafeMode(安全模式)

安全模式

在安全模式下,文件系统不允许修改
目的,是在系统启动时检查各个datanode数据的有效性

进入安全模式的三种方式

  1. 手动进入

    $ bin/hdfs dfsadmin -safemode enter

    $ bin/hdfs dfsadmin -safemode leave

  2. namenode启动会自动进入
  3. 正常块的个数/总的块个数<0.999 也会进入安全模式
<property>
        <name>dfs.namenode.safemode.threshold-pct</name>
        <value>0.999f</value>
    </property>

HDFS特点

  • 优点

    1. 处理超大文件
    2. 一次写入,多次读取
    3. 运行与廉价服务器
    4. 不移动数据到计算点,而是就地计算,减少网络阻塞
  • 缺点:

    1. 高延迟,不适合接入前台业务
    2. 不支持任意的修改

HDFS API

Java API

package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

public class TestDemo {
    
    FileSystem fs = null;
    
//    public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
//        
////        FileSystem fs = FileSystem.get(new URI("hdfs://centos01:8020"),
////                new Configuration(),
////                "chen");
////        
////        boolean success = fs.mkdirs(new Path("/test"));
////        
////        System.out.println(success);
////        test.setUp();
////        test.testMkdir();
////        test.testDelete();
//        
//        
//        
//        
//    }
    @Before
    //获取文件对象
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");
        
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), 
                    conf, 
                    "chen");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //创建文件夹
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }
    
    
    //删除文件夹
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }
    
    @Test
    //上传文件
    public void testUpload() throws IllegalArgumentException, IOException {
        FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"));
        FileInputStream input = new FileInputStream("F:/test.txt");
        
        IOUtils.copy(input, out, 1024);
    }
    
    @Test
    public void testDownload() throws IllegalArgumentException, IOException {
        FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
        FileOutputStream out = new FileOutputStream("F:/test-copy.txt");
        
        IOUtils.copy(input, out, 1024);

    }
    
    @Test
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        
        while(ri.hasNext()) {
            LocatedFileStatus next = ri.next();
            next.getBlockLocations();
            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();
            
            System.out.println(permission+"\t"+owner+"\t"+group);
            System.out.println(len+"\t"+blockSize+"\t"+rep);

            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:"+blktn.getLength());
                System.out.println("offset:"+blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
            
        }
    }

}

HDFS读流程

Hadoop浅度学习指南(HDFS、YARN、MapReduce)

  1. 打开分布式文件调用 分布式文件DistributedFileSystem.open()方法
  2. 从 NameNode 获得 DataNode 地址DistributedFileSystem 使用 RPC 调用 NameNode,NameNode返回存有该副本的 DataNode 地址,DistributedFileSystem 返回一个输入流 FSDataInputStream对象,该对象封存了输入流DFSInputStream
  3. 连接到DataNode调用 输入流 FSDataInputStream 的 read() 方法,从而 输入流DFSInputStream 连接 DataNodes
  4. 读取DataNode反复调用 read()方法,从而将数据从 DataNode 传输到客户端
  5. 读取另外的DataNode直到完成到达块的末端时候,输入流 DFSInputStream 关闭与DataNode连接, 寻找下一个 DataNode
  6. 完成读取,关闭连接,即调用输入流 FSDataInputStream.close()

HDFS写流程

Hadoop浅度学习指南(HDFS、YARN、MapReduce)

  1. 发送创建文件请求:调用分布式文件系统DistributedFileSystem.create()方法
  2. NameNode中创建文件记录:分布式文件系统DistributedFileSystem 发送 RPC 请求给namenode,namenode 检查权限后创建一条记录,返回输出流 FSDataOutputStream,封装了输出流 DFSOutputDtream
  3. 客户端写入数据:输出流 DFSOutputDtream 将数据分成一个个的数据包,并写入内部队列。DataStreamer 根据 DataNode 列表来要求 namenode 分配适合的新块来存储数据备份。一组DataNode 构成管线(管线的 DataNode 之间使用 Socket 流式通信)
  4. 使用管线传输数据:DataStreamer 将数据包流式传输到管线第一个DataNode,第一个DataNode 再传到第二个DataNode ,直到完成。
  5. 确认队列:DataNode 收到数据后发送确认,管线的DataNode所有的确认组成一个确认队列。所有DataNode 都确认,管线数据包删除。
  6. 关闭:客户端对数据量调用close()方法。将剩余所有数据写入DataNode管线,并联系NameNode且发送文件写入完成信息之前等待确认。
  7. NameNode确认
  8. 故障处理:若过程中发生故障,则先关闭管线, 把队列中所有数据包添加回去队列,确保数据包不漏。为另一个正常DataNode的当前数据块指定一个新的标识,并将该标识传送给NameNode, 一遍故障DataNode在恢复后删除上面的不完整数据块. 从管线中删除故障DataNode 并把余下的数据块写入余下正常的DataNode。NameNode发现复本两不足时,会在另一个节点创建一个新的复本

YARN

组成

  • resourcemanger:负责全局的任务调度和资源管理(内存、CPU)、启动/监控applicationMaster 、监控NodeManager
  • nodemanger:单个节点的资源管理、处理来自resourcemanger和applicationmaster的任务请求
  • client:发起任务的请求
  • container:对环境的抽象,封装了CPU、内存、环境变量
  • applicationmaster:负责管理应用,为应用申请资源,任务的监控和容错

服务功能

  • ResourceManager

    • 处理客户端请求
    • 启动/监控ApplicationMaster
    • 监控NodeManager
    • 资源分配与调度
  • NodeManager

    • 单个节点上的资源管理
    • 处理来自ResourceManager的命令
    • 处理来自ApplicationMaster的命令
  • ApplicationMaster

    • 数据切分
    • 为应用程序申请资源,并分配给内部任务
    • 任务监控与容错
  • Container

    • 对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息

YARN工作流程

  1. 客户端向ResourceManager提交应用程序,其中包括ApplicationMaster、启动ApplicationMaster的命令、用户程序等;
  2. ResourceManager为该应用程序分配第一个Container,并与对应NodeManager通信,要求它在这个Container中启动应用程序的ApplicationMaster;
  3. ApplicationMaster向ResourceManager注册自己,启动成功后与ResourceManager保持心跳;
  4. ApplicationMaster向ResourceManager申请资源;
  5. 申请资源成功后,由ApplicationMaster进行初始化,然后与NodeManager通信,要求NodeManager启动Container。然后ApplicationMaster与NodeManager保持心跳,从而对NodeManager上运行的任务进行监控和管理;
  6. Container运行期间,向ApplicationMaster汇报自己的进度和状态信息,以便ApplicationMaster掌握任务运行状态,从而在任务失败是可以重新启动;
  7. 应用运行结束后,ApplicationMaster向ResourceManager注销自己,允许其所属的Container回收。

MapReduce

Map和Reduce 计算框架,编程模型 “分而治之”的思想, 分布式并行计算

Mapper

对一些独立元素组成的列表的每一个元素进行制定的操作,可高度并行

// step 1: Map Class
    /**
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 
     */
    //TODO update paragram
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
 
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
        }

Reducer

对一个列表元素进行合并

// step 2: Reduce Class
    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * 
     */
    //TODO
    public static class ModuleReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
 
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
        }
    }

Job

// step 3: Driver ,component job, implements Tool
    public int run(String[] args) throws Exception {
        // 1: get configration
        Configuration configuration = getConf();
 
        // 2: create Job
        Job job = Job.getInstance(configuration, this.getClass()
                .getSimpleName());
        // run jar
        job.setJarByClass(this.getClass());
 
        // 3: set job
        // input -> map -> reduce -> output
        // 3.1 input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
 
        // 3.2: map
        job.setMapperClass(ModuleMapper.class);
        //TODO update paragram
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
 
        // 3.3: reduce
        job.setReducerClass(ModuleReducer.class);
        //TODO
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
 
        // 4: submit job
        boolean isSuccess = job.waitForCompletion(true);
 
        return isSuccess ? 0 : 1;
    }

WordCount

package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDemo extends Configured implements Tool {

    /**
     * map 任务的定义
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * KEYIN    偏移量                    LongWritable
     * VALUEIN    一行文本                    Text
     * KEYOUT    单词                        Text
     * VALUEOUT    1                        IntWritable
     * 
     * map任务
     * 将一行文本拆分成单词
     * 
     *
     */
    
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            
            System.out.println("keyIn:"+key+"\t\t"+"valueIn:"+value);
            //1. 单词拆分
            String[] vals = value.toString().split(" ");
            
            //2. 遍历输出
            for (String val : vals) {
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                
                System.out.println("keyOut:"+keyOut+"\t\t"+"valueOut:"+valueOut);
            }
        }
    }
    
    
    /**
     * 
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     * KEYIN    单词                        Text
     * VALUEIN    单词次数的集合                list的元素    IntWritable
     * KEYOUT    单词                        Text
     * VALUEOUT    总次数                    IntWritable
     *
     */
    
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>    {
        
        IntWritable valueOut = new IntWritable();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            
            System.out.print("keyIn:"+key+"\t\t[");
            //1. 求次数综合
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                
                System.out.print(value+",\t");
            }
            System.out.println("]");
            //2. 输出
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }
    
    
    
    @Override
    public int run(String[] args) throws Exception {
        //1 设置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        
        //2. 设置map类和reduce类
        job.setMapperClass(WCMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //3 设置输入输出路径
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }
    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(run==1?"成功":"失败");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

MapReduce实现表的join

map join

适合大小表join,将小表缓存在内存中,join发生在map端

只缓存一次,在Mapper子类中重写setup方法,在setup方法中将小表文件装入内存中

Mapper子类中map方法读取大表

package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {
    

    
    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {
        
        HashMap<String, String> cacheMap = new HashMap<String, String>();
        
        // 首相将小表读入内存
        // 该方法只在每次任务开始时加载一次
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String path = "F:\\input\\join\\dept.log";
            
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            
            String line = null;
            while((line=br.readLine()) != null) {
                String[] vals = line.split("\t");
                cacheMap.put(vals[0], vals[1]);
            }
            
            br.close();
            fr.close();
        }
        
        // map端根据两张表的key进行合并
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            
            String deptno = cacheMap.get(vals[2]);
            String dname = cacheMap.get(deptno);
            
            context.write(new Text(deptno), new Text(dname+"\t"+vals[0]+vals[1]));
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        //1 设置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        //2 设置map类和reduce
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        
        //3 设置输入输出路径
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        //4 提交
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }

    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new MapJoin(), args);
            System.out.println(run==1?"成功":"失败");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

reduce join

适合两张大表join

package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoin extends Configured implements Tool {
    /*
     * 1    技术部
     * 1002    rose    1
     */
    
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text>{
        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            
            if(vals.length == 2) {
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            }else {
                keyOut.set(vals[2]);
                valueOut.set(vals[0]+"\t"+vals[1]);
            }
            context.write(keyOut, valueOut);
            
        }
    }
    
    /*
     * keyIn:1
     * valueIn    List{[1007    lily], [1002    rose], [1001    jack], [技术部]}
     */
     
    // reduce端合并是依靠MapReduce shuffle过程中将相同key的行放入同一台机器
    
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {
        ArrayList<String> employees = new ArrayList<String>();
        
        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String department = null;
            employees.clear();    //这里要注意清空list
            
            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // 根据length判断这是张什么表
                if(vals.length == 1) {
                    department = vals[0];
                }else if(vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }
            
            for (String employee : employees) {
                context.write(keyIn, new Text(employee+"\t"+department));
            }
            
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        //1 设置job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());
        //2 设置map类和reduce
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //3 设置输入输出路径
        FileInputFormat.setInputPaths(job, args[0]);
        
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if(fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        //4 提交
        boolean success = job.waitForCompletion(true);
        return success?1:0;
    }

    
    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(run==1?"成功":"失败");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

Hadoop的安装模式

  1. 单机模式
  2. 伪分布模式(pseudo)
  3. 完全分布模式

hadoop 开发环境搭建

maven环境搭建

  1. 安装maven

    1. 解压apache-maven-3.0.5.tar.gz
    2. 配置maven环境变量
      MAVEN_HOME=[maven的解压目录]
      %MAVEN_HOME%/bin;
    3. 命令提示符 mvn -version
  2. 解压repository.tar.gz到windows磁盘(如 E:toolsrepository)
  3. 修改settings.xml配置文件中指定的repository(修改apache-maven-3.0.5confsettings.xml)

    <localRepository>D:/repository</localRepository>
  4. 配置eclipse的maven环境
    windows->preferences->maven->

    ->installations->add->勾选自己安装的maven
    ->user settings->选择mave家目录/conf/settings
  5. 创建maven工程
  6. 将${hadoop_Home}/ect/hadoop/log4j.properties拷贝到项目的src目录
  7. 修改pom.xml

windows下搭建 hadoop开发环境

  1. Windows安装hadoop

    1. 解压hadoop-2.5.0.tar.gz到本地windows磁盘
    2. 配置hadoop的环境变量

      添加环境变量            HADOOP_HOME=hadoop解压目录
      在PATH环境变量中追加    %HADOOP_HOME%/bin;
    3. 测试

      hadoop -h
  2. eclipse安装插件

    1. 解压eclipse
    2. 将hadoop-eclipse-plugin-2.6.0.jar拷贝到${MyEclispe_HOME}/plugins
    3. 打开(重启)eclispe,菜单栏->windows->Preferneces->Hadoop MapReduce
  3. eclipse配置插件参数,连接HDFS

    1. 在linux中的hadoop安装目录下的etc/hadoop/hdfs-site.xml添加如下配置,重启HDFS的进程
    <!--关闭hdfs的文件权限控制-->
     <property>
         <name>dfs.permissions</name>
             <value>false</value>        
     </property>

    eclipse->windows->show views->other->输入MapReduce->点击map reduce locations
    右击->new hadoop locations

    1. Map/Reduce Master

      Mapreduce(V2) 
      host:[hostname]
      port:8032            //resourcemanager 的默认端口号
    2. DFS Master

      DFS Master
      host:[hostname]
      port:8020
  4. 拷贝winutils.exe 和hadoop.dll到${hadoop_HOME}/bin
  5. 单独拷贝hadoop.dll到C:WindowsSystem32
  6. 创建maven工程,通过pom.xml导包

    将lo4j.perperties文件拷贝到src/main/resources

打jar包,提交集群运行

  1. jar包时,指定主类

    yarn jar pv.jar /input/2015082818 /output

  2. jar包时,不指定主类

    yarn jar pv.jar 类的全限定名 /input/2015082818 /output
    不同包中可能有相同类名,所以要指定类的全限定名

Shuffle

Hadoop浅度学习指南(HDFS、YARN、MapReduce)

MapReduce框架核心部分(设计精髓):内核

shuffle 定义

​ map() 输出开始 到 reduce()输入开始 此阶段是shuffle
​ input -> map -> shuffle -> reduce -> output

shuffle分为两个阶段

​ map shuffle phase

​ reduce shuffle phase

shuffle主要操作

​ partitioner - map

​ sorter - map & reduce

​ combiner: map phase局部聚合操作 不是所有的MapReduce程序都可以进行局部聚合的

​ compress:map phase的输出数据压缩 针对所有MapReduce程序都可以进行设置

​ group - reduce

shuffle详解

所有操作都是针对map()输出的<key, value>数据进行的

map shuffle phase

  1. 进入环形缓冲区(默认100MB)

    当达到环形缓冲区内存的80%默认情况下,将会将缓冲区中的数据spill到本地磁盘中(溢出到MapTask所运行的NodeManager机器的本地磁盘中)

  2. 溢写

    并不是立即将缓冲区中的数据溢写到本地磁盘,而是需要经过一些操作

    1. 分区paritioner

      依据此MapReduce Job中Reduce Task个数进行分区决定map输出的数据被哪个reduce任务进行处理分析默认情况下,依据key采用HashPartitioner

// 通过取余将数据分配到哪个reduce处理
HashPartitioner
    int getParitition(key, value, numreducetask) {
        return ( key.hashCode&Integer.maxValue)%numreducetask;
    }
  1. 排序sorter

    会对每个分区中的数据进行排序,默认情况下依据key进行排序

  2. spill溢写

    将分区排序后的数据写到本地磁盘的一个文件中

    反复上述的操作,产生多个小文件

  1. 当溢写结束后

    • 此时将spill到本地磁盘的小文件进行一次合并。
    • combiner: (可选)map端的reduce
    • compress:(可配置) 数据减少了, 减少网络IO; 但压缩消耗CPU性能,也需要时间

reduce shuffle phase

  • merge 合并

    各个分区的数据合并在一起(当MapTask处理数据完成以后,告知AppMaster,然后AppMaster通知所有的ReduceTask,各个ReduceTask主动到已经完成的MapTask的本地磁盘,去拉取属于自己要处理的数据(分区中))

  • 排序 对各个分区中的数据进行排序

    最后每个分区形成一个文件(map输出的数据最后在个文件中),分区的,并且各个分区的数据已经进行了排序。

  • 分组group

    将相同key的value值存入到list集合,形成新的key, list(value),将key/value对数据传递给reduce()函数进行处理。

最后将(key, list(value))传给 reduce()

map个数及reduce个数确定

map个数确定

FileInputFormat.setMaxInputSplitSize(job, size);        设置切片最大值
FileInputFormat.setMinInputSplitSize(job, size);        设置切片最小值

FileInputFormat
        public List<InputSplit> getSplits(JobContext job){。。。}
            
        protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                    return Math.max(minSize, Math.min(maxSize, blockSize));
        }
    
        // minSize<=maxSize<blockSize    提高并发
        // minSize>blockSize            降低并发

reduce个数确定

job.setNumReduceTasks(2);
HashParitioner 决定map输出的类被哪个reduce处理

自定义shuffle

自定义key

  • key 和 value 都可以使用自定义类
  • 自定义的类不使用 Java 自带的 serializable 接口,改用hadoop 提供的Writable 接口
  • 注意重写 toString、 write、readFields
package com.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * 不用serializable
 * 
 * 用Hadoop的Writable
 *
 */

public class Flow implements Writable {
    
    private long up;
    private long down;
    private long sum;
    
    
    public long getUp() {
        return up;
    }
    public void setUp(long up) {
        this.up = up;
    }
    public long getDown() {
        return down;
    }
    public void setDown(long down) {
        this.down = down;
    }
    public long getSum() {
        return sum;
    }
    public void setSum(long sum) {
        this.sum = sum;
    }
    
    

    @Override
    public String toString() {
        return up + "\t" + down + "\t" + sum;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);

    }
    @Override
    public void readFields(DataInput in) throws IOException {
        up = in.readLong();
        down = in.readLong();
        sum = in.readLong();
    }
}

自定义分区

  • 调用 job 的 setNumReduceTasks 方法设置reduce 个数
  • setPartitionerClass 设置分区
public static class MyPartitioner extends Partitioner<Text, Flow> {

        @Override
        public int getPartition(Text key, Flow value, int numPartitions) {
            if(value.getSum()<1024) {
                return 0;
            }else if(value.getSum()<10*1024) {
                return 1;
            }
            return 2;
        }    
    }

排序

只能按照key排序,如果需要多重排序,需要自定义key
在shuffle过程中自动排序,无需手动调用方法

public class MyKey implements WritableComparable<MyKey>
//要排序的类要实现WritableComparable接口

    @Override
    public int compareTo(MyKey o) {
        long result = o.getSum() - this.getSum();
        if(result>0) {
            return 1;
        }else if(result<0) {
            return -1;
        }
        return o.getPhone().compareTo(this.getPhone());
    }

combiner

map端的小reduce,对每个map后的value进行reduce,减少数据传输

可以通过设置job.setCombinerClass(WCReducer.class);设置combiner

前后效果对比

原始数据
hello world
hello hadoop

hello world
hello java

keyIn:hadoop        [1,    ]
keyIn:hello        [1,    1,    1,    1,    ]
keyIn:java        [1,    ]
keyIn:world        [1,    1,    ]



keyIn:hadoop        [1,    ]
keyIn:hello        [2,    2,    ]
keyIn:java        [1,    ]
keyIn:world        [1,    1,    ]

分组

根据需求将key中相同的字段作为同一个key以减少键值对,作为一种优化的手段

重写 RawComparator 方法合并key中相同字段

通过 job.setGroupingComparatorClass(Mygroup.class); 调用

public static class Mygroup implements RawComparator<Person> {

        @Override
        public int compare(Person o1, Person o2) {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);
        }
        
    }

hadoop优化

  1. 可以设置block默认大小
  2. 设置map个数
  3. 调整环形缓冲区大小
  4. 自定义分区 --> 解决数据清倾斜问题
  5. 自定义 combiner --> map端的小reduce,减少网络传输损耗
  6. 自定义分组 --> 减少键值对
  7. 设置reduce个数 --> 加快处理速度
  8. CombinerFileInputFormat --> 合并小文件
  9. 根据业务自定义key和value

Java MapReduce编程错误

  • org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable

    map方法把文件的行号当成key,所以要用LongWritable。

相关推荐