在MongoTemplate中使用带merge的聚合操作时,发现返回了into表的全表查询结果。实际使用merge通常是要进行聚合插入,不需要查询结果。
去除返回结果需要给Aggregation添加skipOutput选项,具体如下:
@Test
void mergeTest() throws InterruptedException {
// 添加skipOutput去除merge返回结果
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("a").is("b")),
Aggregation.merge().intoCollection("newCollection").on("_id").build()
).withOptions(AggregationOptions.builder().skipOutput().build());
// 下面两种均适用
reactiveMongoTemplate.aggregate(aggregation, "collection", Document.class)
.subscribe(document -> System.out.println(document));
List<Document> mappedResults = mongoTemplate.aggregate(aggregation, "collection", Document.class)
.getMappedResults();
}
正常使用数据库时,聚合操作中使用merge并不会返回查询结果,造成返回into表查询结果的原因是template在merge和out聚合操作后添加了find操作,即进行了两次请求
添加的位置分别在AggregateIterableImpl和AsyncAggregateIterableImpl中
class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResult> implements AggregateIterable<TResult> {
public ReadOperation<BatchCursor<TResult>> asReadOperation() {
MongoNamespace outNamespace = getOutNamespace();
// 此处判断是否含有merge或out
if (outNamespace != null) {
getExecutor().execute(operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation,
hint, comment, aggregationLevel), getReadConcern(), getClientSession());
FindOptions findOptions = new FindOptions().collation(collation);
Integer batchSize = getBatchSize();
if (batchSize != null) {
findOptions.batchSize(batchSize);
}
// 此处添加findOptions
return operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
} else {
return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
hint, comment, allowDiskUse, aggregationLevel);
}
}
}
class AsyncAggregateIterableImpl<TDocument, TResult> extends AsyncMongoIterableImpl<TResult> implements AsyncAggregateIterable<TResult> {
AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
MongoNamespace outNamespace = getOutNamespace();
// 此处判断是否含有merge或out
if (outNamespace != null) {
AsyncWriteOperation<Void> aggregateToCollectionOperation =
operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment,
aggregationLevel);
FindOptions findOptions = new FindOptions().collation(collation);
Integer batchSize = getBatchSize();
if (batchSize != null) {
findOptions.batchSize(batchSize);
}
AsyncReadOperation<AsyncBatchCursor<TResult>> findOperation =
operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
// 此处添加findOptions
return new WriteOperationThenCursorReadOperation<TResult>(aggregateToCollectionOperation, findOperation);
} else {
return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
hint, comment, allowDiskUse, aggregationLevel);
}
}
}