Mongodb中Aggregation特性

    Mongodb是目前最受欢迎的大数据存储平台之一,它可以作为云计算技术的底层存储层,比如为spark、hadoop、pig、hive、drill等计算框架提供源数据。Mongodb本身也提供了aggregation、mapreduce特性,以支持对大数据的计算、统计、分类等需求。

     Aggregation简单来说,就是提供数据统计、分析、分类的方法,这与mapreduce有异曲同工之处,只不过mongodb做了更多的封装与优化,让数据操作更加便捷和易用。Aggregation操作,接收指定collection的数据集,通过计算后返回result数据;一个aggregation操作,从input源数据到output结果数据,中间会依次经过多个stages,整体而言就是一个pipeline;目前有10种stages,我们稍后介绍;它还提供了丰富的Expression(表达式)来辅助计算。我们展示一个例子:

    SQL语义:

select zipcode, count(*) AS total from <collection> where status = 0 group by zipcode order by total

    原始数据结构:

{ "zipcode" : 1000000, "amount" : 5, "created" : { "$date" : 1448445890763 }, "status" : 1 }

    Aggregation查询语句:

db.<collection>.aggregate([
	{"$match" : { "status" : 0 }}, 
	{"$group" : { "_id" : "$zipcode", "total" : { "$sum" : "$amount" }}},
	{"$sort":{total : 1}},
	{"$out":"_group_"}])

    JAVA代码示例: 

List<Document> pipeline = new ArrayList<Document>();
Document match = new Document("$match",new Document("status",0));
pipeline.add(match);//match stage
Document group = new Document();
group.put("$group", new Document("_id", "$zipcode").append("total", new Document("$sum", "$amount")));
pipeline.add(group);

Document sort = new Document("$sort",new Document("total",1));
pipeline.add(sort);

String output = "_group_";
Document out = new Document("$out",output);
pipeline.add(out);
source.aggregate(pipeline)
        .allowDiskUse(true);
MongoCollection<Document> result = db.getCollection(output);
MongoCursor<Document> cursor = result.find().batchSize(100).iterator();
while (cursor.hasNext()) {
    Document item = cursor.next();
    System.out.println(item.toJson());
}
cursor.close();

    此aggregation操作最终的结果输出到了“_group_”临时表中,我们可以查看它的结果数据大致如下:

{ "_id" : 1000005, "total" : 43425 }
{ "_id" : 1000008, "total" : 43809 }
{ "_id" : 1000003, "total" : 44716 }
{ "_id" : 1000000, "total" : 45130 }
{ "_id" : 1000002, "total" : 45205 }
...

   mongodb已经提供了三个“单一用途的”aggregation操作:

    1)count:计算符合query的documents的总个数,语义同“select count(*) from <table> where <query>”,mongodb的语法为:

db.<collection>.count({"status":0})

    2)distinct:获取符合query中指定filed的不同值的列表,语义同“select distinct(<field>) from <table> where <query>”。

db.<collection>.distinct("zipcode",{"status" : 0})

    3)group:分组,注意它和aggregation中的“$group”有些类似,但不是一个操作,因为group无法在sharding环境中使用,所以mongodb建议使用“$group”进行替代,此处不再详解,请参见【group操作

