批量插入
insert_one
这种方法在数据量较小时可以很好的工作,但是当数据量非常大时,此种操作会非常慢,我们需要通过批量写入的方式来写入数据。
insert_many
参数:
- documents
- ordered :为True时,迫使MongoDB按顺序同步插入数据;为False,MongoDB会并发的不按固定顺序进行批量插入。显然当我们对性能有要求时,将该参数设为False是非常必要的。
- bypass_document_validation : MongoDB3.2之后加入了document validation功能,用于
验证写入的文档是否符合collection制定的规则
,具体可以参考reference中的链接。而既然是验证就肯定需要花费时间,当我们对性能有极致要求时,也可以将此参数设为True,从而越过验证,直接写入。 - session
批量更新
前面的例子在插入操作时非常有效,但是对于更新操作由于update_many
无法针对每一个doc
进行更新,如本例中针对每一个uid
进行更新,那么就需要使用bulk_write
操作。
from pymongo import UpdateOne
update_operations = []
for uid, user_data in user_dict.items():
op = UpdateOne({'uid': uid}, {'$set': {'user_data': user_data}}, upsert=True)
update_operations.append(op)
user_collection.bulk_write(update_operations, ordered=False, bypass_document_validation=True)
批量读取
批量读取我们可以使用$in
操作符,但是需要注意的是如果$in
针对的list 过大
,那么可能会导致报错pymongo.errors.DocumentTooLarge
, 目前我的做法是将大的 list 分割成1000个一段,然后分段查询
list_length = len(uid_list)
iter_size = 1000
current = 0
while current < list_length:
end = current + iter_size
uid_segment = uid_list[current: end]
result_cursor = mongo_collection.find({"uid": {"$in": uid_segment}})
for user_info in result_cursor:
# do something
...
current = current + iter_size
异常处理
在实践过程中,会遇到异常的情况,尤其是写入的时候,可能由于各种原因导致写入失败,因此需要catch exception,并打印详细信息,如下:
try:
user_collection.insert_many(
data_iter, ordered=False, bypass_document_validation=True)
except BulkWriteError as e:
log.error(e.details)