MongoDB之Hadoop驱动介绍

1. 一些概念

Hadoop是一套Apache开源的分布式计算框架,其中包括了分布式文件系统DFS与分布式计算模型MapReduce,而MongoDB是一个面向文档的分布式数据库,它是NoSql的一种,而这里所要介绍的就是一个MongoDB的Hadoop驱动,这里就是把MongoDB作为MapReduce的输入源,充分利用MapReduce的优势来对MongoDB的数据进行处理与计算。

2. MongoDB的Hadoop驱动

目前这个版本的Hadoop驱动还是测试版本,还不能应用到实际的生产环境中去。
你可以从下面网址https://github.com/mongodb/mongo-hadoop下载到最新的驱动包,下面是它的一些依赖说明:
  • 目前推荐用最新的Hadoop 0.20.203版本,或者是用Cloudera CHD3还做
  • MongoDB的版本最好是用1.8+
  • 还有是MongoDB的java驱动必须是2.5.3+
它的一些特点:
  • 提供了一个Hadoop的Input和Output适配层,读于对数据的读入与写出
  • 提供了大部分参数的可配置化,这些参数都可有XML配置文件来进行配置,你可以在配置文件中定义要查询的字段,查询条件,排序策略等
目前还不支持的功能:
  • 目前还不支持多Sharding的源数据读取
  • 还不支持数据的split操作

3. 代码分析
运行其examples中的WordCount.java代码

 
  1. // 事先在MongoDB的test数据库的in表中加入的测试样本,使用如下方法    
  2.    /** 
  3. * test.in db.in.insert( { x : "eliot was here" } ) db.in.insert( { x : 
  4. * "eliot is here" } ) db.in.insert( { x : "who is here" } ) = 
  5. */  
  6. ublic class WordCount {  
  7.   
  8.   
  9.    private static final Log log = LogFactory.getLog( WordCount.class );  
  10.   
  11.   
  12. // 这是一个Map操作   
  13.    public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {  
  14.   
  15.   
  16.        private final static IntWritable one = new IntWritable( 1 );  
  17.        private final Text word = new Text();  
  18.   
  19.   
  20.        public void map( Object key , BSONObject value , Context context ) throws IOException, InterruptedException{  
  21.   
  22.   
  23.            System.out.println( "key: " + key );  
  24.            System.out.println( "value: " + value );  
  25.   
  26.   
  27.         // 对词进行按空格切分   
  28.            final StringTokenizer itr = new StringTokenizer( value.get( "x" ).toString() );  
  29.            while ( itr.hasMoreTokens() ) {  
  30.                word.set( itr.nextToken() );  
  31.                context.write( word, one ); // 这里的key为词,而value为1   
  32.            }  
  33.        }  
  34.    }  
  35.   
  36.   
  37. // 这是Reduce操作,用于计算词出现的频率   
  38.    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
  39.   
  40.   
  41.        private final IntWritable result = new IntWritable();  
  42.   
  43.   
  44.        public void reduce( Text key , Iterable<IntWritable> values , Context context ) throws IOException, InterruptedException{  
  45.   
  46.   
  47.         // 计算词出现的频率,把相同词的value相加   
  48.            int sum = 0;  
  49.            for ( final IntWritable val : values ) {  
  50.                sum += val.get();  
  51.            }  
  52.            result.set( sum );  
  53.            context.write( key, result ); // key为单个词,value为这个词所对应的词频率   
  54.        }  
  55.    }  
  56.   
  57.   
  58.    public static void main( String[] args ) throws Exception{  
  59.   
  60.   
  61.        final Configuration conf = new Configuration();  
  62.     // 定义MongoDB数据库的输入与输出表名,这里是调用本地的MongoDB,默认端口号为27017   
  63.        MongoConfigUtil.setInputURI( conf, "mongodb://localhost/test.in" );  
  64.        MongoConfigUtil.setOutputURI( conf, "mongodb://localhost/test.out" );  
  65.        System.out.println( "Conf: " + conf );  
  66.   
  67.   
  68.        final Job job = new Job( conf , "word count" );  
  69.   
  70.   
  71.        job.setJarByClass( WordCount.class );  
  72.   
  73.   
  74.     // 定义Mapper,Reduce与Combiner类   
  75.        job.setMapperClass( TokenizerMapper.class );  
  76.   
  77.   
  78.        job.setCombinerClass( IntSumReducer.class );  
  79.        job.setReducerClass( IntSumReducer.class );  
  80.   
  81.   
  82.     // 定义Mapper与Reduce的输出key/value的类型   
  83.        job.setOutputKeyClass( Text.class );  
  84.        job.setOutputValueClass( IntWritable.class );  
  85.   
  86.   
  87.     // 定义InputFormat与OutputFormat的类型   
  88.        job.setInputFormatClass( MongoInputFormat.class );  
  89.        job.setOutputFormatClass( MongoOutputFormat.class );  
  90.   
  91.   
  92.        System.exit( job.waitForCompletion( true ) ? 0 : 1 );  
  93.    }  
  94.       

4. 分块机制的简单介绍

这里没有实现对不同shard的split操作,也就是说,对于分布在不同shard上的数据,只会产生一个Map操作。
这里本人提供了一个分片的思路,有兴趣的可以讨论一下。

我们知道,对于Collection分块后,会产生一个Config数据库,在这个数据库下有一个叫做chunks的表,其中每个chunk记录了start_row与end_row,而这些chunk可以分布在不同的shard上,我们可以通过分析这个Collection来得到每个shard上的chunk信息,从而把每个shard上的chunk信息组合成一个InputSplit,这就是这里的MongoInputSplit,这样的话,只要去修改MongoInputFormat这个类的getSplits这个方法,加入对chunks表的分析,得到shard的信息,这样就可以实现多split的Map操作,对于不同的Shard,每个Map都会调用本地的Mongos代理服务,这样就实现了移动计算而不是移动数据的目的。

相关推荐