文章目录
- 聚合操作
- 聚合管道
- 管道(Pipeline)和阶段(Stage)
- 常用的聚合阶段运算符
- 准备数据集,执行脚本
- $project
- $match
- $count
- $group
- $unwind
- $limit
- $skip
- $sort
- $lookup
- 聚合操作案例1
- 聚合操作案例2
- 聚合优化
- 执行顺序
- 内存排序
- 整合Springboot进行聚合操作
- 案例一
- 案例二
- 案例三
聚合操作
聚合操作允许用户处理多个文档并返回计算结果。
聚合操作包含三类:单一作用聚合、聚合管道、MapReduce。我们主要掌握其中的聚合管道方式即可。
-
单一作用聚合
提供了对常见聚合过程的简单访问,操作都从单个集合聚合文档。
MongoDB提供
db.collection.estimatedDocumentCount(), db.collection.countDocument(), db.collection.distinct()
这类单一作用的聚合函数。 -
聚合管道
一个数据聚合的框架,模型基于数据处理流水线的概念。文档进入多级管道,将文档转换为聚合结果。
-
MapReduce
操作具有两个阶段:处理每个文档并向每个输入文档发射一个或多个对象的map阶段,以及reduce组合map操作的输出阶段。
从MongoDB 5.0开始,map-reduce操作已被弃用。聚合管道比映射-reduce操作提供更好的性能和可用性。
MongoDB 6.0在原有聚合功能的基础上,推出了如下新特性以及优化项:
- 分片集群实例支持 l o o k u p 和 lookup和 lookup和graphLookup。
- 改进$lookup对JOINS的支持。
- 改进$graphLookup对图遍历的支持。
- 提升$lookup性能,部分场景中性能提升可达百倍。
聚合管道
管道(Pipeline)和阶段(Stage)
整个聚合运算过程称为管道(Pipeline),它是由多个阶段(Stage)组成的, 每个管道:
- 接受一系列文档(原始数据);
- 每个阶段对这些文档进行一系列运算;
- 结果文档输出给下一个阶段;
聚合管道操作语法:
pipeline = [$stage1, $stage2, ...$stageN];
db.collection.aggregate(pipeline, {options})
- pipelines 一组数据聚合阶段。除
$out、$Merge、$geonear
阶段之外,每个阶段都可以在管道中出现多次。 - options 可选,聚合操作的其他参数。包含:查询计划、是否使用临时文件、 游标、最大操作时间、读写策略、强制索引等等
案例:先匹配出status字段为A的文档,再通过_id
进行分组,并对amount字段进行求和,将求和的结果取一个别名total
在下面分组的阶段中,我们的字段名都是这种格式"$字段名"
常用的聚合阶段运算符
SQL 聚合映射图表
文档:Aggregation Pipeline Stages — MongoDB Manual
阶段运算符 | 描述 | SQL等价运算符 |
---|---|---|
$match | 筛选条件 | WHERE |
$project | 投影 | AS |
$lookup | 左外连接 | LEFT OUTER JOIN |
$sort | 排序 | ORDER BY |
$group | 分组 | GROUP BY |
$skip + $limit | 分页 | |
$unwind | 展开数组 | 把多个值展开为多个文档 |
$graphLookup | 图搜索 | |
f a c e t / facet/ facet/bucket | 分面搜索 |
聚合表达式
获取字段信息
$<field> : 用 $ 指示字段路径
$<field>.<sub field> : 使用 $ 和 . 来指示内嵌文档的路径
常量表达式
$literal :<value> : 指示常量 <value>
系统变量表达式
$$<variable> 使用 $$ 指示系统变量
$$CURRENT 指示管道中当前操作的文档
准备数据集,执行脚本
var tags = ["nosql","mongodb","document","developer","popular"];
var types = ["technology","sociality","travel","novel","literature"];
var books=[];
for(var i=0;i<50;i++){var typeIdx = Math.floor(Math.random()*types.length);var tagIdx = Math.floor(Math.random()*tags.length);var tagIdx2 = Math.floor(Math.random()*tags.length);var favCount = Math.floor(Math.random()*100);var username = "xx00"+Math.floor(Math.random()*10);var age = 20 + Math.floor(Math.random()*15);var book = {title: "book-"+i,type: types[typeIdx],tag: [tags[tagIdx],tags[tagIdx2]],favCount: favCount,author: {name:username,age:age}};books.push(book)
}
db.books.insertMany(books);
为接下来的测试案例,可以直接在mondosh命令行执行,当然也可以创建js文件,通过db.load()方式执行
插入之后的数据
$project
投影操作, 将原始字段投影成指定名称,也就是mysql中的字段别名功能。 如将集合中的 title 投影成 name
db.books.aggregate([{ $project: { name: "$title" } }])
$project 可以灵活控制输出文档的格式,也可以剔除不需要的字段
db.books.aggregate([{$project: {_id: 0,author: 1,type: 1}}
])
从嵌套文档中排除字段
db.books.aggregate([{$project: {_id: 0,"author.name": 1,type: 1}}
])# 或者使用下面这种写法
db.books.aggregate([{$project: {_id: 0,type: 1,author: { name: 1 }}}
])
$match
$match用于对文档进行筛选,之后可以在得到的文档子集上做聚合。也就是类似于Mysql的where功能
在实际应用中尽可能将$match放在管道的前面位置
# 查找type字段为novel的文档
db.books.aggregate([{$match: {type: "novel"}}
])
筛选管道操作和其他管道操作配合时候时,尽量放到开始阶段,这样可以减少后续管道操作符要操作的文档数,提升效率
# 先查询type字段为novel的文档,然后再投影,只显示某些字段
db.books.aggregate([{$match: {type: "novel"}},{$project: {_id:0, title:1, type:1, favCount:1}}
])
$count
计数并返回与查询匹配的结果数
# 先查询type字段为novel的文档,然后再统计个数,并将该值分配给type_count
db.books.aggregate([{$match: {type: "novel"}},{$count: "type_count"}
])# 执行结果
[ { type_count: 9 } ]
$group
官方文档
按指定的表达式对文档进行分组,并将每个不同分组的文档输出到下一个阶段
使用 $group
管道阶段中的 _id
字段来设置组键。也就是说我想按照什么来进行分组就需要在_id
中设置
# 标准格式
{$group:{_id: <expression>, // Group key<field1>: { <accumulator1> : <expression1> },...}}
_id
字段是必填的;但是,可以指定_id
值为null来为整个输入文档计算累计值。<field1>
可选。使用累加器操作符<accumulator>
操作符必须是累加器操作符之一
常用的accumulator操作符如下所示
名称 | 描述 | 类比sql |
---|---|---|
$avg | 计算均值 | avg |
$first | 返回每组第一个文档,如果有排序,按照排序,如果没有按照默认的存储的顺序的第一个文档。 | limit 0,1 |
$last | 返回每组最后一个文档,如果有排序,按照排序,如果没有按照默认的存储的顺序的最后个文档。 | - |
$max | 根据分组,获取集合中所有文档对应值得最大值。 | max |
$min | 根据分组,获取集合中所有文档对应值得最小值。 | min |
$push | 将指定的表达式的值添加到一个数组中。 | - |
$addToSet | 将表达式的值添加到一个集合中(无重复值,无序)。 | - |
$sum | 计算总和 | sum |
$stdDevPop | 返回输入值的总体标准偏差(population standard deviation) | - |
$stdDevSamp | 返回输入值的样本标准偏差(the sample standard deviation) | - |
$group阶段的内存限制为100M。
默认情况下,如果stage超过此限制, g r o u p 将产生错误。但是,要允许处理大型数据集,请将 a l l o w D i s k U s e 选项设置为 t r u e 以启用 group将产生错误。但是,要允许处理大型数据集,请将allowDiskUse选项设置为true以启用 group将产生错误。但是,要允许处理大型数据集,请将allowDiskUse选项设置为true以启用group操作以写入临时文件。
# book的数量,收藏总数和平均值 这里就不需要进行分组了
db.books.aggregate([{$group: {_id: null,bookcountSum: { $sum: 1 },favCountSum: { $sum: "$favCount" },favCountAvg: { $avg: "$favCount" }}}
])# 执行结果
[{_id: null,bookcountSum: 50,favCountSum: 2349,favCountAvg: 46.98}
# 统计每个作者的book收藏总数
db.books.aggregate([{$group: {_id: "$author.name",bookFavSum: { $sum: "$favCount" }}}
])# 执行结果
[{ _id: 'xx000', bookFavSum: 373 },{ _id: 'xx004', bookFavSum: 173 },{ _id: 'xx009', bookFavSum: 313 },{ _id: 'xx001', bookFavSum: 143 },{ _id: 'xx002', bookFavSum: 377 },{ _id: 'xx005', bookFavSum: 245 },{ _id: 'xx003', bookFavSum: 137 },{ _id: 'xx008', bookFavSum: 187 },{ _id: 'xx006', bookFavSum: 198 },{ _id: 'xx007', bookFavSum: 203 }
]
# 统计每个作者的每本book的收藏数
db.books.aggregate([{$group: {_id: {name: "$author.name",book: "$title"},bookFavSum: {$sum: "$favCount"}}}
])# 执行结果
[{ _id: { name: 'xx001', book: 'book-0' }, bookFavSum: 63 },{ _id: { name: 'xx009', book: 'book-8' }, bookFavSum: 62 },{ _id: { name: 'xx002', book: 'book-22' }, bookFavSum: 39 },{ _id: { name: 'xx008', book: 'book-12' }, bookFavSum: 37 },{ _id: { name: 'xx006', book: 'book-33' }, bookFavSum: 9 },{ _id: { name: 'xx002', book: 'book-7' }, bookFavSum: 69 },{ _id: { name: 'xx005', book: 'book-38' }, bookFavSum: 27 },{ _id: { name: 'xx004', book: 'book-23' }, bookFavSum: 85 },{ _id: { name: 'xx007', book: 'book-34' }, bookFavSum: 26 },{ _id: { name: 'xx004', book: 'book-21' }, bookFavSum: 78 },{ _id: { name: 'xx000', book: 'book-29' }, bookFavSum: 54 },{ _id: { name: 'xx006', book: 'book-40' }, bookFavSum: 93 },{ _id: { name: 'xx000', book: 'book-27' }, bookFavSum: 93 },{ _id: { name: 'xx002', book: 'book-24' }, bookFavSum: 63 },{ _id: { name: 'xx006', book: 'book-41' }, bookFavSum: 1 },{ _id: { name: 'xx005', book: 'book-45' }, bookFavSum: 56 },{ _id: { name: 'xx009', book: 'book-46' }, bookFavSum: 3 },{ _id: { name: 'xx007', book: 'book-28' }, bookFavSum: 10 },{ _id: { name: 'xx006', book: 'book-49' }, bookFavSum: 6 },{ _id: { name: 'xx006', book: 'book-17' }, bookFavSum: 32 }
]
# 每个作者book 的type合集
db.books.aggregate([{$group: {_id: {name: "$author.name"},typeList: {$addToSet: "$type"}}}
])# 执行结果为
[{_id: { name: 'xx000' },typeList: [ 'technology', 'literature', 'travel', 'novel' ]},{ _id: { name: 'xx004' }, typeList: [ 'technology', 'literature' ] },{_id: { name: 'xx009' },typeList: [ 'novel', 'technology', 'literature', 'sociality' ]},{_id: { name: 'xx001' },typeList: [ 'literature', 'sociality', 'technology' ]},
......
]
$unwind
官方文档
可以将数组拆分为单独的文档
# 您可以将数组字段路径传递给 $unwind。使用该语法时,如果字段值为 null、缺失或空数组,则 $unwind 不会输出文档。
# 如需指定字段路径,在字段名称前加上美元符号 $,并用引号括起来。
{ $unwind: <field path> }
v3.2+支持如下语法:
{$unwind:{#要指定字段路径,在字段名称前加上$符并用引号括起来。path: <field path>,#可选,一个新字段的名称用于存放元素的数组索引。该名称不能以$开头。includeArrayIndex: <string>, #可选,default :false,若为true,如果路径为空,缺少或为空数组,则$unwind输出文档preserveNullAndEmptyArrays: <boolean> } }
# 姓名为xx006的作者的book的tag数组拆分为多个文档
db.books.aggregate([{$match: {"author.name": "xx006"}},{$unwind: "$tag"}
])# 或者是下面这种写法
db.books.aggregate([{$match: {"author.name": "xx006"}},{$unwind: {path: "$tag",includeArrayIndex: "tagIndex",preserveNullAndEmptyArrays: true}}
])# 输出结果为
[
.....{_id: ObjectId("66a5b0106ae7536b7b29726f"),title: 'book-49',type: 'travel',tag: 'nosql',favCount: 6,author: { name: 'xx006', age: 30 },tagIndex: Long("0") # tagIndex为我自定义的字段名 原数组索引下标},{_id: ObjectId("66a5b0106ae7536b7b29726f"),title: 'book-49',type: 'travel',tag: 'developer',favCount: 6,author: { name: 'xx006', age: 30 },tagIndex: Long("1") # 原数组索引下标}
]
# 每个作者的book的tag合集
db.books.aggregate([{$unwind: {path: "$tag"}},{$group: {_id: "$author.name",types: {$addToSet: "$tag"}}}
])# 输出结果
[{_id: 'xx000',types: [ 'document', 'mongodb', 'nosql', 'popular', 'developer' ]},{_id: 'xx004',types: [ 'mongodb', 'nosql', 'developer', 'popular' ]},{_id: 'xx001',types: [ 'document', 'mongodb', 'nosql', 'popular', 'developer' ]},{_id: 'xx003',types: [ 'mongodb', 'developer', 'nosql', 'popular' ]},
......
]
案例
示例数据
# tag为[]空数组、没有tag字段、tag数组有值 三种情况
db.books.insert([
{"title" : "book-51","type" : "technology","favCount" : 11,"tag":[],"author" : {"name" : "hushang","age" : 28}
},{"title" : "book-52","type" : "technology","favCount" : 15,"author" : {"name" : "hushang","age" : 28}
},{"title" : "book-53","type" : "technology","tag" : ["nosql","document"],"favCount" : 20,"author" : {"name" : "hushang","age" : 28}
}])
测试
# 只要作者为hushang的文档 使用includeArrayIndex选项来输出数组元素的数组索引
db.books.aggregate([{$match: {"author.name": "hushang"}},{$unwind: {path: "$tag",includeArrayIndex: 'tagIndex'}}
])# 执行结果,只有两条数据
[{_id: ObjectId("66a5cf9fe78ef1c1a23525b3"),title: 'book-53',type: 'technology',tag: 'nosql',favCount: 20,author: { name: 'hushang', age: 28 },tagIndex: Long("0") # 源数组下标索引},{_id: ObjectId("66a5cf9fe78ef1c1a23525b3"),title: 'book-53',type: 'technology',tag: 'document',favCount: 20,author: { name: 'hushang', age: 28 },tagIndex: Long("1") # 源数组下标索引}
]
# 使用preserveNullAndEmptyArrays选项在输出中包含缺少path字段,null或空数组的文档
db.books.aggregate([{$match: {"author.name": "hushang"}},{$unwind: {path: "$tag",includeArrayIndex: 'tagIndex',preserveNullAndEmptyArrays: true}}
])# 此时的执行结果就有四条数据了,将另外两个tag字段为空数组 或者 null 也一起输出了
[{ # 没有tag字段,并且tagIndex为null_id: ObjectId("66a5cf9fe78ef1c1a23525b1"),title: 'book-51',type: 'technology',favCount: 11,author: { name: 'hushang', age: 28 },tagIndex: null},{ # 没有tag字段,并且tagIndex为null_id: ObjectId("66a5cf9fe78ef1c1a23525b2"),title: 'book-52',type: 'technology',favCount: 15,author: { name: 'hushang', age: 28 },tagIndex: null},{_id: ObjectId("66a5cf9fe78ef1c1a23525b3"),title: 'book-53',type: 'technology',tag: 'nosql',favCount: 20,author: { name: 'hushang', age: 28 },tagIndex: Long("0")},{_id: ObjectId("66a5cf9fe78ef1c1a23525b3"),title: 'book-53',type: 'technology',tag: 'document',favCount: 20,author: { name: 'hushang', age: 28 },tagIndex: Long("1")}
]
$limit
官方文档
限制传递到管道中下一阶段的文档数
db.books.aggregate([{$limit: 5}
])
此操作仅返回管道传递给它的前5个文档。 $limit对其传递的文档内容没有影响。
注意:当$sort
在管道中的$limit
之前出现时,$sort
操作只会在过程中维持前n个结果,其中n是指定的限制,而MongoDB只需要将n个项存储在内存中。
$skip
官方文档
跳过进入stage的指定数量的文档,并将其余文档传递到管道中的下一个阶段
此操作将跳过管道传递给它的前5个文档。 $skip对沿着管道传递的文档的内容没有影响。
db.books.aggregate([{ $skip : 5 }
]);
$sort
官方文档
将所有输入文档进行排序,然后按照排序将其返回至管道。
语法:
{ $sort: { <field1>: <sort order>, <field2>: <sort order> ... } }
$sort
接受指定要作为排序依据的字段以及相应排序顺序的文档。
- field表示要排序依据的字段
- sort order,如果为1表示升序排序,-1表示降序排序
如果对多个字段进行排序,则按从左到右的顺序进行排序。
db.books.aggregate([{$sort: {favCount: 1,"author.age": -1}}
])
$lookup
$lookup 官方文档
5.1 版本中进行了更改。可以将 $lookup
与分片集合一起使用。
对同一 数据库中的一个集合执行左外连接,以过滤“已连接”集合中的文档以便进行处理。
$lookup
阶段向每个输入文档添加一个新的数组字段,(可根据需要命名新key )。数组列存放的数据是来自被Join集合的适配文档,如果没有,集合为空(即 为[ ])
db.collection.aggregate([{$lookup: {from: "<collection to join>",localField: "<field from the input documents>",foreignField: "<field from the documents of the from collection>",as: "<output array field>"}})
关键字 | 说明 |
---|---|
from | 同一个数据库下等待被Join的集合。 |
localField | 源集合中的match值,如果输入的集合中,某文档没有 localField这个Key(Field),在处理的过程中,会默认为此文档含有 localField:null的键值对。 |
foreignField | 待Join的集合的match值,如果待Join的集合中,文档没有foreignField值,在处理的过程中,会默认为此文档含有 foreignField:null的键值对。 |
as | 为输出文档的新增值命名。如果输入的集合中已存在该值,则会覆盖掉 |
注意:null = null 此为真
案例:
数据准备
db.orders.insertMany( [{ "_id" : 1, "item" : "almonds", "price" : 12, "quantity" : 2 },{ "_id" : 2, "item" : "pecans", "price" : 20, "quantity" : 1 },{ "_id" : 3 }
] )db.inventory.insertMany( [{ "_id" : 1, "sku" : "almonds", "description": "product 1", "instock" : 120 },{ "_id" : 2, "sku" : "bread", "description": "product 2", "instock" : 80 },{ "_id" : 3, "sku" : "cashews", "description": "product 3", "instock" : 60 },{ "_id" : 4, "sku" : "pecans", "description": "product 4", "instock" : 70 },{ "_id" : 5, "sku": null, "description": "Incomplete" },{ "_id" : 6 }
] )
orders
集合的字段 item
和来自 inventory
集合的 sku
字段,将来自 orders
的文档与来自 inventory
集合的文档联接在一起:
# orders集合的item字段 inventory集合的sku字段
db.orders.aggregate( [{$lookup:{from: "inventory",localField: "item",foreignField: "sku",as: "inventory_docs"}}
] )# 该操作对应于如下伪 SQL 语句:
SELECT *, inventory_docs
FROM orders
WHERE inventory_docs IN (SELECT *FROM inventoryWHERE sku = orders.item
)
# 输出结果为下面三行,注意最后一行,null == null 条件成立的结果
[{_id: 1,item: 'almonds',price: 12,quantity: 2,inventory_docs: [{_id: 1,sku: 'almonds',description: 'product 1',instock: 120}]},{_id: 2,item: 'pecans',price: 20,quantity: 1,inventory_docs: [{ _id: 4, sku: 'pecans', description: 'product 4', instock: 70 }]},{_id: 3,inventory_docs: [ { _id: 5, sku: null, description: 'Incomplete' }, { _id: 6 } ]}
]
聚合操作案例1
原始数据
test> db.books.find()
[{_id: ObjectId("66a5b0106ae7536b7b29723e"),title: 'book-0',type: 'literature',tag: [ 'nosql', 'popular' ],favCount: 63,author: { name: 'xx001', age: 26 }},{_id: ObjectId("66a5b0106ae7536b7b29723f"),title: 'book-1',type: 'novel',tag: [ 'popular', 'popular' ],favCount: 54,author: { name: 'xx005', age: 22 }},{_id: ObjectId("66a5b0106ae7536b7b297240"),title: 'book-2',type: 'technology',tag: [ 'popular', 'popular' ],favCount: 24,author: { name: 'xx005', age: 23 }},......
]
# 统计每个分类的book文档数量
db.books.aggregate([{$group:{_id: "$type",bookcount: {$count: {}}}},{$sort:{bookcount: -1}}
])
# 输出结果
[{ _id: 'technology', bookcount: 15 },{ _id: 'sociality', bookcount: 11 },{ _id: 'literature', bookcount: 11 },{ _id: 'novel', bookcount: 9 },{ _id: 'travel', bookcount: 7 }
]
# 标签的热度排行,标签的热度则按其关联book文档的收藏数(favCount)来计算
db.books.aggregate([{$match:{favCount: {$gt: 0}}},{$unwind:{path: "$tag",includeArrayIndex: "arrayIndex",preserveNullAndEmptyArrays: true}},{$group:{_id: "$tag",countSum: {$sum: "$favCount"}}},{$sort:{countSum: -1}}
]
- 过滤掉favCount=0的文档
- 将tag标签中的数组拆分为一个个的文档,这样一个包含3个标签的文档会被拆解为3个条目。
- 再按照tag分组,并求出各个标签订阅的总数
- 对总数排序
# 执行结果
[{ _id: 'popular', total: 1159 },{ _id: 'nosql', total: 1095 },{ _id: 'developer', total: 1014 },{ _id: 'mongodb', total: 872 },{ _id: 'document', total: 598 }
]
聚合操作案例2
导入邮政编码数据集:https://media.mongodb.org/zips.json
使用mongoimport工具导入数据
MongoDB Database Tools 工具下载路径
mongoimport -h 192.168.75.100 -d test -u hushang -p 123456 --authenticationDatabase=admin -c zips --file D:\downfile\goodle下载\zips.json
h,–host :代表远程连接的数据库地址,默认连接本地Mongo数据库;
–port:代表远程连接的数据库的端口,默认连接的远程端口27017;
-u,–username:代表连接远程数据库的账号,如果设置数据库的认证,需要指定用户账号;
-p,–password:代表连接数据库的账号对应的密码;
-d,–db:代表连接的数据库;
-c,–collection:代表连接数据库中的集合;
-f, --fields:代表导入集合中的字段;
–type:代表导入的文件类型,包括csv和json,tsv文件,默认json格式;
–file:导入的文件名称
–headerline:导入csv文件时,指明第一行是列名,不需要导入;
test> show collections
books
emp
zips
test> db.zips.countDocuments()
29353# 原始数据
test> db.zips.find()
[{_id: '01035',city: 'HADLEY',loc: [ -72.571499, 42.36062 ],pop: 4231,state: 'MA'},{_id: '01030',city: 'FEEDING HILLS',loc: [ -72.675077, 42.07182 ],pop: 11985,state: 'MA'},{_id: '01026',city: 'CUMMINGTON',loc: [ -72.905767, 42.435296 ],pop: 1484,state: 'MA'},
......
]
# 返回人口超过1000万的州
db.zips.aggregate([{$group: {_id: "$state",popSum: { $sum: "$pop" }}},{ $match: { popSum: { $gte: 1000000 } } }
])
# 返回各州中各个城市平均人口
db.zips.aggregate([{$group: {_id: { state: "$state", city: "$city" },cityPop: { $sum: "$pop" }}},{$group: {_id: "$_id.state",avgCityPop: { $avg: "$cityPop" }}}
])
# 按州返回人口最大和最小的城市
db.zips.aggregate( [# 计算出各个州下 各个城市的人数{ $group:{_id: { state: "$state", city: "$city" },pop: { $sum: "$pop" }}},# 对人数进行排序{ $sort: { pop: 1 } },# 取排序后的第一个数据和最后一个数据{ $group:{_id : "$_id.state",biggestCity: { $last: "$_id.city" },biggestPop: { $last: "$pop" },smallestCity: { $first: "$_id.city" },smallestPop: { $first: "$pop" }}},# 对输出的内容包装一下{ $project:{ _id: 0,state: "$_id",biggestCity: { name: "$biggestCity", pop: "$biggestPop" },smallestCity: { name: "$smallestCity", pop: "$smallestPop" }}}
] )
聚合优化
https://www.mongodb.com/docs/manual/core/aggregation-pipeline-optimization/
聚合优化的三大目标:
- 尽可能利用索引完成搜索和排序
- 尽早尽多减少数据量,比如使用$match
- 尽可能减少执行步骤
执行顺序
$match/$sort vs $project/$addFields
为了使查询能够命中索引,$match/$sort
步骤需要在最前面,该原则适用于MongoDB <=3.4版本。MongoDB 3.6开始具备一定的自动优化能力。
$project + $skip/$limit
$skip/$limit
应该尽可能放在$project
之前,减少$project
投影的工作量 。3.6开始自动完成这个优化。
内存排序
在没有索引支持的情况下,MongoDB最多只支持使用100MB内存进行排序。
假设总共可用内存为16GB,一个请求最多可以使用100MB内存排序,总共可以有16000/ 100= 160个请求同时执行。
内存排序消耗的不仅是内存,还有大量CPU
方案一: $sort + $limit
只排Top N ,只要N条记录总和不超过100MB即可
方案二: {allowDiskUse: true}
使用磁盘作为交换空间完成全量,超出100MB部分与磁盘交换排序
方案三: 索引排序
使用索引完成排序,没有内存限制
整合Springboot进行聚合操作
SpringBoot整合MongoDB的详细流程请参考上文《SpringBoot整合MongoDB》
MongoTemplate提供了aggregate方法来实现对数据的聚合操作。
基于聚合管道mongodb提供的可操作的内容:
基于聚合操作Aggregation.group,mongodb提供可选的表达式
案例一
返回人口超过1000万的州
db.zips.aggregate([{$group: {_id: "$state",popSum: { $sum: "$pop" }}},{ $match: { popSum: { $gte: 1000000 } } }
])
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;/*** 返回人口超过1000万的州* 先对州进行分组* 求和,州的总人数* 筛选、匹配、过滤*/
@Test
public void testGtPop() {//$groupGroupOperation groupOperation = Aggregation.group("state").sum("pop").as("popSum");//$matchCriteria criteria = Criteria.where("popSum").gte(1000000);MatchOperation matchOperation = Aggregation.match(criteria);// 按顺序组合每一个聚合步骤TypedAggregation<Zips> zipsTypedAggregation = Aggregation.newAggregation(Zips.class,groupOperation, matchOperation);// 执行聚合操作,如果不使用 Map,也可以使用自定义的实体类来接收数据AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(zipsTypedAggregation, Map.class);// 取出最终结果List<Map> mappedResults = aggregationResults.getMappedResults();mappedResults.forEach(System.out::println);}
案例二
返回各州中各个城市平均人口
db.zips.aggregate([{$group: {_id: { state: "$state", city: "$city" },cityPop: { $sum: "$pop" }}},{$group: {_id: "$_id.state",avgCityPop: { $avg: "$cityPop" }}},{$sort: {avgCityPop: -1}}
])
/*** 返回各州中各个城市平均人口* 先安装州、城市进行分组* 得到哥走中各个城市的人口* 再求平均值*/
@Test
public void testPopAvg(){//$groupGroupOperation cityGroup = Aggregation.group("state", "city").sum("pop").as("cityPop");GroupOperation avgCityGroup = Aggregation.group("_id.state").avg("cityPop").as("avgCityPop");// $sortSortOperation sortOperation = Aggregation.sort(Sort.Direction.DESC, "avgCityPop");// 按顺序组合每一个聚合步骤TypedAggregation<Zips> zipsTypedAggregation = Aggregation.newAggregation(Zips.class,cityGroup, avgCityGroup, sortOperation);// 执行聚合操作,如果不使用 Map,也可以使用自定义的实体类来接收数据AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(zipsTypedAggregation, Map.class);// 取出最终结果List<Map> mappedResults = aggregationResults.getMappedResults();mappedResults.forEach(System.out::println);
}
案例三
按州返回人口最大和最小的城市
# 按州返回人口最大和最小的城市
db.zips.aggregate( [{ $group:{_id: { state: "$state", city: "$city" },pop: { $sum: "$pop" }}},{ $sort: { pop: 1 } },{ $group:{_id : "$_id.state",biggestCity: { $last: "$_id.city" },biggestPop: { $last: "$pop" },smallestCity: { $first: "$_id.city" },smallestPop: { $first: "$pop" }}},{ $project:{ _id: 0,state: "$_id",biggestCity: { name: "$biggestCity", pop: "$biggestPop" },smallestCity: { name: "$smallestCity", pop: "$smallestPop" }}},{ $sort: { state: 1 } }
] )
@Test
public void testMinMaxPop(){// $groupGroupOperation cityPopGroup = Aggregation.group("state", "city").sum("pop").as("pop");// $sortSortOperation popSort = Aggregation.sort(Sort.Direction.ASC, "pop");// $groupGroupOperation lastFirstGroup = Aggregation.group("_id.state").last("_id.city").as("biggestCity").last("pop").as("biggestPop").first("_id.city").as("smallestCity").first("pop").as("smallestPop");// $projectProjectionOperation projectionOperation = Aggregation.project("state", "biggestCity", "smallestCity").and("_id").as("state").andExpression("{ name: \"$biggestCity\", pop: \"$biggestPop\" }").as("biggestCity").andExpression("{ name: \"$smallestCity\", pop: \"$smallestPop\" }").as("smallestCity");// $sortSortOperation stateSort = Aggregation.sort(Sort.Direction.ASC, "state");// 按顺序组合每一个聚合步骤TypedAggregation<Zips> zipsTypedAggregation = Aggregation.newAggregation(Zips.class,cityPopGroup, popSort, lastFirstGroup, projectionOperation, stateSort);// 执行聚合操作,如果不使用 Map,也可以使用自定义的实体类来接收数据AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(zipsTypedAggregation, Map.class);// 取出最终结果List<Map> mappedResults = aggregationResults.getMappedResults();mappedResults.forEach(System.out::println);}