一、特性

    1、Aggregation有几个核心的特性:

    1)支持多种stages

    2)可以将计算结果保存在collection中,在sharding环境中仍然适用,而且在output之前可以对结果数据进行“修剪”;当然可以将结果数据保存在内存(inline)并返回cursor,便于客户端访问结果数据。

    3)同一种stage可以出现多次 ,这相对于mapreduce而言是个巨大的进步。(这种情况通常需要多个mapreduce才能完成) 

    4)stages中可以使用内置的多种计算操作、表达式,几乎可以覆盖绝大部分数据统计、分析的需求,所以用aggregation开发日常数据处理任务,是非常简单的事情。当然相对于mapreduce这种更加开放的计算模型(基于javascript语法和解析引擎),aggregation在计算复杂的数据任务时还不够灵活,但是书写mapreduce脚本也是一件复杂的事情。

    2、局限性

    无论如何,aggregation返回的计算结果,每条document的大小不得超过16M(mongodb中document的最大尺寸,此参数不可修改),在下文中我们会了解到$push、$addToSet等计算操作,有可能会导致此问题的发生。在一些计算方式中,可能最终结果只产出一条document,那么就更需要注意此问题。通常情况下,aggregation可以output任意多条documents,结果数据的总size是没有限制的,但是结果不能输出到sharding collection中,这与mapreduce是有区别的。

    如果你从事过大数据计算、数据统计等相关工作,应该知道每个计算任务(job或task)都会使用独立的有限大小的内存空间,mongodb没有提供复杂的内存分配模型(任务调度算法),只是简单的限定每个stage最多使用100M内存,如果超过此值将终止计算并返回error;为了支持较大数据集合的处理,我们可以指定“allowDiskUse”参数将“溢出”的数据写入本地的临时文件中(临时的collection),这个参数我们通常需要设定为true。(参见上述示例)

    

    3、优化

    aggregation操作,简单而言就是将整个collection的数据全部input到pipeline中,经过多个stages计算之后,输出少量数据的过程。由mongodb引擎的特性决定,读取整个collection数据对磁盘和内存消耗都是非常巨大的,即使是在sharding环境中;那么尽可能的减少input数据量可以有效的提升处理的效率,对于有sort操作的,就需要更加谨慎,因为排序更加消耗内存和CPU。那么提高效率的手段之一,就是尽可能的使用索引(indexes),让$match和$sort尽可能的出现在pipeline的开端,$match使用索引可以更加高效的过滤input数据集进而不需要全表扫描,$sort使用索引可以避免额外的排序(因为索引本身就是有序的),索引只会在访问原始document时有效,如果是pipeline的中间数据,比如$group之后的$sort将无法使用到索引;当然索引的建立,是依据application中数据的访问模式而定,我们通常不会根据aggregation的需要而增加索引。

    如果只需要对部分数据进行计算,那么可以将$match、$limit、$skip放置在pipeline的开端,那么handler只需要读取有效的、较少数量的documents;不恰当的做法,是将$limit、$skip放置在pipeline的末端,这意味着可能读取、处理更多的数据,产生更多的结果集,这意味着对性能的浪费。

    4、sharded collection

    上文已经提到sharding collection不支持group方法(即db.<collection>.group(),而非pipeline中的$group);但是它支持aggregation pipeline和mapreduce操作。当aggregation操作sharded collection时,pipeline将会被分成2部分:源数据过滤(filters),和后期处理(prost-processing);第一部分主要就是从collection中读取数据,或者根据索引过滤数据,位于pipeline开端的$match、$limit将会包含在内,mongos将会把aggregation操作发给特定的shards,如果$match中不包含sharding key,那么将会把操作发给所有的shards,并在每个shard上执行aggregation的第一部分的stages,此部分最大的特点就是不对数据进行计算,仅仅筛选并创建cursor;第二部分是后期处理部分,它将在primary shard上运行(概念参见【primary shard】),通常为$group、$unwind、$out、$project,其他stages也可以在第二部分,priamry合并其他shards上的cursor(迭代读取数据),读取数据并执行aggregation的第二部分的操作,然后将最终执行结果返回给mongos。(旧版中第二部分pipeline在mongos上执行);由此可见,尽可能早的过滤数据,可以有效降低primary的负载压力。我们可以在aggregate方法中指定“explain”参数,查看各个阶段执行的计划,其中cursor字段即为第一部分,其他的为第二部分,我们也能看出cursor分布在shards的情况。

