Hadoop里面的MapReduce编程模型

Hadoop里面的MapReduce编程模型,非常灵活,大部分环节我们都可以重写它的API,来灵活定制我们自己的一些特殊需求。

今天散仙要说的这个分区函数Partitioner,也是一样如此,下面我们先来看下Partitioner的作用:
对map端输出的数据key作一个散列,使数据能够均匀分布在各个reduce上进行后续操作,避免产生热点区。
Hadoop默认使用的分区函数是Hash Partitioner,源码如下:

/** 
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */  
  
package org.apache.hadoop.mapreduce.lib.partition;  
  
import org.apache.hadoop.mapreduce.Partitioner;  
  
/** Partition keys by their {@link Object#hashCode()}. */  
public class HashPartitioner<K, V> extends Partitioner<K, V> {  
  
  /** Use {@link Object#hashCode()} to partition. */  
  public int getPartition(K key, V value,  
                          int numReduceTasks) {  
      //默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况  
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;  
  }  
  
}  
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
	  //默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}


大部分情况下,我们都会使用默认的分区函数,但有时我们又有一些,特殊的需求,而需要定制Partition来完成我们的业务,案例如下:
对如下数据,按字符串的长度分区,长度为1的放在一个,2的一个,3的各一个。

河南省;1  
河南;2  
中国;3  
中国人;4  
大;1  
小;3  
中;11  
河南省;1
河南;2
中国;3
中国人;4
大;1
小;3
中;11


这时候,我们使用默认的分区函数,就不行了,所以需要我们定制自己的Partition,首先分析下,我们需要3个分区输出,所以在设置reduce的个数时,一定要设置为3,其次在partition里,进行分区时,要根据长度具体分区,而不是根据字符串的hash码来分区。核心代码如下:

/** 
 * Partitioner 
 *  
 *  
 * */  
 public static class PPartition extends Partitioner<Text, Text>{   
    @Override  
    public int getPartition(Text arg0, Text arg1, int arg2) {  
         /** 
          * 自定义分区,实现长度不同的字符串,分到不同的reduce里面 
          *  
          * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3 
          * 有几个分区,就设置为几 
          * */  
          
        String key=arg0.toString();  
        if(key.length()==1){  
            return 1%arg2;  
        }else if(key.length()==2){  
            return 2%arg2;  
        }else if(key.length()==3){  
            return 3%arg2;  
        }  
          
            
          
        return  0;  
    }  
       
       
       
       
 }  
/**
	 * Partitioner
	 * 
	 * 
	 * */
	 public static class PPartition extends Partitioner<Text, Text>{ 
		@Override
		public int getPartition(Text arg0, Text arg1, int arg2) {
			 /**
			  * 自定义分区,实现长度不同的字符串,分到不同的reduce里面
			  * 
			  * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3
			  * 有几个分区,就设置为几
			  * */
			
			String key=arg0.toString();
			if(key.length()==1){
				return 1%arg2;
			}else if(key.length()==2){
				return 2%arg2;
			}else if(key.length()==3){
				return 3%arg2;
			}
			
			  
			
			return  0;
		}
		 
		 
		 
		 
	 }



全部代码如下:

package com.partition.test;  
  
import java.io.IOException;  
  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapred.JobConf;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Partitioner;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  
import com.qin.operadb.PersonRecoder;  
import com.qin.operadb.ReadMapDB;  
   
  
/** 
 * @author qindongliang 
 *  
 * 大数据交流群:376932160 
 *  
 *  
 * **/  
public class MyTestPartition {  
      
    /** 
     * map任务 
     *  
     * */  
    public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{  
              
        @Override  
        protected void map(LongWritable key, Text value,Context context)  
                throws IOException, InterruptedException {  
            // System.out.println("进map了");  
            //mos.write(namedOutput, key, value);  
            String ss[]=value.toString().split(";");  
              
            context.write(new Text(ss[0]), new Text(ss[1]));  
              
              
              
        }  
          
          
    }  
      
