Mongo数据库 --- Mongo Pipeline
- 什么是Mongo Pipeline
- Mongo Pipeline常用的几个Stage
- Explanation with example:
- MongoDB $match
- MongoDB $project
- MongoDB $group
- MongoDB $unwind
- MongoDB $count
- MongoDB $addFields
- Some Query Examples
- 在C#中使用Aggreagtion Pipeline
- **方法一: 使用RawBsonDocument**
- 使用 Fluent API
什么是Mongo Pipeline
- 在MongoDB中,聚合管道(aggregation pipeline)是一种用于处理和转换数据的机制。它允许您按顺序对集合中的文档执行一系列操作,并将结果从一个阶段传递到下一个阶段。聚合管道由多个阶段组成,每个阶段在输入文档上执行特定的操作,并生成转换后的输出,作为下一个阶段的输入。
- 聚合管道中的每个阶段接收输入文档,并应用某种操作,例如过滤、分组、投影或排序,以生成一组修改后的文档。一个阶段的输出成为下一个阶段的输入,形成了一个数据处理的管道流程.
- 聚合管道提供了一种强大而灵活的方式来执行复杂的数据处理任务,例如过滤、分组、排序、连接和聚合数据,所有这些都可以在MongoDB查询框架内完成。它能够高效地处理大型数据集,并提供了一种方便的方式来在返回结果之前对数据进行形状和操作
- MongoDB的聚合管道(aggregation pipeline)是在MongoDB服务器上执行的。聚合管道操作在服务器端进行,而不是将数据取出并在本地内存中执行
Mongo Pipeline常用的几个Stage
- $match:根据指定的条件筛选文档,类似于查询中的find操作。可以使用各种条件和表达式进行匹配。
- $project:重新塑造文档结构,包括选择特定字段、排除字段、创建计算字段、重命名字段等。还可以使用表达式进行计算和转换。
- $group:按照指定的键对文档进行分组,并对每个组执行聚合操作,如计算总和、平均值、计数等。可以进行多字段分组和多个聚合操作。
- $sort:根据指定的字段对文档进行排序,可以指定升序或降序排序。
- $limit:限制返回结果的文档数量,只保留指定数量的文档。
- $skip:跳过指定数量的文档,返回剩余的文档。
- $unwind:将包含数组的字段拆分为多个文档,每个文档包含数组中的一个元素。这在对数组字段进行聚合操作时很有用。
- $lookup:执行左连接操作,将当前集合中的文档与其他集合中的文档进行关联。可以根据匹配条件将相关文档合并到结果中
Explanation with example:
Example使用以下数据
//University Collection
{country : 'Spain',city : 'Salamanca',name : 'USAL',location : {type : 'Point',coordinates : [ -5.6722512,17, 40.9607792 ]},students : [{ year : 2014, number : 24774 },{ year : 2015, number : 23166 },{ year : 2016, number : 21913 },{ year : 2017, number : 21715 }]
}{country : 'Spain',city : 'Salamanca',name : 'UPSA',location : {type : 'Point',coordinates : [ -5.6691191,17, 40.9631732 ]},students : [{ year : 2014, number : 4788 },{ year : 2015, number : 4821 },{ year : 2016, number : 6550 },{ year : 2017, number : 6125 }]
}
//Course Collection
{university : 'USAL',name : 'Computer Science',level : 'Excellent'
}
{university : 'USAL',name : 'Electronics',level : 'Intermediate'
}
{university : 'USAL',name : 'Communication',level : 'Excellent'
}
MongoDB $match
- 根据指定的条件筛选文档,类似于查询中的find操作。可以使用各种条件和表达式进行匹配。
db.universities.aggregate([{ $match : { country : 'Spain', city : 'Salamanca' } }
]).pretty()
Output:
{country : 'Spain',city : 'Salamanca',name : 'USAL',location : {type : 'Point',coordinates : [ -5.6722512,17, 40.9607792 ]},students : [{ year : 2014, number : 24774 },{ year : 2015, number : 23166 },{ year : 2016, number : 21913 },{ year : 2017, number : 21715 }]
}{country : 'Spain',city : 'Salamanca',name : 'UPSA',location : {type : 'Point',coordinates : [ -5.6691191,17, 40.9631732 ]},students : [{ year : 2014, number : 4788 },{ year : 2015, number : 4821 },{ year : 2016, number : 6550 },{ year : 2017, number : 6125 }]
}
MongoDB $project
- 重新塑造文档结构,包括选择特定字段、排除字段、创建计算字段、重命名字段等。还可以使用表达式进行计算和转换
- In the code that follows, please note that:
- We must explicitly write _id : 0 when this field is not required
- Apart from the _id field, it is sufficient to specify only those fields we need to obtain as a result of the query
db.universities.aggregate([{ $project : { _id : 0, country : 1, city : 1, name : 1 } }
]).pretty()
Output:
{ "country" : "Spain", "city" : "Salamanca", "name" : "USAL" }
{ "country" : "Spain", "city" : "Salamanca", "name" : "UPSA" }
MongoDB $group
- 按照指定的键对文档进行分组,并对每个组执行聚合操作,如计算总和、平均值、计数等。可以进行多字段分组和多个聚合操作。
//$sum : 1 的意思是计算每个分组中document的数量,$sum : 2 则是doccument数量的两倍
db.universities.aggregate([{ $group : { _id : '$name', totaldocs : { $sum : 1 } } }
]).pretty()
Output:
{ "_id" : "UPSA", "totaldocs" : 1 }
{ "_id" : "USAL", "totaldocs" : 1 }
- $group支持的operator
- $count: Calculates the quantity of documents in the given group.
- $max Displays the maximum value of a document’s field in the collection.
- $min Displays the minimum value of a document’s field in the collection.
- $avg Displays the average value of a document’s field in the collection.
- $sum Sums up the specified values of all documents in the collection.
- $push Adds extra values into the array of the resulting document.
MongoDB $unwind
- 将包含数组的字段拆分为多个文档,每个文档包含数组中的一个元素。这在对数组字段进行聚合操作时很有用
db.universities.aggregate([{ $match : { name : 'USAL' } },{ $unwind : '$students' }
]).pretty()
- unwind students会把每个student记录提取出来和剩下的属性拼起来变成一个独立的dcoument
Input
{country : 'Spain',city : 'Salamanca',name : 'USAL',location : {type : 'Point',coordinates : [ -5.6722512,17, 40.9607792 ]},students : [{ year : 2014, number : 24774 },{ year : 2015, number : 23166 },]
}
Output:
{"_id" : ObjectId("5b7d9d9efbc9884f689cdba9"),"country" : "Spain","city" : "Salamanca","name" : "USAL","location" : {"type" : "Point","coordinates" : [-5.6722512,17,40.9607792]},"students" : {"year" : 2014, "number" : 24774}
}
{"_id" : ObjectId("5b7d9d9efbc9884f689cdba9"),"country" : "Spain","city" : "Salamanca","name" : "USAL","location" : {"type" : "Point","coordinates" : [-5.6722512,17,40.9607792]},"students" : {"year" : 2015,"number" : 23166}
}
MongoDB $count
- The $count stage provides an easy way to check the number of documents obtained in the output of the previous stages of the pipeline
db.universities.aggregate([{ $unwind : '$students' },{ $count : 'total_documents' }
]).pretty()
Output:
{ "total_documents" : 8 }
MongoDB $addFields
- It is possible that you need to make some changes to your output in the way of new fields. In the next example, we want to add the year of the foundation of the university.
- addField 不会修改数据库
db.universities.aggregate([{ $match : { name : 'USAL' } },{ $addFields : { foundation_year : 1218 } }
]).pretty()
{"_id" : ObjectId("5b7d9d9efbc9884f689cdba9"),"country" : "Spain","city" : "Salamanca","name" : "USAL","location" : {"type" : "Point","coordinates" : [-5.6722512,17,40.9607792]},"students" : [{"year" : 2014,"number" : 24774},{"year" : 2015,"number" : 23166},{"year" : 2016,"number" : 21913},{"year" : 2017,"number" : 21715}],"foundation_year" : 1218
}
Some Query Examples
db.collection.aggregate([{ $match: { age: { $gt: 30 } } }
])
db.collection.aggregate([{ $group: { _id: "$category", total: { $sum: "$quantity" } } }
])
//This example selects the "name" field and creates a new field called "fullName"
//by concatenating the "firstName" and "lastName" fields.
db.collection.aggregate([{ $project: { _id: 0, name: 1, fullName: { $concat: ["$firstName", " ", "$lastName"] } } }
])
//This example sorts documents in descending order based on the "age" field.
db.collection.aggregate([{ $sort: { age: -1 } }
])
//This example limits the output to only the first 10 documents.
db.collection.aggregate([{ $limit: 10 }
])
在C#中使用Aggreagtion Pipeline
方法一: 使用RawBsonDocument
BsonDocument pipelineStage1 = new BsonDocument{{"$match", new BsonDocument{{ "username", "nraboy" }}}
};BsonDocument pipelineStage2 = new BsonDocument{{ "$project", new BsonDocument{{ "_id", 1 },{ "username", 1 },{ "items", new BsonDocument{{"$map", new BsonDocument{{ "input", "$items" },{ "as", "item" },{"in", new BsonDocument{{"$convert", new BsonDocument{{ "input", "$$item" },{ "to", "objectId" }}}}}}}}}}}
};BsonDocument pipelineStage3 = new BsonDocument{{"$lookup", new BsonDocument{{ "from", "movies" },{ "localField", "items" },{ "foreignField", "_id" },{ "as", "movies" }}}
};BsonDocument pipelineStage4 = new BsonDocument{{ "$unwind", "$movies" }
};BsonDocument pipelineStage5 = new BsonDocument{{"$group", new BsonDocument{{ "_id", "$_id" },{ "username", new BsonDocument{{ "$first", "$username" }} },{ "movies", new BsonDocument{{ "$addToSet", "$movies" }}}}}
};BsonDocument[] pipeline = new BsonDocument[] { pipelineStage1, pipelineStage2, pipelineStage3, pipelineStage4, pipelineStage5
};List<BsonDocument> pResults = playlistCollection.Aggregate<BsonDocument>(pipeline).ToList();foreach(BsonDocument pResult in pResults) {Console.WriteLine(pResult);
}
使用 Fluent API
var pResults = playlistCollection.Aggregate().Match(new BsonDocument{{ "username", "nraboy" }}).Project(new BsonDocument{{ "_id", 1 },{ "username", 1 },{"items", new BsonDocument{{"$map", new BsonDocument{{ "input", "$items" },{ "as", "item" },{"in", new BsonDocument{{"$convert", new BsonDocument{{ "input", "$$item" },{ "to", "objectId" }}}}}}}}}}).Lookup("movies", "items", "_id", "movies").Unwind("movies").Group(new BsonDocument{{ "_id", "$_id" },{"username", new BsonDocument{{ "$first", "$username" }}},{"movies", new BsonDocument{{ "$addToSet", "$movies" }}}}).ToList();foreach(var pResult in pResults) {Console.WriteLine(pResult);
}