db.orders.aggregate(
                     [
                       { $match: { status: "A" } },
                       { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
                       { $sort: { total: -1 } }
                     ],
                     {
                       explain: true
                     }
                   )

    其中limit、skip在各个shards上的分发原理,请参见【mongos部分】

    3.2版本调整:mongodb为了提升sharding集群的整体性能,除了$out和$lookup需要在primary shard上运行外,其他的stages均可以并行的在符合匹配规则的shards执行,然后将各自的results数据转发给其中某个shard做最后的merge,这种方式有效了减轻了primary shard的负载压力;简单而言,3.2的pipleline处理模式基本没有改变,只是“第二部分”工作将从primary shard上转义到其中某个shard上。

二、Stages

    Pipeline是有多个stages组成,每个stage提供不同的数据操作,stages在声明时是数组结构,具有有序性,当前stage的计算结果将作为下一个stage的输入除$out之外,每种stages均可以在pipeline中出现多次,aggregate支持如下几种stages:

    1、$project

    与mongodb查询语句中的“projection”功能类似,对输入的document字段进行“取舍”,指定哪些字段将会包含(include)或者不包含(exclude)在输出中。

$project:{_id:0,name:{$concat:["$last", "," ,"$first"},"userid":1}

    默认情况下“_id”字段一定被包含,除非显式指定“_id : 0”才能排除掉;“0”或者false表示“不包含”,“1”或者true表示包含,如果值是一个expression,则表示新增或者重置字段的值。比如上述文档,表示_id字段排除掉,“userid”字段包含,增加一个新字段“name”,其值为“last”和“first”两个字段值的拼接。

    当需要增加一个字段,它的值确实是1、0、true或者false时,这就与$project语义有些冲突,为了确保值原样输出,那么就需要告知$project不要解析它们,这需要使用$literal操作符:

$project:{"sum":{$literal : 1}}

    和document访问模式一样,$project也支持通过“.”路径访问内嵌文档:

$project:{_id:0,"friends.name":1}或者
$project:{_id:0,"firends":{"name":1}}

    3.2+版本之后,$avg、$max、$min、$sum这四个“聚合函数”(原来只能在$group中使用)也可以在$project中使用了。

    2、$match

    根据query条件筛选文件,符合条件的文件将会传递给下一个stage。$match的语法和mongodb query语法一样,且$match中不能使用aggregate expression或者比较操作,只能使用query操作允许的操作符(参见Query操作。$match用于筛选数据,为了提高后续的数据处理效率,尽可能的将$match放于pipeline的前端以减少数据读取或者计算量;当然$match也可以放在$out之前用于控制数据输出量。如果$match放于pipeline的开始,还可以使用到indexes机制提高数据查询的性能。

$match:{"status":1}

    3、$redact

    根据字段所处的document结构的级别,对文档进行“修剪”,它通常和“判断语句if-else”结合使用即“$cond”。$redact可选值有3个:

    1)$$DESCEND:包含当前document级别的所有fields。当前级别字段的内嵌文档将会被继续检测。

    2)$$PRUNE:不包含当前文档或者内嵌文档级别的所有字段,不会继续检测此级别的其他字段,即使这些字段的内嵌文档持有相同的访问级别。

    3)$$KEEP:包含当前文档或内嵌文档级别的所有字段,不再继续检测此级别的其他字段,即使这些字段的内嵌文档中持有不同的访问级别。

{
  _id: 1,
  tags: [ "G", "STLW" ],
  year: 2014,
  subsections: [
    {
      subtitle: "Section 1",
      tags: [ "SI", "G" ],
    },
    {
      subtitle: "Section 2",
      tags: [ "STLW" ],
    },
    {
      subtitle: "Section 3",
      tags: [ "TK" ],
      content: {
        tags: [ "HCS" ]
      }
    }
  ]
}

    那么对于语句:

$redact: {$cond: {
           if: { 
           	$gt: [ { $size: { $setIntersection: [ "$tags", ["STLW","G"] ] } }, 0 ] },
           then: "$$DESCEND",
           else: "$$PRUNE"
         }
       }

    $setIntersection表示将2个数组的交集中不同元素的个数,$cond就是一个三元表达式,此例中表示“如果交集元素的个数大于0,则值为为$$DESCEND,否则为$$PRUNE”。对于此文档(ROOT级别)的最高级别的tags值为["G","STLW"],此级别值为$$DESCEND,即此tage同级别的其他字段将会包含;那么继续检测“subsections.tags”级别的所有文档(是个数组,则逐个检测),基本思路类似,如果此级别返回$$DECEND那么继续检测“subsections.tags.content.tags”是否符合访问规则,如果返回$$PRUNE,那么此tags所在的内嵌文档的所有字段将被排除,即使与此tags同级别的contents.tags符合访问规则。最终输出结果:

{
  "_id" : 1,
  "tags" : [ "G", "STLW" ],
  "year" : 2014,
  "subsections" : [
    {
      "subtitle" : "Section 1",
      "tags" : [ "SI", "G" ],
      "content" : "Section 1"
    },
    {
      "subtitle" : "Section 2: Analysis",
      "tags" : [ "STLW" ],
      "content" : "Section 2"
    }
  ]
}

    4、$limit、$skip、$sort

    同Query中的limit、skip、$sort限定符。如果$limit、$sort相邻出现,那么mongodb将会使用“TOP N”计算方式来处理数据,如果N较小这些数据可以在内存中排序完成,如果N很大(> 100M),则需要指定“allowDiskUse”为true,否则会发生错误。

    5、$unwind

    将指定的数组结构拆解成多条document,其中指定的Field必须是数组。3.2+版本之后,Field可以不是数组,此时将会把它当做只有一个item的数组处理。比如文档:

{"_id" : 1,"item":"ABC1","sizes":["S","M","L"]}

    执行$unwind:"$sizes"时输出结果为:

{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }

    6、$out

    必须为pipeline最后一个stage,将计算结果写入到指定的collection中,格式:$out : <collection>,不支持sharded collection,也不支持capped collection。如果指定的collection不存在,则直接创建。如果此collection已经存在,那么结果数据将原子性的替换旧的collection,即在结果数据写入时,不会改变原collection数据和索引,当数据写入完毕后原子性替换,,如果在写入出错时,原collection的数据不会有任何改变。

    7、$group

    分组,语义同SQL中的“group by”,语法:$group:{_id:<group key>,<field>:{<accumulator>:<expression>},...}。“_id”字段是必须的,其值是group操作的key,可以为null或者计算表达式;可以增加其他自定义的字段,字段值可以为group支持的计算表达式(Expressions),$group首先将数据根据key进行分组,然后再对每个组执行如下的表达式计算:

    1)$sum:对每个group指定字段值进行累加计算。忽略非数字的值。

$group:{_id:"$productId","totalAmount":{$sum : "$quantity"}}
##在order表中,统计每个商品的销售总数量

    2)$avg:对每个group进行“平均值”,忽略非数字的值。

$group:{_id:"$categoryId","avgPrice":{$avg : "$price"}}

    3)$first:返回每个group的第一条数据,顺序有$sort决定,如果没有排序,默认为文档的自然存储顺序。

$sort:{categoryId : 1,price : 1},
$group:{_id:"categoryId","cheapest":{$first:"$price"}}

    4)$last:返回每个group的最后一条数据,顺序有$sort决定,如果没有排序,默认为文档的自然存储顺序。

    5)$max、$min:获取每个group中最大、最小值。

$sort:{categoryId:1}
$group:{_id:"categoryId","maxPrice":{$max:"$price"}}
##获取每个品类下,价格最高的商品

    6)$push:将指定的表达式的值添加到一个数组中,注意最终返回的document尺寸不要超过16M。

$group:{_id:"author","bookIds":{$push:"$bookId"}}
##获取每个作者的图书ID的列表

    7)$addToSet:将表达式的值添加到一个集合中(无重复值),注意最终分那会的document尺寸不要超过16M。

$group:{_id:"categoryId","shopIds":{$addToSet:"$shopId"}}
##获取每个品类下,不同商铺的ID

    上述7个操作符只能在$group中使用,$group通常用于获取某一维度不同的值、计算总和、分类数据,是aggregation操作中最重要的stage;如果将group的功能转换成mapreduce可能需要书写大量的代码。$group的key可以为null或者一个常量值,用于计算全局的数据,比如统计所有“商品的库存总价值”:

$group:null,"total":{$sum:{$multiply:["$price","$quantity"]}}

    8)$geoNear:用于地理位置数据分析,参见【geoNear

    9)$sample:

    3.2+版本中新增,类似于大数据中的“数据采样”方式,从input中随机选择N条documents,语法“{$sample: {size: <N>}}”。如果N大于collection中总数据的5%,那么$sample将会执行collection扫描、sort,然后选择top N条文档;如果N小于5%,对于wiredTiger而言则会遍历collection并使用“伪随机”的方式选取N条文档,对于MMAPv1引擎则在_id索引上随机选取N条文档。(本文没有测试$sample)

    10)$lookup

    3.2+版本新增,“left outer join”功能;从事过大数据开发的人都知道,如果仅仅基于mapreducer计算模式,实现跨文件“join”还是一件比较复杂的事情;mongodb提供了$lookup以支持简单的join,以当前collection作为主表(左端),将另一个non-sharded collection作为辅表参与join。语法:

{
    $lookup:
    {
        from: <参与join的辅表>,
        localField: <参与join匹配的本表字段>
        foreignField: <参与join的辅表字段>
        as: <将辅表数据输出到此字段中>
    }
}
 

    其中from表示一个参与join的辅表,这个表不能是sharding collection;localField和foreignField就是参与join的key,如果documents不存在上述field,那么将以null参与匹配;“as”指定一个输出字段,将辅表匹配成功的documents添加到此“字段”中,作为结果输出。

##比如主表为orders,数据结构为:
{_id:1,"product_id":1000,"price":30.0}
##辅表为products,数据结果为:
{_id:1,"id":1000,"name":"mongodb reference"}
 
{
    $lookup: {
        from:"products",
        localField:"product_id",
        foreignField:"id",
        as:"products"
    }
}
 

    执行结果可能类似于:

{
    _id:1,
    "product_id":1000,
    "price":32.0,
    "products":[
        {_id:1,id:1000,"name":"mongodb reference"}
    ]
}
 

三、Expressions(表达式)

    在$group中我们已经了解到了几个expressions,此外aggregation还支持多种其他类型的表达式,它们为计算过程提供了很大的便利性。$match只能使用query支持的表达式,$group中特定的几个表达式只能在$group中使用,对于其他的stage均可以使用下文中介绍的表达式。【参见表达式

    表达式中可以使用“field path”(字段路径)来访问输入文档的字段值,字段路径有“$”和字段名称构成,即“$<field>”,比如:$userid表示访问useid字段值,支持内嵌文档的访问,比如$user.name。有一个特殊的字段路径,即$$ROOT或者$$CURRENT,表示当前文档,$$ROOT.userid其实和$userid一样,我们通常在使用字段路径时忽略$$ROOT。$$ROOT、$$CURRENT和上文提到的$$DESCEND、$$PRUNE、$$KEEP一样,同属系统变量。不过通常我们还需要在aggregate中声明自定义的变量。

    1、自定义变量

    1)$let

    指定临时变量,并返回变量计算的结果。$let包含2个字段:vars和in,其中vars用该声明临时变量,in用来计算,其中vars中的变量只能在in中使用,其生命周期也只在当前$let中有效;为了避免与“字段路径”的访问模式混淆,在in中访问vars中的变量需要使用$$前缀。示例:

$project:{
    totalAmount :{
        $let {
            vars : {
                total : {$multiply : {$price,$quantity}},
            in: {$subtract : {'$$total','$discounted'}}
        }
    }
}

    在$let中声明一个变量total,值为价格与数量乘积,最终totalAmount的值为“总价 - 折扣”。

    2)$map

    对array数组元素进行迭代,并返回一个新的数组。类似一个foreach过程,语法结构如下:

