上一篇简单介绍了mongo的增删改查,今天来聊聊mongo的聚合操作
什么是 MongoDB 聚合框架
MongoDB 聚合框架(Aggregation Framework)是一个计算框架,它可以:
• 作用在一个或几个集合上;
• 对集合中的数据进行的一系列运算;
• 将这些数据转化为期望的形式;
相当于 SQL 查询中的(GROUP BY,LEFT OUTER JOIN,AS等)
管道(Pipeline)和步骤(Stage)
整个聚合运算过程称为管道(Pipeline),它是由多个步骤(Stage)组成的,每个管道:
• 接受一系列文档(原始数据);
• 每个步骤对这些文档进行一系列运算;
• 结果文档输出给下一个步骤;
可以想象下java的stream流,对数据进行一系列处理后输出最终结果
聚合运算的基本格式
pipeline = [$stage1, $stage2, ...$stageN];
db.<COLLECTION>.aggregate(
pipeline,
{ options }
);
常见步骤
步骤 | 作用 | SQL等价运算符 |
---|---|---|
$match | 过滤 | WHERE |
$project | 投影 | AS |
$sort | 排序 | ORDER BY |
$group | 分组 | GROUP BY |
$skip/$limit | 结果限制 | SKIP/LIMIT |
$lookup | 左外连接 | LEFT OUTER JOIN |
$match
$match功能类似find,只是在聚合操作中,match对数据筛选之后可以进行其他聚合操作;聚合操作建议一开始就使用match过滤数据,match可以同find一样使用索引
$match同find返回游标,如果一个游标已经遍历完,则会自动关闭;如果没有遍历完,则需要手动调用 close()方法,否则该游标将在服务器上存在 10 分钟(默认值)后超时释放,以防造成不必要的资源浪费,若10分支内不能遍历完游标,建议根据条件过滤后分批次获取数据
值得注意的是,在使用聚合时,每个管道默认允许占用的内存不超过 100M,否则会抛出错误。所以绝大多数情况下我们都应该从一个 match 管道开始,将操作的文档首先收缩到一个合理的范围内(允许命中索引),以确保 Mongo 可以高效地完成后续的统计变换等步骤。有必要的话还可以指定使用磁盘来缓存结果,避免内存耗尽无法完成(这只是一个容错的方案,并非一个好的方案)。
db.collection.aggregate(pipelines, { allowDiskUse: true })
$match同find不支持在查询中直接使用聚合操作,需要使用时得借用$expr操作
$expr不支持多键索引
$expr可以和$cond一起使用,完成条件查询
对以下文档,qty大于100,price打5折,小于100打75折; 取得price最终结果大于5的文档数据
db.supplies.insertMany([
{ "_id" : 1, "item" : "binder", "qty" : NumberInt("100"), "price" : NumberDecimal("12") },
{ "_id" : 2, "item" : "notebook", "qty" : NumberInt("200"), "price" : NumberDecimal("8") },
{ "_id" : 3, "item" : "pencil", "qty" : NumberInt("50"), "price" : NumberDecimal("6") },
{ "_id" : 4, "item" : "eraser", "qty" : NumberInt("150"), "price" : NumberDecimal("3") },
{ "_id" : 5, "item" : "legal pad", "qty" : NumberInt("42"), "price" : NumberDecimal("10") }
])
// Aggregation expression to calculate discounted price
let discountedPrice = {
$cond: {
if: { $gte: ["$qty", 100] },
then: { $multiply: ["$price", NumberDecimal("0.50")] },
else: { $multiply: ["$price", NumberDecimal("0.75")] }
}
};
// Query the supplies collection using the aggregation expression
db.supplies.find( { $expr: { $lt:[ discountedPrice, NumberDecimal("5") ] } });
//match例子
db.articles.aggregate( [
{ $match: { $or: [ { score: { $gt: 70, $lt: 90 } }, { views: { $gte: 1000 } } ] } },
{ $group: { _id: null, count: { $sum: 1 } } }
] );
$lookup
lookup相当于左连接,3.6.0版本后新增加了不相关子查询的功能
首先看看相关字段的左连接,格式如下
{
$lookup:
{
from: <collection to join>,
localField: <field from the input documents>,
foreignField: <field from the documents of the "from" collection>,
as: <output array field>
}
}
select a.c_name as name from a left join b on a.id = b.did
from 相当于 b,不能被分片
localField 相当于 a.id,若文档没有改字段,视为对null连接
foreignField 相当于 b.did,若文档没有改字段,视为对null连接
as 不太相当于 as ,因为是给整个结果命名,若a文档已有这个字段名,字段被覆盖
db.orders.insert([
{ "_id" : 1, "item" : "almonds", "price" : 12, "quantity" : 2 },
{ "_id" : 2, "item" : "pecans", "price" : 20, "quantity" : 1 },
{ "_id" : 3 }
])
db.inventory.insert([
{ "_id" : 1, "sku" : "almonds", description: "product 1", "instock" : 120 },
{ "_id" : 2, "sku" : "bread", description: "product 2", "instock" : 80 },
{ "_id" : 3, "sku" : "cashews", description: "product 3", "instock" : 60 },
{ "_id" : 4, "sku" : "pecans", description: "product 4", "instock" : 70 },
{ "_id" : 5, "sku": null, description: "Incomplete" },
{ "_id" : 6 }
])
orders对inventory执行连接后的结果
db.orders.aggregate([
{
$lookup:
{
from: "inventory",
localField: "item",
foreignField: "sku",
as: "inventory_docs"
}
}
])
//result
{
"_id" : 1,
"item" : "almonds",
"price" : 12,
"quantity" : 2,
"inventory_docs" : [
{ "_id" : 1, "sku" : "almonds", "description" : "product 1", "instock" : 120 }
]
}
{
"_id" : 2,
"item" : "pecans",
"price" : 20,
"quantity" : 1,
"inventory_docs" : [
{ "_id" : 4, "sku" : "pecans", "description" : "product 4", "instock" : 70 }
]
}
{
"_id" : 3,
"inventory_docs" : [
{ "_id" : 5, "sku" : null, "description" : "Incomplete" },
{ "_id" : 6 }
]
}
在/$lookup阶段,from对应的文档不能被分片,如果要将要分片集合与未分片集合连接在一起,可以拿分片集合去连接未分片集合。文档设计的时候尽可能内嵌对象(冗余)来消除使用连接的情况
db.shardedCollection.aggregate([
{ $lookup: { from: "unshardedCollection", ... } }
])
在官方例子可以看到一个有趣的例子
db.orders.aggregate([
{
$lookup: {
from: "items",
localField: "item", // field in the orders collection
foreignField: "item", // field in the items collection
as: "fromItems"
}
},
{
$replaceRoot: { newRoot: { $mergeObjects: [ { $arrayElemAt: [ "$fromItems", 0 ] }, "$$ROOT" ] } }
},
{ $project: { fromItems: 0 } }
])
//里面有replaceRoot,mergeObjects,$$ROOT,留到下一篇聊聊,顺便聊下mongoDB Compass的聚合查询用法
接下来不相关子查询(除了单个字段匹配连接外,允许其他连接条件)
{
$lookup:
{
from: <collection to join>,
let: { <var_1>: <expression>, …, <var_n>: <expression> },
pipeline: [ <pipeline to execute on the collection to join> ],
as: <output array field>
}
}
let: a文档的字段先进行声明,使用"$$<variable>"语法;可选值
pipeline: 连接条件,不允许出现$out与$merge
db.orders.insert([
{ "_id" : 1, "item" : "almonds", "price" : 12, "ordered" : 2 },
{ "_id" : 2, "item" : "pecans", "price" : 20, "ordered" : 1 },
{ "_id" : 3, "item" : "cookies", "price" : 10, "ordered" : 60 }
])
db.warehouses.insert([
{ "_id" : 1, "stock_item" : "almonds", warehouse: "A", "instock" : 120 },
{ "_id" : 2, "stock_item" : "pecans", warehouse: "A", "instock" : 80 },
{ "_id" : 3, "stock_item" : "almonds", warehouse: "B", "instock" : 60 },
{ "_id" : 4, "stock_item" : "cookies", warehouse: "B", "instock" : 40 },
{ "_id" : 5, "stock_item" : "cookies", warehouse: "A", "instock" : 80 }
])
聚合查询与输出结果
db.orders.aggregate([
{
$lookup:
{
from: "warehouses",
let: { order_item: "$item", order_qty: "$ordered" },
pipeline: [
{ $match:
{ $expr:
{ $and:
[
{ $eq: [ "$stock_item", "$$order_item" ] },
{ $gte: [ "$instock", "$$order_qty" ] }
]
}
}
},
{ $project: { stock_item: 0, _id: 0 } }
],
as: "stockdata"
}
}
])
//result
{ "_id" : 1, "item" : "almonds", "price" : 12, "ordered" : 2,
"stockdata" : [ { "warehouse" : "A", "instock" : 120 },
{ "warehouse" : "B", "instock" : 60 } ] }
{ "_id" : 2, "item" : "pecans", "price" : 20, "ordered" : 1,
"stockdata" : [ { "warehouse" : "A", "instock" : 80 } ] }
{ "_id" : 3, "item" : "cookies", "price" : 10, "ordered" : 60,
"stockdata" : [ { "warehouse" : "A", "instock" : 80 } ] }
orders与warehouses文档,满足item字段与instock字段相等,且ordered字段大于等于instock字段的数据,筛选出warehouses文档,进行投影后将数据命名为stockdata插入orders文档
db.absences.aggregate([
{
$lookup:
{
from: "holidays",
pipeline: [
{ $match: { year: 2018 } },
{ $project: { _id: 0, date: { name: "$name", date: "$date" } } },
{ $replaceRoot: { newRoot: "$date" } }
],
as: "holidays"
}
}
])
官方的第二个例子,简单来说就是absences中插入holidays文档内容,内容为year为2018,将结果替换为data对象中包含name和date,并将对象成为最终holidays文档。