この記事について
この記事は先日投稿した記事の続きです。
今回はpymongoのbulk_write(一括書き込み)について書いていきます。
bulk_writeとは?
- dbへの書き込みを1件ごとにクエリを作成し、書き込むのではなく、クエリを大量に生成しておき、bulk_write関数で一括書き込みを行うといったもので、dbの書き込みをまとめて行い、ネットワークの往復を減らすことで、スループットを向上を図るのにとても便利な操作です
。
参照:pymongo 3.9.0 document:Bulk Write Operations
どんな操作が可能か?
下記が可能な操作で、通常と同じ操作が使えます。
- InsertOne ※複数の挿入はinsert_manyで可能
- ReplaceOne,
- UpdateOne, UpdateMany,
- DeleteOne, DeleteMany
操作方法
基本的に操作ごとにオブジェクトを作成し、リストにまとめてbulk_write関数に渡すだけです。
普通に書き込みを行う
※ 書き込みの順番はオプションを付けなかった場合、デフォルトで、リストの1番目から順に書き込み操作が実行されていきます。
(不正な操作をした場合は、不正が発生するまでの書き込みがされる。)
main.py
from pprint import pprint
from pymongo import MongoClient
from pymongo import UpdateOne,InsertOne
from pymongo.errors import BulkWriteError
client = MongoClient()
db = client["Collection"]["table"]
# dbのドキュメント全件削除
# db.delete_many({})
# _idとxをインクリメントして1000件のドキュメントを挿入する
opList = [ InsertOne({"_id":i,"x":i}) for i in range(0,1000)]
# 書き込み成功時は戻り値から、
# 例外発生時はスローされたエラーから詳細が取得できる
try:
result = db.bulk_write(opList)
print("正常終了時")
pprint(result.bulk_api_result)
except BulkWriteError as bwe:
print("例外発生時")
pprint(bwe.details)
'''
{'writeErrors': [],
'writeConcernErrors': [],
'nInserted': 1000,
'nUpserted': 0,
'nMatched': 0,
'nModified': 0,
'nRemoved': 0,
'upserted': []}
'''
順番を気にせず書き込みを行う場合
bulk_writeのオプションでorderd = Falseを指定するだけ
(不正な操作をしていても全ての操作が試され、戻り値 or 例外から詳細がわかる。)
- まず、上記のスクリプトを実行した後で、さらに_id 500~1500までのドキュメントをインクリメントしながらインサートしてみる
(順番通りにインサートされるので、一件も挿入できずにエラーが出るはず。)
main.py
# _idとxをインクリメントして1000件のドキュメントを挿入する
opList = [ InsertOne({"_id":i,"x":i}) for i in range(500,1500)]
# 書き込み成功時は戻り値から、例外発生時はスローされたエラーから
# 詳細が取得できる
try:
result = db.bulk_write(opList)
print("正常終了時")
pprint(result.bulk_api_result)
except BulkWriteError as bwe:
print("例外発生時")
pprint(bwe.details)
# 出力結果
# (1件目のドキュメントの挿入で失敗して以降の書き込みができていないことがわかる)
'''
例外発生時
{'nInserted': 0,
'nMatched': 0,
'nModified': 0,
'nRemoved': 0,
'nUpserted': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [{'code': 11000,
'errmsg': 'E11000 duplicate key error collection: '
'Collection.table index: _id_ dup key: { _id: 500 '
'}',
'index': 0,
'keyPattern': {'_id': 1},
'keyValue': {'_id': 500},
'op': {'_id': 500, 'x': 500}}]}
'''
- 次に,bulk_writeのオプションでorderd = Falseにして一括書き込みを行う
main.py
# _idとxをインクリメントして1000件のドキュメントを挿入する
opList = [ InsertOne({"_id":i,"x":i}) for i in range(500,1500)]
# 書き込み成功時は戻り値から、例外発生時はスローされたエラーから
# 詳細が取得できる
try:
result = db.bulk_write(opList,ordered=False)
print("正常終了時")
pprint(result.bulk_api_result)
except BulkWriteError as bwe:
print("例外発生時")
pprint(bwe.details)
# 例外がスローされるが、500件のドキュメントがインサートできて、
# 失敗した書き込み1つ1つの失敗した理由が取得できているのが確認できる
'''
例外発生時
{'nInserted': 500,
'nMatched': 0,
'nModified': 0,
'nRemoved': 0,
'nUpserted': 0,
'upserted': [],
'writeConcernErrors': [],
'writeErrors': [{'code': 11000,
'errmsg': 'E11000 duplicate key error collection: '
'Collection.table index: _id_ dup key: { _id: 500 '
'}',
'index': 0,
'keyPattern': {'_id': 1},
'keyValue': {'_id': 500},
'op': {'_id': 500, 'x': 500}},
{'code': 11000,
'errmsg': 'E11000 duplicate key error collection: '
'Collection.table index: _id_ dup key: { _id: 501 '
'}',
'index': 1,
'keyPattern': {'_id': 1},
'keyValue': {'_id': 501},
'op': {'_id': 501, 'x': 501}},
{'code': 11000,
'errmsg': 'E11000 duplicate key error collection: '
'Collection.table index: _id_ dup key: { _id: 502 '
'}',
~~~~以下略 ~~~~
'''
pymongoのbulk_writeは大体こんな感じです。
要望があれば、いろいろ追記していきます。