$map : {input:<expression>,as:<item>,in:<expression>}

    它有三个字段,input表示需要计算的数组,as表示当前数组元素的别名,in可以使用当前元素的别名进行计算;示例如下:

##原始数据
{_id : 1, "numbers" : [2,5,8]}
{_id : 2, "numbers" : [0,1,3]}
##aggregation
$project : {
    "numbers" : {
        $map : {input : "$numbers",as:"item",in:{$add : {'$item',2}}}
    }
}
##result
{_id : 1, "numbers" : [4,7,9]}
{_id : 2, "numbers" : [2,3,5]}

    2、$literal(字面值)

    或许大家都比较熟悉这个概念,即$literal的值仅作字符串处理,原样输出,不进行表达式解析和计算,这类似于XML结构中的CDATA。比如:

$literal : {$add: [2,3]}    最终输出"$add:[2,3]"
$literal : "$name"            最终输出"$name"
##----示例----
##原始文档
{_id:1,"product":"book1","price":"$10"}
{_id:1,"product":"book2","price":"45.01"}

##aggregation
{$project:
    {costByDollar:{
        $eq : [
           {$literal : "$"},{$substr:["$price",0,1]}
        ]
    }
}
##输出结果
{_id:1,"costByDollar":true}
{_id:1,"costByDollar":false}

    3、Boolean表达式

    表达式返回true或者false,目前支持三种操作$and、$or、$not;语义基本与query中相同操作符一致。具体参见【Boolean

    4、比较表达式(Comparison)

    语义基本同query,只是语法不同,它们需要返回true、false,比如:$eq、$gt等等。参见【Comparison

    5、集合操作(Set)

    对于多个Arrays进行比较操作:

    1)$setEquals:如果输入的两个或者多个Arrays中都持有相同的Set(不同值的列表)则返回true,否则返回false。

    2)$setIntersection:返回输入的多个Arrays的交集。

    3)$setUnion:返回并集。

    5)$setDifference:返回差集,即在第一个Set中存在,但在第二个Set中不存在的数组元素。只接收2个数组参数。

    6)$setIsSubset:如果第一个Set为第二个Set的子集(包括完全相同)则返回true,否则返回false。

    7)$anyElementsTrue:如果此Set中有任何一个值为true,则返回true,否则返回false。只接收一个Array参数。

    8)$allElementsTrue:只有当所有的元素都为true时,才返回true。只接收一个Array参数。

    

    6、数学计算

    对数字类型的字段值进行计算,比如$add(+),$subtract(-),$multiply(*),$divide(/),$mod(%),3.2版本新增计算表达式;语法都一样,例如:

$project : {
    totalAmount : {$multiply: {"$price","$qunatity"}}
}

    7、字符串操作

    对字符串进行操作,非常简便有用的方法:

    1)$concat:拼接字符串,此操作可以接收多个参数列表。

$concat:["$lastname",",","$firstname"]

    2)$substr:截取字符串,语法:$substr : [<input expression>,<start>,<length>],其中start默认起始为0。

    3)$toLower、$toUper:大小写转换

    4)$strcasecmp:不分大小写,进行字符串比较,如果2个字符串一样,则返回0;如果第一个字符串比第二个“大”(字典顺序),则返回1,否则返回false。

    8、$size:计算数组的元素个数,语法:$size : <array expression>;3.2版本新增数组表达式

    9、日期计算

    对Date或者timestamp进行计算,比如获取时间的月份、时间戳等等。简单列举如下:

    $dayOfYear:当前日期是此年的第几天,0~366(闰年);$dayOfMonth:当前日期是本月的第几天,0~31;$dayOfWeek:当前日期是本周的第几天,1~7,1表示周日,7表示周六;$year:返回当前日期的年份,比如2014。$month:返回当前日期的月份。$week:返回当前日期为当年的第几周,0~53周。$hour、$minute、$second、$millisecond表示获取当前日期的时、分、秒、毫秒部分。

    $dateToString:日期的格式转换,这个比较常用,语法格式:$dataToString : {format:<format>,date:"$date"}。格式占位符如下:

    %Y:年份,4个数字,比如2014;%m:月份,2个数字,01~12;%d:日期,2个数字,01~31;%H:小时,2个数字,00~23;%M:分钟,00~59;%S:秒,00~59;%L:毫秒,000~999;%j:当前day为年的第几天,001~366;%U:当前日期所在年的第几周,00~53;%w:day是当前周的第几天,1~7。