    /** 
     * Partitioner 
     *  
     *  
     * */  
     public static class PPartition extends Partitioner<Text, Text>{   
        @Override  
        public int getPartition(Text arg0, Text arg1, int arg2) {  
             /** 
              * 自定义分区,实现长度不同的字符串,分到不同的reduce里面 
              *  
              * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3 
              * 有几个分区,就设置为几 
              * */  
              
            String key=arg0.toString();  
            if(key.length()==1){  
                return 1%arg2;  
            }else if(key.length()==2){  
                return 2%arg2;  
            }else if(key.length()==3){  
                return 3%arg2;  
            }  
              
                
              
            return  0;  
        }  
           
           
           
           
     }  
       
   
     /*** 
      * Reduce任务 
      *  
      * **/  
     public static class PReduce extends Reducer<Text, Text, Text, Text>{  
         @Override  
        protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)  
                throws IOException, InterruptedException {  
               
              String key=arg0.toString().split(",")[0];  
             System.out.println("key==> "+key);  
             for(Text t:arg1){  
                 //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());  
                 arg2.write(arg0, t);  
             }  
                 
               
        }  
           
       
           
     }  
       
       
     public static void main(String[] args) throws Exception{  
         JobConf conf=new JobConf(ReadMapDB.class);  
         //Configuration conf=new Configuration();  
         conf.set("mapred.job.tracker","192.168.75.130:9001");  
        //读取person中的数据字段  
         conf.setJar("tt.jar");  
        //注意这行代码放在最前面,进行初始化,否则会报  
       
       
        /**Job任务**/  
        Job job=new Job(conf, "testpartion");  
        job.setJarByClass(MyTestPartition.class);  
        System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  
        // job.setCombinerClass(PCombine.class);  
         job.setPartitionerClass(PPartition.class);  
           
         job.setNumReduceTasks(3);//设置为3  
         job.setMapperClass(PMapper.class);  
        // MultipleOutputs.addNamedOutput(job, "hebei", TextOutputFormat.class, Text.class, Text.class);  
        // MultipleOutputs.addNamedOutput(job, "henan", TextOutputFormat.class, Text.class, Text.class);  
         job.setReducerClass(PReduce.class);  
         job.setOutputKeyClass(Text.class);  
         job.setOutputValueClass(Text.class);  
          
        String path="hdfs://192.168.75.130:9000/root/outputdb";  
        FileSystem fs=FileSystem.get(conf);  
        Path p=new Path(path);  
        if(fs.exists(p)){  
            fs.delete(p, true);  
            System.out.println("输出路径存在,已删除!");  
        }  
        FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");  
        FileOutputFormat.setOutputPath(job,p );  
        System.exit(job.waitForCompletion(true) ? 0 : 1);    
           
           
    }  
      
      
  
}  
package com.partition.test;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
 

/**
 * @author qindongliang
 * 
 * 大数据交流群:376932160
 * 
 * 
 * **/
public class MyTestPartition {
	
	/**
	 * map任务
	 * 
	 * */
	public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
			
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			// System.out.println("进map了");
			//mos.write(namedOutput, key, value);
			String ss[]=value.toString().split(";");
			
			context.write(new Text(ss[0]), new Text(ss[1]));
			
			
			
		}
		
		
	}
	
	/**
	 * Partitioner
	 * 
	 * 
	 * */
	 public static class PPartition extends Partitioner<Text, Text>{ 
		@Override
		public int getPartition(Text arg0, Text arg1, int arg2) {
			 /**
			  * 自定义分区,实现长度不同的字符串,分到不同的reduce里面
			  * 
			  * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3
			  * 有几个分区,就设置为几
			  * */
			
			String key=arg0.toString();
			if(key.length()==1){
				return 1%arg2;
			}else if(key.length()==2){
				return 2%arg2;
			}else if(key.length()==3){
				return 3%arg2;
			}
			
			  
			
			return  0;
		}
		 
		 
		 
		 
	 }
	 
 
	 /***
	  * Reduce任务
	  * 
	  * **/
	 public static class PReduce extends Reducer<Text, Text, Text, Text>{
		 @Override
		protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
				throws IOException, InterruptedException {
			 
			  String key=arg0.toString().split(",")[0];
			 System.out.println("key==> "+key);
			 for(Text t:arg1){
				 //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());
				 arg2.write(arg0, t);
			 }
			   
			 
		}
		 
	 
		 
	 }
	 
	 
	 public static void main(String[] args) throws Exception{
		 JobConf conf=new JobConf(ReadMapDB.class);
		 //Configuration conf=new Configuration();
	  	 conf.set("mapred.job.tracker","192.168.75.130:9001");
		//读取person中的数据字段
	  	 conf.setJar("tt.jar");
		//注意这行代码放在最前面,进行初始化,否则会报
	 
	 
		/**Job任务**/
		Job job=new Job(conf, "testpartion");
		job.setJarByClass(MyTestPartition.class);
		System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		// job.setCombinerClass(PCombine.class);
		 job.setPartitionerClass(PPartition.class);
		 
		 job.setNumReduceTasks(3);//设置为3
		 job.setMapperClass(PMapper.class);
		// MultipleOutputs.addNamedOutput(job, "hebei", TextOutputFormat.class, Text.class, Text.class);
		// MultipleOutputs.addNamedOutput(job, "henan", TextOutputFormat.class, Text.class, Text.class);
		 job.setReducerClass(PReduce.class);
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
	    
		String path="hdfs://192.168.75.130:9000/root/outputdb";
		FileSystem fs=FileSystem.get(conf);
		Path p=new Path(path);
		if(fs.exists(p)){
			fs.delete(p, true);
			System.out.println("输出路径存在,已删除!");
		}
		FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
		FileOutputFormat.setOutputPath(job,p );
		System.exit(job.waitForCompletion(true) ? 0 : 1);  
		 
		 
	}
	
	

}


