Mongodb中Aggregation特性
Mongodb是目前最受欢迎的大数据存储平台之一,它可以作为云计算技术的底层存储层,比如为spark、hadoop、pig、hive、drill等计算框架提供源数据。Mongodb本身也提供了aggregation、mapreduce特性,以支持对大数据的计算、统计、分类等需求。
Aggregation简单来说,就是提供数据统计、分析、分类的方法,这与mapreduce有异曲同工之处,只不过mongodb做了更多的封装与优化,让数据操作更加便捷和易用。Aggregation操作,接收指定collection的数据集,通过计算后返回result数据;一个aggregation操作,从input源数据到output结果数据,中间会依次经过多个stages,整体而言就是一个pipeline;目前有10种stages,我们稍后介绍;它还提供了丰富的Expression(表达式)来辅助计算。我们展示一个例子:
SQL语义:
select zipcode, count(*) AS total from <collection> where status = 0 group by zipcode order by total
原始数据结构:
{ "zipcode" : 1000000, "amount" : 5, "created" : { "$date" : 1448445890763 }, "status" : 1 }
Aggregation查询语句:
db.<collection>.aggregate([ {"$match" : { "status" : 0 }}, {"$group" : { "_id" : "$zipcode", "total" : { "$sum" : "$amount" }}}, {"$sort":{total : 1}}, {"$out":"_group_"}])
JAVA代码示例:
List<Document> pipeline = new ArrayList<Document>(); Document match = new Document("$match",new Document("status",0)); pipeline.add(match);//match stage Document group = new Document(); group.put("$group", new Document("_id", "$zipcode").append("total", new Document("$sum", "$amount"))); pipeline.add(group); Document sort = new Document("$sort",new Document("total",1)); pipeline.add(sort); String output = "_group_"; Document out = new Document("$out",output); pipeline.add(out); source.aggregate(pipeline) .allowDiskUse(true); MongoCollection<Document> result = db.getCollection(output); MongoCursor<Document> cursor = result.find().batchSize(100).iterator(); while (cursor.hasNext()) { Document item = cursor.next(); System.out.println(item.toJson()); } cursor.close();
此aggregation操作最终的结果输出到了“_group_”临时表中,我们可以查看它的结果数据大致如下:
{ "_id" : 1000005, "total" : 43425 } { "_id" : 1000008, "total" : 43809 } { "_id" : 1000003, "total" : 44716 } { "_id" : 1000000, "total" : 45130 } { "_id" : 1000002, "total" : 45205 } ...
mongodb已经提供了三个“单一用途的”aggregation操作:
1)count:计算符合query的documents的总个数,语义同“select count(*) from <table> where <query>”,mongodb的语法为:
db.<collection>.count({"status":0})
2)distinct:获取符合query中指定filed的不同值的列表,语义同“select distinct(<field>) from <table> where <query>”。
db.<collection>.distinct("zipcode",{"status" : 0})
3)group:分组,注意它和aggregation中的“$group”有些类似,但不是一个操作,因为group无法在sharding环境中使用,所以mongodb建议使用“$group”进行替代,此处不再详解,请参见【group操作】
一、特性
1、Aggregation有几个核心的特性:
1)支持多种stages
2)可以将计算结果保存在collection中,在sharding环境中仍然适用,而且在output之前可以对结果数据进行“修剪”;当然可以将结果数据保存在内存(inline)并返回cursor,便于客户端访问结果数据。
3)同一种stage可以出现多次 ,这相对于mapreduce而言是个巨大的进步。(这种情况通常需要多个mapreduce才能完成)
4)stages中可以使用内置的多种计算操作、表达式,几乎可以覆盖绝大部分数据统计、分析的需求,所以用aggregation开发日常数据处理任务,是非常简单的事情。当然相对于mapreduce这种更加开放的计算模型(基于javascript语法和解析引擎),aggregation在计算复杂的数据任务时还不够灵活,但是书写mapreduce脚本也是一件复杂的事情。
2、局限性
无论如何,aggregation返回的计算结果,每条document的大小不得超过16M(mongodb中document的最大尺寸,此参数不可修改),在下文中我们会了解到$push、$addToSet等计算操作,有可能会导致此问题的发生。在一些计算方式中,可能最终结果只产出一条document,那么就更需要注意此问题。通常情况下,aggregation可以output任意多条documents,结果数据的总size是没有限制的,但是结果不能输出到sharding collection中,这与mapreduce是有区别的。
如果你从事过大数据计算、数据统计等相关工作,应该知道每个计算任务(job或task)都会使用独立的有限大小的内存空间,mongodb没有提供复杂的内存分配模型(任务调度算法),只是简单的限定每个stage最多使用100M内存,如果超过此值将终止计算并返回error;为了支持较大数据集合的处理,我们可以指定“allowDiskUse”参数将“溢出”的数据写入本地的临时文件中(临时的collection),这个参数我们通常需要设定为true。(参见上述示例)
3、优化
aggregation操作,简单而言就是将整个collection的数据全部input到pipeline中,经过多个stages计算之后,输出少量数据的过程。由mongodb引擎的特性决定,读取整个collection数据对磁盘和内存消耗都是非常巨大的,即使是在sharding环境中;那么尽可能的减少input数据量可以有效的提升处理的效率,对于有sort操作的,就需要更加谨慎,因为排序更加消耗内存和CPU。那么提高效率的手段之一,就是尽可能的使用索引(indexes),让$match和$sort尽可能的出现在pipeline的开端,$match使用索引可以更加高效的过滤input数据集进而不需要全表扫描,$sort使用索引可以避免额外的排序(因为索引本身就是有序的),索引只会在访问原始document时有效,如果是pipeline的中间数据,比如$group之后的$sort将无法使用到索引;当然索引的建立,是依据application中数据的访问模式而定,我们通常不会根据aggregation的需要而增加索引。
如果只需要对部分数据进行计算,那么可以将$match、$limit、$skip放置在pipeline的开端,那么handler只需要读取有效的、较少数量的documents;不恰当的做法,是将$limit、$skip放置在pipeline的末端,这意味着可能读取、处理更多的数据,产生更多的结果集,这意味着对性能的浪费。
4、sharded collection
上文已经提到sharding collection不支持group方法(即db.<collection>.group(),而非pipeline中的$group);但是它支持aggregation pipeline和mapreduce操作。当aggregation操作sharded collection时,pipeline将会被分成2部分:源数据过滤(filters),和后期处理(prost-processing);第一部分主要就是从collection中读取数据,或者根据索引过滤数据,位于pipeline开端的$match、$limit将会包含在内,mongos将会把aggregation操作发给特定的shards,如果$match中不包含sharding key,那么将会把操作发给所有的shards,并在每个shard上执行aggregation的第一部分的stages,此部分最大的特点就是不对数据进行计算,仅仅筛选并创建cursor;第二部分是后期处理部分,它将在primary shard上运行(概念参见【primary shard】),通常为$group、$unwind、$out、$project,其他stages也可以在第二部分,priamry合并其他shards上的cursor(迭代读取数据),读取数据并执行aggregation的第二部分的操作,然后将最终执行结果返回给mongos。(旧版中第二部分pipeline在mongos上执行);由此可见,尽可能早的过滤数据,可以有效降低primary的负载压力。我们可以在aggregate方法中指定“explain”参数,查看各个阶段执行的计划,其中cursor字段即为第一部分,其他的为第二部分,我们也能看出cursor分布在shards的情况。
db.orders.aggregate( [ { $match: { status: "A" } }, { $group: { _id: "$cust_id", total: { $sum: "$amount" } } }, { $sort: { total: -1 } } ], { explain: true } )
其中limit、skip在各个shards上的分发原理,请参见【mongos部分】
3.2版本调整:mongodb为了提升sharding集群的整体性能,除了$out和$lookup需要在primary shard上运行外,其他的stages均可以并行的在符合匹配规则的shards执行,然后将各自的results数据转发给其中某个shard做最后的merge,这种方式有效了减轻了primary shard的负载压力;简单而言,3.2的pipleline处理模式基本没有改变,只是“第二部分”工作将从primary shard上转义到其中某个shard上。
二、Stages
Pipeline是有多个stages组成,每个stage提供不同的数据操作,stages在声明时是数组结构,具有有序性,当前stage的计算结果将作为下一个stage的输入,除$out之外,每种stages均可以在pipeline中出现多次,aggregate支持如下几种stages:
1、$project
与mongodb查询语句中的“projection”功能类似,对输入的document字段进行“取舍”,指定哪些字段将会包含(include)或者不包含(exclude)在输出中。
$project:{_id:0,name:{$concat:["$last", "," ,"$first"},"userid":1}
默认情况下“_id”字段一定被包含,除非显式指定“_id : 0”才能排除掉;“0”或者false表示“不包含”,“1”或者true表示包含,如果值是一个expression,则表示新增或者重置字段的值。比如上述文档,表示_id字段排除掉,“userid”字段包含,增加一个新字段“name”,其值为“last”和“first”两个字段值的拼接。
当需要增加一个字段,它的值确实是1、0、true或者false时,这就与$project语义有些冲突,为了确保值原样输出,那么就需要告知$project不要解析它们,这需要使用$literal操作符:
$project:{"sum":{$literal : 1}}
和document访问模式一样,$project也支持通过“.”路径访问内嵌文档:
$project:{_id:0,"friends.name":1}或者 $project:{_id:0,"firends":{"name":1}}
3.2+版本之后,$avg、$max、$min、$sum这四个“聚合函数”(原来只能在$group中使用)也可以在$project中使用了。
2、$match
根据query条件筛选文件,符合条件的文件将会传递给下一个stage。$match的语法和mongodb query语法一样,且$match中不能使用aggregate expression或者比较操作,只能使用query操作允许的操作符(参见Query操作)。$match用于筛选数据,为了提高后续的数据处理效率,尽可能的将$match放于pipeline的前端以减少数据读取或者计算量;当然$match也可以放在$out之前用于控制数据输出量。如果$match放于pipeline的开始,还可以使用到indexes机制提高数据查询的性能。
$match:{"status":1}
3、$redact
根据字段所处的document结构的级别,对文档进行“修剪”,它通常和“判断语句if-else”结合使用即“$cond”。$redact可选值有3个:
1)$$DESCEND:包含当前document级别的所有fields。当前级别字段的内嵌文档将会被继续检测。
2)$$PRUNE:不包含当前文档或者内嵌文档级别的所有字段,不会继续检测此级别的其他字段,即使这些字段的内嵌文档持有相同的访问级别。
3)$$KEEP:包含当前文档或内嵌文档级别的所有字段,不再继续检测此级别的其他字段,即使这些字段的内嵌文档中持有不同的访问级别。
{ _id: 1, tags: [ "G", "STLW" ], year: 2014, subsections: [ { subtitle: "Section 1", tags: [ "SI", "G" ], }, { subtitle: "Section 2", tags: [ "STLW" ], }, { subtitle: "Section 3", tags: [ "TK" ], content: { tags: [ "HCS" ] } } ] }
那么对于语句:
$redact: {$cond: { if: { $gt: [ { $size: { $setIntersection: [ "$tags", ["STLW","G"] ] } }, 0 ] }, then: "$$DESCEND", else: "$$PRUNE" } }
$setIntersection表示将2个数组的交集中不同元素的个数,$cond就是一个三元表达式,此例中表示“如果交集元素的个数大于0,则值为为$$DESCEND,否则为$$PRUNE”。对于此文档(ROOT级别)的最高级别的tags值为["G","STLW"],此级别值为$$DESCEND,即此tage同级别的其他字段将会包含;那么继续检测“subsections.tags”级别的所有文档(是个数组,则逐个检测),基本思路类似,如果此级别返回$$DECEND那么继续检测“subsections.tags.content.tags”是否符合访问规则,如果返回$$PRUNE,那么此tags所在的内嵌文档的所有字段将被排除,即使与此tags同级别的contents.tags符合访问规则。最终输出结果:
{ "_id" : 1, "tags" : [ "G", "STLW" ], "year" : 2014, "subsections" : [ { "subtitle" : "Section 1", "tags" : [ "SI", "G" ], "content" : "Section 1" }, { "subtitle" : "Section 2: Analysis", "tags" : [ "STLW" ], "content" : "Section 2" } ] }
4、$limit、$skip、$sort
同Query中的limit、skip、$sort限定符。如果$limit、$sort相邻出现,那么mongodb将会使用“TOP N”计算方式来处理数据,如果N较小这些数据可以在内存中排序完成,如果N很大(> 100M),则需要指定“allowDiskUse”为true,否则会发生错误。
5、$unwind
将指定的数组结构拆解成多条document,其中指定的Field必须是数组。3.2+版本之后,Field可以不是数组,此时将会把它当做只有一个item的数组处理。比如文档:
{"_id" : 1,"item":"ABC1","sizes":["S","M","L"]}
执行$unwind:"$sizes"时输出结果为:
{ "_id" : 1, "item" : "ABC1", "sizes" : "S" } { "_id" : 1, "item" : "ABC1", "sizes" : "M" } { "_id" : 1, "item" : "ABC1", "sizes" : "L" }
6、$out
必须为pipeline最后一个stage,将计算结果写入到指定的collection中,格式:$out : <collection>,不支持sharded collection,也不支持capped collection。如果指定的collection不存在,则直接创建。如果此collection已经存在,那么结果数据将原子性的替换旧的collection,即在结果数据写入时,不会改变原collection数据和索引,当数据写入完毕后原子性替换,,如果在写入出错时,原collection的数据不会有任何改变。
7、$group
分组,语义同SQL中的“group by”,语法:$group:{_id:<group key>,<field>:{<accumulator>:<expression>},...}。“_id”字段是必须的,其值是group操作的key,可以为null或者计算表达式;可以增加其他自定义的字段,字段值可以为group支持的计算表达式(Expressions),$group首先将数据根据key进行分组,然后再对每个组执行如下的表达式计算:
1)$sum:对每个group指定字段值进行累加计算。忽略非数字的值。
$group:{_id:"$productId","totalAmount":{$sum : "$quantity"}} ##在order表中,统计每个商品的销售总数量
2)$avg:对每个group进行“平均值”,忽略非数字的值。
$group:{_id:"$categoryId","avgPrice":{$avg : "$price"}}
3)$first:返回每个group的第一条数据,顺序有$sort决定,如果没有排序,默认为文档的自然存储顺序。
$sort:{categoryId : 1,price : 1}, $group:{_id:"categoryId","cheapest":{$first:"$price"}}
4)$last:返回每个group的最后一条数据,顺序有$sort决定,如果没有排序,默认为文档的自然存储顺序。
5)$max、$min:获取每个group中最大、最小值。
$sort:{categoryId:1} $group:{_id:"categoryId","maxPrice":{$max:"$price"}} ##获取每个品类下,价格最高的商品
6)$push:将指定的表达式的值添加到一个数组中,注意最终返回的document尺寸不要超过16M。
$group:{_id:"author","bookIds":{$push:"$bookId"}} ##获取每个作者的图书ID的列表
7)$addToSet:将表达式的值添加到一个集合中(无重复值),注意最终分那会的document尺寸不要超过16M。
$group:{_id:"categoryId","shopIds":{$addToSet:"$shopId"}} ##获取每个品类下,不同商铺的ID
上述7个操作符只能在$group中使用,$group通常用于获取某一维度不同的值、计算总和、分类数据,是aggregation操作中最重要的stage;如果将group的功能转换成mapreduce可能需要书写大量的代码。$group的key可以为null或者一个常量值,用于计算全局的数据,比如统计所有“商品的库存总价值”:
$group:null,"total":{$sum:{$multiply:["$price","$quantity"]}}
8)$geoNear:用于地理位置数据分析,参见【geoNear】
9)$sample:
3.2+版本中新增,类似于大数据中的“数据采样”方式,从input中随机选择N条documents,语法“{$sample: {size: <N>}}”。如果N大于collection中总数据的5%,那么$sample将会执行collection扫描、sort,然后选择top N条文档;如果N小于5%,对于wiredTiger而言则会遍历collection并使用“伪随机”的方式选取N条文档,对于MMAPv1引擎则在_id索引上随机选取N条文档。(本文没有测试$sample)
10)$lookup
3.2+版本新增,“left outer join”功能;从事过大数据开发的人都知道,如果仅仅基于mapreducer计算模式,实现跨文件“join”还是一件比较复杂的事情;mongodb提供了$lookup以支持简单的join,以当前collection作为主表(左端),将另一个non-sharded collection作为辅表参与join。语法:
{ $lookup: { from: <参与join的辅表>, localField: <参与join匹配的本表字段> foreignField: <参与join的辅表字段> as: <将辅表数据输出到此字段中> } }
其中from表示一个参与join的辅表,这个表不能是sharding collection;localField和foreignField就是参与join的key,如果documents不存在上述field,那么将以null参与匹配;“as”指定一个输出字段,将辅表匹配成功的documents添加到此“字段”中,作为结果输出。
##比如主表为orders,数据结构为: {_id:1,"product_id":1000,"price":30.0} ##辅表为products,数据结果为: {_id:1,"id":1000,"name":"mongodb reference"}
{ $lookup: { from:"products", localField:"product_id", foreignField:"id", as:"products" } }
执行结果可能类似于:
{ _id:1, "product_id":1000, "price":32.0, "products":[ {_id:1,id:1000,"name":"mongodb reference"} ] }
三、Expressions(表达式)
在$group中我们已经了解到了几个expressions,此外aggregation还支持多种其他类型的表达式,它们为计算过程提供了很大的便利性。$match只能使用query支持的表达式,$group中特定的几个表达式只能在$group中使用,对于其他的stage均可以使用下文中介绍的表达式。【参见表达式】
表达式中可以使用“field path”(字段路径)来访问输入文档的字段值,字段路径有“$”和字段名称构成,即“$<field>”,比如:$userid表示访问useid字段值,支持内嵌文档的访问,比如$user.name。有一个特殊的字段路径,即$$ROOT或者$$CURRENT,表示当前文档,$$ROOT.userid其实和$userid一样,我们通常在使用字段路径时忽略$$ROOT。$$ROOT、$$CURRENT和上文提到的$$DESCEND、$$PRUNE、$$KEEP一样,同属系统变量。不过通常我们还需要在aggregate中声明自定义的变量。
1、自定义变量
1)$let
指定临时变量,并返回变量计算的结果。$let包含2个字段:vars和in,其中vars用该声明临时变量,in用来计算,其中vars中的变量只能在in中使用,其生命周期也只在当前$let中有效;为了避免与“字段路径”的访问模式混淆,在in中访问vars中的变量需要使用$$前缀。示例:
$project:{ totalAmount :{ $let { vars : { total : {$multiply : {$price,$quantity}}, in: {$subtract : {'$$total','$discounted'}} } } }
在$let中声明一个变量total,值为价格与数量乘积,最终totalAmount的值为“总价 - 折扣”。
2)$map
对array数组元素进行迭代,并返回一个新的数组。类似一个foreach过程,语法结构如下:
$map : {input:<expression>,as:<item>,in:<expression>}
它有三个字段,input表示需要计算的数组,as表示当前数组元素的别名,in可以使用当前元素的别名进行计算;示例如下:
##原始数据 {_id : 1, "numbers" : [2,5,8]} {_id : 2, "numbers" : [0,1,3]} ##aggregation $project : { "numbers" : { $map : {input : "$numbers",as:"item",in:{$add : {'$item',2}}} } } ##result {_id : 1, "numbers" : [4,7,9]} {_id : 2, "numbers" : [2,3,5]}
2、$literal(字面值)
或许大家都比较熟悉这个概念,即$literal的值仅作字符串处理,原样输出,不进行表达式解析和计算,这类似于XML结构中的CDATA。比如:
$literal : {$add: [2,3]} 最终输出"$add:[2,3]" $literal : "$name" 最终输出"$name" ##----示例---- ##原始文档 {_id:1,"product":"book1","price":"$10"} {_id:1,"product":"book2","price":"45.01"} ##aggregation {$project: {costByDollar:{ $eq : [ {$literal : "$"},{$substr:["$price",0,1]} ] } } ##输出结果 {_id:1,"costByDollar":true} {_id:1,"costByDollar":false}
3、Boolean表达式
表达式返回true或者false,目前支持三种操作$and、$or、$not;语义基本与query中相同操作符一致。具体参见【Boolean】
4、比较表达式(Comparison)
语义基本同query,只是语法不同,它们需要返回true、false,比如:$eq、$gt等等。参见【Comparison】
5、集合操作(Set)
对于多个Arrays进行比较操作:
1)$setEquals:如果输入的两个或者多个Arrays中都持有相同的Set(不同值的列表)则返回true,否则返回false。
2)$setIntersection:返回输入的多个Arrays的交集。
3)$setUnion:返回并集。
5)$setDifference:返回差集,即在第一个Set中存在,但在第二个Set中不存在的数组元素。只接收2个数组参数。
6)$setIsSubset:如果第一个Set为第二个Set的子集(包括完全相同)则返回true,否则返回false。
7)$anyElementsTrue:如果此Set中有任何一个值为true,则返回true,否则返回false。只接收一个Array参数。
8)$allElementsTrue:只有当所有的元素都为true时,才返回true。只接收一个Array参数。
6、数学计算
对数字类型的字段值进行计算,比如$add(+),$subtract(-),$multiply(*),$divide(/),$mod(%),3.2版本新增计算表达式;语法都一样,例如:
$project : { totalAmount : {$multiply: {"$price","$qunatity"}} }
7、字符串操作
对字符串进行操作,非常简便有用的方法:
1)$concat:拼接字符串,此操作可以接收多个参数列表。
$concat:["$lastname",",","$firstname"]
2)$substr:截取字符串,语法:$substr : [<input expression>,<start>,<length>],其中start默认起始为0。
3)$toLower、$toUper:大小写转换
4)$strcasecmp:不分大小写,进行字符串比较,如果2个字符串一样,则返回0;如果第一个字符串比第二个“大”(字典顺序),则返回1,否则返回false。
8、$size:计算数组的元素个数,语法:$size : <array expression>;3.2版本新增数组表达式
9、日期计算
对Date或者timestamp进行计算,比如获取时间的月份、时间戳等等。简单列举如下:
$dayOfYear:当前日期是此年的第几天,0~366(闰年);$dayOfMonth:当前日期是本月的第几天,0~31;$dayOfWeek:当前日期是本周的第几天,1~7,1表示周日,7表示周六;$year:返回当前日期的年份,比如2014。$month:返回当前日期的月份。$week:返回当前日期为当年的第几周,0~53周。$hour、$minute、$second、$millisecond表示获取当前日期的时、分、秒、毫秒部分。
$dateToString:日期的格式转换,这个比较常用,语法格式:$dataToString : {format:<format>,date:"$date"}。格式占位符如下:
%Y:年份,4个数字,比如2014;%m:月份,2个数字,01~12;%d:日期,2个数字,01~31;%H:小时,2个数字,00~23;%M:分钟,00~59;%S:秒,00~59;%L:毫秒,000~999;%j:当前day为年的第几天,001~366;%U:当前日期所在年的第几周,00~53;%w:day是当前周的第几天,1~7。
$project : { "dateAsString":{$dateToString:{format:"%Y-%m-%d %H:%M:%S",date:"$date"}} }
10、条件表达式
1)$cond
上文中我们已经看到相关示例,它的作用等同于一个“三元表达式”,语法:$cond:{if : <boolean-expression>, then : <true-expression>,else : <false-expression> }。
2)$ifNull
判定指定表达式是否为null,比如字段不存在(undefined)、值为null。语法:$ifNull: [<expression>,<return-value expression>],如果expression的值为null则返回“return-value”。
四、示例
通过上文,我们基本了解了aggretation的原理和使用技巧,对于复杂的数据计算通常需要多次计算才能产出结果。此外还需要强调一点,良好的数据结构和数据存储模型,对数据分析和统计具有非常大的帮助,所以我们希望大家在数据分析之前,和appliation约定数据的模型,如果数据模型相差甚远,那么需要进行一次(或者增量的)清洗数据,并转存到特定的数据仓库中(比如mongodb,hive等)。
因为aggregate和mapreduce每次任务只能操作一个collection,数据数据来自多个collection,那么就需要多次分析,将分析结果统一写入一个result collection中,然后在对result进行最终的计算。
数据校对是个非常棘手的问题,通常可以采用“取样法”来测试,比如限定一定范围(或者随机)对整体数据取样1/10,然后对这个小数据集先运行数据统计脚本,查看结果是否有异动,对于异常情况适度调整算法。
接下来我们使用aggregate操作完成如下数据统计指标:
1)横向维度:统计当前月份之前的6个月新用户在当月的留存;比如现在为9月份,统计出3~8月份注册的用户,在9月份的留存情况。
2)纵向维度:分平台,分别统计“android”、“ios”、“pc”三个平台在横向维度的数据。
期望数据结果:
------------------------------------------------------------- | 平台\月份 | 1 | 2 | 3 | 4 | 5 | 6 | ------------------------------------------------------------- | android | 100 | 80 | 100 | 150 | 160 | 200 | ------------------------------------------------------------- | ios | 100 | 80 | 100 | 150 | 160 | 200 | ------------------------------------------------------------- | pc | 100 | 80 | 100 | 150 | 160 | 200 | -------------------------------------------------------------
我们假定有个collection为user,当用户注册时新建一条记录,created表示用户注册的时间,modfied表示用户最后登录的时间;userid上创建唯一索引,created、modified创建了组合索引,原始数据格式:
{_id : 1,userid:"10010","platform":"android","created":{"$date":1448805204482},"modified":{"$date":1448805204482}
aggregate语句:
db.user.aggregate([ { "$match" : { "created" : { "$gte" : { "$date" : 1430379347719 }, "$lt" : { "$date" : 1446307200000 } }, "modified" : { "$gte" : { "$date" : 1446190547719 } } } }, { "$group" : { "_id" : { "month" : { "$month" : "$created" }, "platform" : "$platform" }, "total" : { "$sum" : 1 } } }, { "$project" : { "platform" : "$_id.platform", "total" : 1, "month" : "$_id.month", "_id" : 0 } }, { "$sort" : { "platform" : 1, "month" : 1 } }, { "$group" : { "_id" : "$platform", "result" : { "$push" : { "month" : "$month", "total" : "$total" } } } }, { "$match" : { "created" : { "$gte" : { "$date" : 1430379347719 }, "$lt" : { "$date" : 1446307200000 } }, "modified" : { "$gte" : { "$date" : 1446190547719 } } } }, { "$group" : { "_id" : { "month" : { "$month" : "$created" }, "platform" : "$platform" }, "total" : { "$sum" : 1 } } }, { "$project" : { "platform" : "$_id.platform", "total" : 1, "month" : "$_id.month", "_id" : 0 } }, { "$sort" : { "platform" : 1, "month" : 1 } } ] }
JAVA程序实例:
MongoCollection<Document> collection = db.getCollection("user"); Date now = new Date();//11月30日 List<Document> pipeline = new ArrayList<Document>(); Document filter = new Document(); //过滤: //1)注册时间在6各月之前,不包括当月; //2)最后登陆时间在当月 filter.put("created", new Document("$gte",DateUtils.addMonths(now, -7)).append("$lt", DateUtils.truncate(now, Calendar.MONTH))); filter.put("modified",new Document("$gte",DateUtils.addMonths(now, -1))); Document match = new Document("$match",filter); pipeline.add(match); Document group = new Document(); //第一次group,将“月份”和“平台”作为复合key group.put("$group", new Document("_id", new Document("month", new Document("$month", "$created")).append("platform", "$platform")) .append("total", new Document("$sum", 1))); pipeline.add(group); //重命名字段名 Document project = new Document("$project",new Document("platform","$_id.platform") .append("total",1).append("month","$_id.month").append("_id",0)); pipeline.add(project); //按照平台和月份排序,确保,同一个平台的数据,是按照月份排序的,因为接下来要使用$push Document sort = new Document("$sort",new Document("platform",1).append("month", 1)); pipeline.add(sort); //第二次group,将上述输出数据,转换成根据平台为维度的inline方式。便于页面展示。 Document finalGroup = new Document("$group",new Document("_id","$platform") .append("result", new Document("$push", new Document("month", "$month").append("total", "$total")))); pipeline.add(finalGroup); //pipeline.add(new Document("$out","_" + DateFormatUtils.format(now,"yyyyMMdd"))); MongoCursor<Document> cursor = collection.aggregate(pipeline).allowDiskUse(true).iterator(); while (cursor.hasNext()) { Document item = cursor.next(); System.out.println(item.toJson()); } cursor.close();
输出结果示例:
{ "_id": "pc", "result": [ { "month": 5, "total": 2736 }, { "month": 6, "total": 2877 }, { "month": 7, "total": 2718 }, { "month": 8, "total": 2791 }, { "month": 9, "total": 2887 }, { "month": 10, "total": 2800 } ] } { "_id": "android", "result": [ { "month": 5, "total": 2742 }, { "month": 6, "total": 2850 }, { "month": 7, "total": 2782 }, { "month": 8, "total": 2752 }, { "month": 9, "total": 2740 }, { "month": 10, "total": 2941 } ] }
非常有趣的是,aggreate可以支持2个group、甚至更多个group操作,这大大减少了数据分析的开发难度,如果使用mapreduce可能需要开发多个脚本且运行多次。在技术日新月异的今天,基于大数据平台的数据计算技术不断涌现,出现大量了“SQL-LIKE Engineer”,它们可以将SQL语句转换成适用于NOSQL平台的操作,比如将SQL语句转换成Mongodb的aggreation或者mapreduce(内部或将数据转存到hadoop上进行mapreduce),这样又进一步的提升了开发效率,我们稍后介绍Apache Drill:mongodb平台上的SQL-Like引擎。