$project : {
   "dateAsString":{$dateToString:{format:"%Y-%m-%d %H:%M:%S",date:"$date"}}
}

    10、条件表达式

    1)$cond

    上文中我们已经看到相关示例,它的作用等同于一个“三元表达式”,语法:$cond:{if : <boolean-expression>, then : <true-expression>,else : <false-expression> }。

    2)$ifNull

    判定指定表达式是否为null,比如字段不存在(undefined)、值为null。语法:$ifNull: [<expression>,<return-value expression>],如果expression的值为null则返回“return-value”。

四、示例

    通过上文,我们基本了解了aggretation的原理和使用技巧,对于复杂的数据计算通常需要多次计算才能产出结果。此外还需要强调一点,良好的数据结构和数据存储模型,对数据分析和统计具有非常大的帮助,所以我们希望大家在数据分析之前,和appliation约定数据的模型,如果数据模型相差甚远,那么需要进行一次(或者增量的)清洗数据,并转存到特定的数据仓库中(比如mongodb,hive等)。

    因为aggregate和mapreduce每次任务只能操作一个collection,数据数据来自多个collection,那么就需要多次分析,将分析结果统一写入一个result collection中,然后在对result进行最终的计算。

    数据校对是个非常棘手的问题,通常可以采用“取样法”来测试,比如限定一定范围(或者随机)对整体数据取样1/10,然后对这个小数据集先运行数据统计脚本,查看结果是否有异动,对于异常情况适度调整算法。

    接下来我们使用aggregate操作完成如下数据统计指标:

    1)横向维度:统计当前月份之前的6个月新用户在当月的留存;比如现在为9月份,统计出3~8月份注册的用户,在9月份的留存情况。

    2)纵向维度:分平台,分别统计“android”、“ios”、“pc”三个平台在横向维度的数据。

    期望数据结果:

-------------------------------------------------------------
|  平台\月份  |   1   |   2  |   3   |   4   |   5   |   6   |
-------------------------------------------------------------
|  android   |  100  |  80  |  100  |   150 |  160  |  200  |
-------------------------------------------------------------
|    ios     |  100  |  80  |  100  |   150 |  160  |  200  |
-------------------------------------------------------------
|    pc      |  100  |  80  |  100  |   150 |  160  |  200  |
-------------------------------------------------------------

    我们假定有个collection为user,当用户注册时新建一条记录,created表示用户注册的时间,modfied表示用户最后登录的时间;userid上创建唯一索引,created、modified创建了组合索引,原始数据格式:

