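Among the ways POSIX threads are used, one important pattern is the pipeline: a stream of data items is processed serially by a group of threads, each applying its own step in order. The figure below shows the architecture: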
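In response to users' demand for simple data access, MongoDB 2.2 introduced a new feature, the Aggregation Framework. It is a new framework for data aggregation whose concept resembles a data processing pipeline. Each document passes through a pipeline made up of several stages (nodes), each with its own function (grouping, filtering, and so on), and the pipeline outputs the corresponding result once processing is done. The pipeline has two basic functions: filtering documents, much like a query, and transforming documents, which changes the form of the output.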
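Other capabilities include grouping and sorting by a specified field. Within each stage, expression operators can also compute averages, concatenate strings, and perform similar operations. The pipeline offers an alternative to MapReduce: MapReduce is relatively complex to use, whereas the pipeline has a fixed interface (operator expressions) and is simpler, so for most aggregation tasks the pipeline is generally the preferred approach.
The framework uses a declarative pipeline notation to support functionality similar to SQL's GROUP BY, without requiring users to write custom JavaScript routines.
The $group operation serves the same purpose as SQL's GROUP BY clause, but in use it is closer to the grouping operator in LINQ. Instead of returning flat rows, the result set of $group takes the form of a nested structure. Because of this, $group can return aggregate information, such as counts and averages computed over all or part of the actual documents in each group.
A pipeline is made up of functional stages, each expressed with a pipeline operator. An aggregation pipeline starts with all the documents in a collection; the documents then flow from one stage to the next, and each stage performs its operation on them. These operations may create new documents or filter out documents that do not meet a condition, and the same kind of operation can be applied more than once within a pipeline.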
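The flow described above can be imitated in plain JavaScript, without MongoDB. This is only a sketch of the idea: the helper names `match`, `project`, and `runPipeline` and the sample documents are invented for illustration and are not part of any MongoDB API.

```javascript
// Each stage is a function from an array of documents to a new array.
// match, project, and runPipeline are hypothetical helper names.
const match = (pred) => (docs) => docs.filter(pred);
const project = (fn) => (docs) => docs.map(fn);

// Feed the output of every stage into the next one, in order.
function runPipeline(docs, stages) {
  return stages.reduce((current, stage) => stage(current), docs);
}

// Invented sample collection.
const articles = [
  { title: "A", author: "Jone", pageViews: 5 },
  { title: "B", author: "Dave", pageViews: 50 },
  { title: "C", author: "Jone", pageViews: 120 },
];

const result = runPipeline(articles, [
  match((d) => d.pageViews > 10),                           // filtering stage
  project((d) => ({ title: d.title, views: d.pageViews })), // transforming stage
]);

console.log(result);
```

The first stage drops documents and the second reshapes the survivors, which mirrors the two basic functions of a pipeline: filtering and transforming.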
Name | Description
$project | Reshapes a document stream. $project can rename, add, or remove fields as well as create computed values and sub-documents.
$match | Filters the document stream, and only allows matching documents to pass into the next pipeline stage. $match uses standard MongoDB queries.
$limit | Restricts the number of documents in an aggregation pipeline.
$skip | Skips over a specified number of documents from the pipeline and returns the rest.
$unwind | Takes an array of documents and returns them as a stream of documents.
$group | Groups documents together for the purpose of calculating aggregate values based on a collection of documents.
$sort | Takes all input documents and returns them in a stream of sorted documents.
$geoNear | Returns an ordered stream of documents based on proximity to a geospatial point.
1. $project: data projection, mainly used to rename, add, and remove fields
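db.article.aggregate( [ { $project : {
    doctoredPageViews : { $add : ["$pageViews", 10] },  // add a computed field
    page_views : "$pageViews",                          // rename pageViews
    dpv : { $add : ["$pageViews", 10] }                 // same computation under a shorter name
} } ] );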
2. $match: filtering; selects the documents that meet a condition and passes them on as input to the next stage
db.articles.aggregate( [
    { $match : { score : { $gt : 70, $lte : 90 } } },
    { $group: { _id: null , count: { $sum: 1 } } }
] );
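To make the effect of this $match and $group pair concrete, the sketch below reproduces the same logic over an in-memory array in plain JavaScript. The sample scores are invented, and the code only imitates the semantics; it does not call MongoDB.

```javascript
// Invented sample documents.
const articles = [
  { title: "a", score: 65 },
  { title: "b", score: 70 },
  { title: "c", score: 85 },
  { title: "d", score: 90 },
  { title: "e", score: 95 },
];

// { $match : { score : { $gt : 70, $lte : 90 } } }
// $gt is exclusive, $lte is inclusive.
const matched = articles.filter((d) => d.score > 70 && d.score <= 90);

// { $group : { _id : null, count : { $sum : 1 } } }
// A null _id puts every document in one group; $sum : 1 adds 1 per document.
// With no input documents, $group produces no output document at all.
const grouped = matched.length > 0
  ? [{ _id: null, count: matched.reduce((n) => n + 1, 0) }]
  : [];

console.log(grouped);
```

Only the documents with scores 85 and 90 fall inside the range, so the single output group counts two documents.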
3. $limit: limits the number of documents that pass through the pipeline
4. $skip: skips a specified number of documents from the start of the input
5. $unwind: splits an array field, returning one document per array element
> db.article.find()
{ "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
  "author" : "Jone", "title" : "A book",
  "tags" : [ "good", "fun", "good" ] }
> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tags"})
{ "result" : [
    { "_id" : ObjectId("528751b0e7f3eea3d1412ce2"), "author" : "Jone", "title" : "A book", "tags" : "good" },
    { "_id" : ObjectId("528751b0e7f3eea3d1412ce2"), "author" : "Jone", "title" : "A book", "tags" : "fun" },
    { "_id" : ObjectId("528751b0e7f3eea3d1412ce2"), "author" : "Jone", "title" : "A book", "tags" : "good" }
  ], "ok" : 1 }
> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tag"})
{ "result" : [ ], "ok" : 1 }
Changing $tags to $tag refers to a field that does not exist, so the document is ignored and the result is empty.
Unwinding a field that is not an array, such as $title, fails with an error:
> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$title"})
Error: Printing Stack Trace at printStackTrace (src/mongo/shell/utils.js:37:15) at DBCollection.aggregate (src/mongo/shell/collection.js:897:9) at (shell):1:12 Sat Nov 16 19:16:54.488 JavaScript execution failed: aggregate failed: { "errmsg" : "exception: $unwind: value at end of field path must be an array", "code" : 15978, "ok" : 0 } at src/mongo/shell/collection.js:L898
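The three behaviours shown above (array field, missing field, non-array field) can be summarised with a small in-memory imitation in plain JavaScript. The `unwind` helper is hypothetical and only mirrors the behaviour seen in these shell sessions; it is not MongoDB's implementation.

```javascript
// Hypothetical helper imitating the $unwind behaviour shown in the sessions above.
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    if (!(field in doc)) continue;          // missing field: the document is dropped
    if (!Array.isArray(doc[field])) {
      throw new Error("$unwind: value at end of field path must be an array");
    }
    for (const item of doc[field]) {
      out.push({ ...doc, [field]: item });  // one copy of the document per array element
    }
  }
  return out;
}

const article = { author: "Jone", title: "A book", tags: ["good", "fun", "good"] };

console.log(unwind([article], "tags").map((d) => d.tags)); // one entry per tag
console.log(unwind([article], "tag"));                      // field does not exist
```

Calling `unwind([article], "title")` throws, because the title is a string rather than an array.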
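6. $group: groups the data
db.article.aggregate( [ { $group : { _id : "$author",  // grouping key assumed to be the author field
    docsPerAuthor : { $sum : 1 },
    viewsPerAuthor : { $sum : "$pageViews" }
} } ] );
Note: the output of $group is unordered.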
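As a rough illustration of what grouping with $sum computes, here is an in-memory version in plain JavaScript. It assumes the grouping key is the author field; the `groupByAuthor` helper and the sample documents are invented for this sketch.

```javascript
// Hypothetical helper: group by author and accumulate per-group totals.
function groupByAuthor(docs) {
  const groups = new Map();
  for (const doc of docs) {
    if (!groups.has(doc.author)) {
      groups.set(doc.author, { _id: doc.author, docsPerAuthor: 0, viewsPerAuthor: 0 });
    }
    const g = groups.get(doc.author);
    g.docsPerAuthor += 1;               // { $sum : 1 }
    g.viewsPerAuthor += doc.pageViews;  // { $sum : "$pageViews" }
  }
  return [...groups.values()];
}

// Invented sample documents.
const articles = [
  { author: "Jone", pageViews: 10 },
  { author: "Dave", pageViews: 7 },
  { author: "Jone", pageViews: 5 },
];

const perAuthor = groupByAuthor(articles);
console.log(perAuthor);
```

This sketch happens to return groups in first-seen order, but since $group output is unordered, callers should not rely on any order unless a sort follows.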
7. $sort: sorts documents by the specified fields
db.users.aggregate( { $sort : { age : -1, posts: 1 } });
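Note: MongoDB 2.4 optimizes memory use here. When $sort comes right before $limit in the pipeline, $sort only keeps the top $limit documents as it progresses, so only that many documents are held in memory, which can save a great deal of memory.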
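The idea behind this memory saving can be sketched in plain JavaScript: keep only the best n documents while scanning, instead of sorting everything and cutting afterwards. This is an illustration of the principle, not MongoDB's actual implementation; `sortWithLimit` and the sample data are hypothetical.

```javascript
// Keep only the best `limit` documents while scanning the input once.
function sortWithLimit(docs, compare, limit) {
  const top = [];                         // always sorted, never longer than `limit`
  for (const doc of docs) {
    // Find the position that keeps `top` sorted.
    let i = top.length;
    while (i > 0 && compare(doc, top[i - 1]) < 0) i--;
    if (i < limit) {
      top.splice(i, 0, doc);
      if (top.length > limit) top.pop();  // discard the document that fell out
    }
  }
  return top;
}

// Invented sample documents; sort by age descending, like { $sort : { age : -1 } }.
const users = [{ age: 30 }, { age: 25 }, { age: 41 }, { age: 19 }, { age: 35 }];
const oldestTwo = sortWithLimit(users, (a, b) => b.age - a.age, 2);

console.log(oldestTwo);
```

At no point does `top` hold more than `limit` + 1 documents, which is where the saving comes from when the input is large and the limit is small.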
8. $geoNear: returns documents ordered by proximity to a geospatial point. It accepts the following options:
Field | Type | Description
near | GeoJSON point or legacy coordinate pairs | The point for which to find the closest documents.
distanceField | string | The output field that contains the calculated distance. To specify a field within a subdocument, use dot notation.
limit | number | Optional. The maximum number of documents to return. The default value is 100. See also the num option.
num | number | Optional. The num option provides the same function as the limit option. Both define the maximum number of documents to return. If both options are included, the num value overrides the limit value.
maxDistance | number | Optional. A distance from the center point. Specify the distance in radians. MongoDB limits the results to those documents that fall within the specified distance from the center point.
query | document | Optional. Limits the results to the documents that match the query. The query syntax is the usual MongoDB read operation query syntax.
spherical | Boolean | Optional. If true, MongoDB references points using a spherical surface. The default value is false.
distanceMultiplier | number | Optional. The factor to multiply all distances returned by the query. For example, use the distanceMultiplier to convert radians, as returned by a spherical query, to kilometers by multiplying by the radius of the Earth.
includeLocs | string | Optional. This specifies the output field that identifies the location used to calculate the distance. This option is useful when a location field contains multiple locations. To specify a field within a subdocument, use dot notation.
uniqueDocs | Boolean | Optional. If this value is true, the query returns a matching document once, even if more than one of the document's location fields match the query. If this value is false, the query returns a document multiple times if the document has multiple matching location fields. See $uniqueDocs for more information.
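db.places.aggregate( [ { $geoNear : {   // "places" is a placeholder collection name
    near: [40.724, -73.997],
    distanceField: "dist.calculated",
    query: { type: "public" },
    includeLocs: "dist.location",
    uniqueDocs: true
} } ] );
The matching documents come back with the distance and location nested under dist (other fields omitted here):
{ "name" : "Washington Square",
  "dist" : { "calculated" : 0.0050990195135962296,
             "location" : [ 40.729, -73.996 ] } }
{ "name" : "Sara D. Roosevelt Park",
  "dist" : { "calculated" : 0.006082762530298062,
             "location" : [ 40.723, -73.991 ] } }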
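Note: 1. $geoNear can only be used as the first stage of a pipeline.
3. $geoNear and the geoNear command are similar, but there are some differences: distanceField is required in $geoNear but optional in geoNear; includeLocs is a string in $geoNear but a Boolean in geoNear.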
$group operators:
Name | Description
$addToSet | Returns an array of all the unique values for the selected field among the documents in that group.
$first | Returns the first value in a group.
$last | Returns the last value in a group.
$max | Returns the highest value in a group.
$min | Returns the lowest value in a group.
$avg | Returns an average of all the values in a group.
$push | Returns an array of all values for the selected field among the documents in that group.
$sum | Returns the sum of all the values in a group.
Boolean operators:
Name | Description
$and | Returns true only when all values in its input array are true.
$or | Returns true when any value in its input array is true.
$not | Returns the boolean value that is the opposite of the input value.
Comparison operators:
Name | Description
$cmp | Compares two values and returns the result of the comparison as an integer.
$eq | Takes two values and returns true if the values are equivalent.
$gt | Takes two values and returns true if the first is larger than the second.
$gte | Takes two values and returns true if the first is larger than or equal to the second.
$lt | Takes two values and returns true if the second value is larger than the first.
$lte | Takes two values and returns true if the second value is larger than or equal to the first.
$ne | Takes two values and returns true if the values are not equivalent.
Arithmetic operators:
Name | Description
$add | Computes the sum of an array of numbers.
$divide | Takes two numbers and divides the first number by the second.
$mod | Takes two numbers and calculates the modulo of the first number divided by the second.
$multiply | Computes the product of an array of numbers.
$subtract | Takes two numbers and subtracts the second number from the first.
String operators:
Name | Description
$concat | Concatenates two strings.
$strcasecmp | Compares two strings and returns an integer that reflects the comparison.
$substr | Takes a string and returns a portion of that string.
$toLower | Converts a string to lowercase.
$toUpper | Converts a string to uppercase.
Date operators:
Name | Description
$dayOfYear | Converts a date to a number between 1 and 366.
$dayOfMonth | Converts a date to a number between 1 and 31.
$dayOfWeek | Converts a date to a number between 1 and 7.
$year | Converts a date to the full year.
$month | Converts a date into a number between 1 and 12.
$week | Converts a date into a number between 0 and 53.
$hour | Converts a date into a number between 0 and 23.
$minute | Converts a date into a number between 0 and 59.
$second | Converts a date into a number between 0 and 59. May be 60 to account for leap seconds.
$millisecond | Returns the millisecond portion of a date as an integer between 0 and 999.
Conditional expressions:
Name | Description
$cond | A ternary operator that evaluates one expression, and depending on the result returns the value of one of the following expressions.
$ifNull | Evaluates an expression and returns a value.
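1. $sort + $skip + $limit sequence optimization
{ $sort: { age : -1 } }, { $skip: 10 }, { $limit: 5 }     // before optimization (skip and limit values are illustrative)
{ $sort: { age : -1 } }, { $limit: 15 }, { $skip: 10 }    // after optimization
After the reordering, the new $limit equals the original $skip plus the original $limit.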
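A quick way to convince yourself that this reordering preserves the result is to try it on an in-memory array. The sketch below uses plain JavaScript; the `skip` and `limit` helpers and the values 10 and 5 are illustrative assumptions, not MongoDB internals.

```javascript
// Hypothetical stage helpers over plain arrays.
const skip = (n) => (docs) => docs.slice(n);
const limit = (n) => (docs) => docs.slice(0, n);

// 30 already-sorted documents stand in for the output of $sort.
const sorted = Array.from({ length: 30 }, (_, i) => ({ age: 60 - i }));

const before = limit(5)(skip(10)(sorted));      // $skip 10, then $limit 5
const after = skip(10)(limit(10 + 5)(sorted));  // $limit 15, then $skip 10

console.log(JSON.stringify(before) === JSON.stringify(after));
```

Both orders return the same five documents. The benefit of the second form is that $limit ends up next to $sort, so the sort only has to keep the first 15 documents.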
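2. $limit + $skip + $limit + $skip sequence optimization
Further optimization: two $limit stages can be merged by taking the smaller of the two values, and two $skip stages can be merged by adding their values.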
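The merging rules can be checked step by step on an in-memory array. In this plain JavaScript sketch, the stage values (100, 5, 10, 2) and the helpers `skip`, `limit`, and `apply` are assumptions chosen for illustration.

```javascript
// Hypothetical stage helpers over plain arrays.
const skip = (n) => (docs) => docs.slice(n);
const limit = (n) => (docs) => docs.slice(0, n);
const apply = (docs, stages) => stages.reduce((d, stage) => stage(d), docs);

const docs = Array.from({ length: 200 }, (_, i) => i);

// Original sequence: $limit 100, $skip 5, $limit 10, $skip 2
const original = apply(docs, [limit(100), skip(5), limit(10), skip(2)]);

// Step 1: swap $skip 5 and $limit 10, raising that limit to 10 + 5.
const swapped = apply(docs, [limit(100), limit(15), skip(5), skip(2)]);

// Step 2: merge the two limits with min and the two skips with +.
const merged = apply(docs, [limit(Math.min(100, 15)), skip(5 + 2)]);

console.log(original, swapped, merged);
```

All three pipelines select the same documents, while the last one needs only two stages.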
3. Projection optimization
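Inside a pipeline you cannot operate on data of type Symbol, MinKey, MaxKey, DBRef, Code, or CodeWScope (version 2.4 lifted the restriction on binary data).
The output of a pipeline cannot exceed the BSON document size limit (16 MB); exceeding it produces an error.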
First download the test data from http://media.mongodb.org/zips.json and import it into the database.
var connectionString = ConfigurationManager.AppSettings["MongodbConnection"];
var client = new MongoClient(connectionString);
var DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
string collName = ConfigurationManager.AppSettings["collName"];
MongoServer mongoDBConn = client.GetServer();
MongoDatabase db = mongoDBConn.GetDatabase(DatabaseName);
MongoCollection<BsonDocument> table = db[collName];
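// Total population per state (the "$state" key and "$pop" field are inferred from the zips data set)
var group = new BsonDocument
{
    {"$group", new BsonDocument { {"_id", "$state"},
        {"totalPop", new BsonDocument { {"$sum", "$pop"} }} }}
};
var sort = new BsonDocument
{
    {"$sort", new BsonDocument{ { "_id",1 }}}
};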
var pipeline = new[] { group, sort };
var result = table.Aggregate(pipeline);
var matchingExamples = result.ResultDocuments.Select(x => x.ToDynamic()).ToList();
foreach (var example in matchingExamples)
{
    Console.WriteLine(string.Format("{0}- {1}", example["_id"], example["totalPop"]));
}
> db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},
      {$group:{_id:"$_id.state",avCityPop:{$avg:"$pop"}}}, {$sort:{_id:1}})
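// Stage 1: total population of each (state, city) pair
var group1 = new BsonDocument
{
    {"$group", new BsonDocument {
        {"_id", new BsonDocument { {"state", "$state"}, {"city", "$city"} }},
        {"pop", new BsonDocument { {"$sum", "$pop"} }} }}
};
// Stage 2: average city population per state ($avg and the _id path are inferred from the field name)
var group2 = new BsonDocument
{
    {"$group", new BsonDocument { {"_id", "$_id.state"},
        {"avCityPop", new BsonDocument { {"$avg", "$pop"} }} }}
};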
var pipeline1 = new[] { group1,group2, sort };
var result1 = table.Aggregate(pipeline1);
var matchingExamples1 = result1.ResultDocuments.Select(x => x.ToDynamic()).ToList();
foreach (var example in matchingExamples1)
{
    Console.WriteLine(string.Format("{0}- {1}", example["_id"], example["avCityPop"]));
}
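// Sort cities by population so that $first and $last pick the smallest and biggest per state.
// The operator arguments below are inferred from the field names and are assumptions.
var sort1 = new BsonDocument
{
    {"$sort", new BsonDocument{ { "pop",1 }}}
};
var group3 = new BsonDocument
{
    {"$group", new BsonDocument { {"_id", "$_id.state"},
        {"biggestCity", new BsonDocument { {"$last", "$_id.city"} }},
        {"biggestPop", new BsonDocument { {"$last", "$pop"} }},
        {"smallestCity", new BsonDocument { {"$first", "$_id.city"} }},
        {"smallestPop", new BsonDocument { {"$first", "$pop"} }} }}
};
// Reshape each result into nested biggestCity and smallestCity sub-documents.
var project = new BsonDocument
{
    {"$project", new BsonDocument { {"_id", 0}, {"state", "$_id"},
        {"biggestCity", new BsonDocument { {"name", "$biggestCity"}, {"pop", "$biggestPop"} }},
        {"smallestCity", new BsonDocument { {"name", "$smallestCity"}, {"pop", "$smallestPop"} }} }}
};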
var pipeline2 = new[] { group1,sort1 ,group3, project };
var result2 = table.Aggregate(pipeline2);
var matchingExamples2 = result2.ResultDocuments.Select(x => x.ToDynamic()).ToList();
foreach (var example in matchingExamples2)
{
    Console.WriteLine(example); // print the whole reshaped document
}
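For most aggregation operations, the aggregation pipeline offers good performance and a consistent interface, and it is fairly simple to use. Like MapReduce, it can operate on sharded collections, but its output is held in a single document and must therefore respect the BSON document size limit (currently 16 MB).
MongoDB Aggregation Pipeline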