运行情况如下:

模式:  192.168.75.130:9001  
输出路径存在,已删除!  
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1  
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0005  
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%  
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 11%  
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 22%  
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 55%  
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%  
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0005  
INFO - Counters.log(585) | Counters: 29  
INFO - Counters.log(587) |   Job Counters   
INFO - Counters.log(589) |     Launched reduce tasks=3  
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=7422  
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0  
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0  
INFO - Counters.log(589) |     Launched map tasks=1  
INFO - Counters.log(589) |     Data-local map tasks=1  
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=30036  
INFO - Counters.log(587) |   File Output Format Counters   
INFO - Counters.log(589) |     Bytes Written=61  
INFO - Counters.log(587) |   FileSystemCounters  
INFO - Counters.log(589) |     FILE_BYTES_READ=93  
INFO - Counters.log(589) |     HDFS_BYTES_READ=179  
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=218396  
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=61  
INFO - Counters.log(587) |   File Input Format Counters   
INFO - Counters.log(589) |     Bytes Read=68  
INFO - Counters.log(587) |   Map-Reduce Framework  
INFO - Counters.log(589) |     Map output materialized bytes=93  
INFO - Counters.log(589) |     Map input records=7  
INFO - Counters.log(589) |     Reduce shuffle bytes=93  
INFO - Counters.log(589) |     Spilled Records=14  
INFO - Counters.log(589) |     Map output bytes=61  
INFO - Counters.log(589) |     Total committed heap usage (bytes)=207491072  
INFO - Counters.log(589) |     CPU time spent (ms)=2650  
INFO - Counters.log(589) |     Combine input records=0  
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=111  
INFO - Counters.log(589) |     Reduce input records=7  
INFO - Counters.log(589) |     Reduce input groups=7  
INFO - Counters.log(589) |     Combine output records=0  
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=422174720  
INFO - Counters.log(589) |     Reduce output records=7  
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2935713792  
INFO - Counters.log(589) |     Map output records=7  
模式:  192.168.75.130:9001
输出路径存在,已删除!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0005
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 11%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 22%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 55%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0005
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=3
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=7422
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=30036
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=61
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=93
INFO - Counters.log(589) |     HDFS_BYTES_READ=179
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=218396
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=61
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=68
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=93
INFO - Counters.log(589) |     Map input records=7
INFO - Counters.log(589) |     Reduce shuffle bytes=93
INFO - Counters.log(589) |     Spilled Records=14
INFO - Counters.log(589) |     Map output bytes=61
INFO - Counters.log(589) |     Total committed heap usage (bytes)=207491072
INFO - Counters.log(589) |     CPU time spent (ms)=2650
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=111
INFO - Counters.log(589) |     Reduce input records=7
INFO - Counters.log(589) |     Reduce input groups=7
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=422174720
INFO - Counters.log(589) |     Reduce output records=7
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2935713792
INFO - Counters.log(589) |     Map output records=7


运行后的结果文件如下:
Hadoop里面的MapReduce编程模型

其中,part-r-000000里面的数据

中国人 4  
河南省 1  
中国人	4
河南省	1



其中,part-r-000001里面的数据

中   11  
大   1  
小   3  
中	11
大	1
小	3




其中,part-r-000002里面的数据

中国  3  
河南  2  
中国	3
河南	2


至此,我们使用自定义的分区策略完美的实现了,数据分区了。


总结:引用一段话

   (Partition)分区出现的必要性,如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。事实上我们可以这样做,首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。主要的思路是使用一个partitioner来描述全局排序的输出。比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了

基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。

       这样,我们就可以对利用分布式文件系统进行大数据量的排序了,我们也可以重写Partitioner类中的compare函数,来定义比较的规则,从而可以实现字符串或其他非数字类型的排序,也可以实现二次排序乃至多次排序。 

相关推荐