{_id : 1,userid:"10010","platform":"android","created":{"$date":1448805204482},"modified":{"$date":1448805204482}

    aggregate语句:

db.user.aggregate([
	{ "$match" : {
		 "created" : { 
		 	"$gte" : { "$date" : 1430379347719 }, "$lt" : { "$date" : 1446307200000 } }, 
		 "modified" : { "$gte" : { "$date" : 1446190547719 } } 
		 }
	 },
	{ "$group" : { 
		"_id" : { "month" : { "$month" : "$created" }, 
		"platform" : "$platform" }, 
		"total" : { "$sum" : 1 }
		}
	 },
	{ "$project" : { 
		"platform" : "$_id.platform", 
		"total" : 1, 
		"month" : "$_id.month",
		 "_id" : 0 
		}
	 },
	{ "$sort" : { "platform" : 1, "month" : 1 } },
	{ "$group" : { 
		"_id" : "$platform", 
		"result" : { "$push" : { "month" : "$month", "total" : "$total" } } 
		}
	 },
	{ "$match" : { 
		"created" : { "$gte" : { "$date" : 1430379347719 }, "$lt" : { "$date" : 1446307200000 } }, 
		"modified" : { "$gte" : { "$date" : 1446190547719 } } 
		} 
	},
	{ "$group" : { 
		"_id" : { "month" : { "$month" : "$created" }, "platform" : "$platform" }, 
		"total" : { "$sum" : 1 } 
		} 
	},
	{ "$project" : { 
		"platform" : "$_id.platform", 
		"total" : 1, 
		"month" : "$_id.month", 
		"_id" : 0 
		} 
	},
	{ "$sort" : { "platform" : 1, "month" : 1 } }
	]
}

  

    JAVA程序实例:

MongoCollection<Document> collection = db.getCollection("user");
Date now = new Date();//11月30日
List<Document> pipeline = new ArrayList<Document>();

Document filter = new Document();
//过滤:
//1)注册时间在6各月之前,不包括当月;
//2)最后登陆时间在当月
filter.put("created",
        new Document("$gte",DateUtils.addMonths(now, -7)).append("$lt", DateUtils.truncate(now, Calendar.MONTH)));
filter.put("modified",new Document("$gte",DateUtils.addMonths(now, -1)));

Document match = new Document("$match",filter);
pipeline.add(match);

Document group = new Document();
//第一次group,将“月份”和“平台”作为复合key
group.put("$group", new Document("_id", new Document("month", new Document("$month", "$created")).append("platform", "$platform"))
        .append("total", new Document("$sum", 1)));
pipeline.add(group);

//重命名字段名
Document project = new Document("$project",new Document("platform","$_id.platform")
        .append("total",1).append("month","$_id.month").append("_id",0));

pipeline.add(project);

//按照平台和月份排序,确保,同一个平台的数据,是按照月份排序的,因为接下来要使用$push
Document sort = new Document("$sort",new Document("platform",1).append("month", 1));
pipeline.add(sort);

//第二次group,将上述输出数据,转换成根据平台为维度的inline方式。便于页面展示。
Document finalGroup = new Document("$group",new Document("_id","$platform")
        .append("result", new Document("$push", new Document("month", "$month").append("total", "$total"))));
pipeline.add(finalGroup);

//pipeline.add(new Document("$out","_" + DateFormatUtils.format(now,"yyyyMMdd")));

MongoCursor<Document> cursor = collection.aggregate(pipeline).allowDiskUse(true).iterator();
while (cursor.hasNext()) {
    Document item = cursor.next();
    System.out.println(item.toJson());
}
cursor.close();

    输出结果示例:

{
    "_id": "pc",
    "result": [
        {
            "month": 5,
            "total": 2736
        },
        {
            "month": 6,
            "total": 2877
        },
        {
            "month": 7,
            "total": 2718
        },
        {
            "month": 8,
            "total": 2791
        },
        {
            "month": 9,
            "total": 2887
        },
        {
            "month": 10,
            "total": 2800
        }
    ]
}
{
    "_id": "android",
    "result": [
        {
            "month": 5,
            "total": 2742
        },
        {
            "month": 6,
            "total": 2850
        },
        {
            "month": 7,
            "total": 2782
        },
        {
            "month": 8,
            "total": 2752
        },
        {
            "month": 9,
            "total": 2740
        },
        {
            "month": 10,
            "total": 2941
        }
    ]
}

    非常有趣的是,aggreate可以支持2个group、甚至更多个group操作,这大大减少了数据分析的开发难度,如果使用mapreduce可能需要开发多个脚本且运行多次。在技术日新月异的今天,基于大数据平台的数据计算技术不断涌现,出现大量了“SQL-LIKE Engineer”,它们可以将SQL语句转换成适用于NOSQL平台的操作,比如将SQL语句转换成Mongodb的aggreation或者mapreduce(内部或将数据转存到hadoop上进行mapreduce),这样又进一步的提升了开发效率,我们稍后介绍Apache Drill:mongodb平台上的SQL-Like引擎。

相关推荐