Elasticsearchの公式pythonクライアントにはBulk APIに対応するhelpers.bulk(es, actions)という機能があるが、Bulk APIと少し挙動が異なる。
helpers.bulkはactionsをデフォルト500のchunk_sizeごとに分けて実行するラッパーで、エラーが生じればその時点でBulkIndexErrorとなり、残りのchunkは実行されない。
この事実はラップされているhelpers.streaming_bulkの説明にのみ登場していて、
raise_on_error(デフォルト:True)というパラメータは「エラーが生じたときに実行していた最後のchunkにおけるエラーを、BulkIndexErrorとして発出するかどうか」を定めると記載があり、このことから残りのchunkが実行されないことが分かる。
全てのchunkを実行した上でエラーをまとめて報告してくれるわけではないので、エラーハンドリングする際は思わぬところで足をすくわれかねない。
以下に実例を示す。
from elasticsearch import Elasticsearch, helpers
es = Elasticsearch(***)
mapping = {
"mappings": {
"properties": {
"date": {"type": "date"}
}
}
}
es.indices.create(index="test_bulk_api", body=mapping)
action = {
"_op_type": 'index',
"_index": 'test_bulk_api',
"_source": {"date": "2022-01-01"}
}
bad_action = {
"_op_type": 'index',
"_index": 'test_bulk_api',
"_source": {"date": "aaa"}
}
actions = [action]*499 + [bad_action]*2 + [action]
helpers.bulk(es, actions)
BulkIndexError: ('1 document(s) failed to index.', [{'index': {'_index': 'test_bulk_api', '_type': '_doc', '_id': 'H_uX-IMBNOq4SUAzPGiD', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': "failed to parse field [date] of type [date] in document with id 'H_uX-IMBNOq4SUAzPGiD'. Preview of field's value: 'aaa'", 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'failed to parse date field [aaa] with format [strict_date_optional_time||epoch_millis]', 'caused_by': {'type': 'date_time_parse_exception', 'reason': 'date_time_parse_exception: Failed to parse with all enclosed parsers'}}}, 'data': {'date': 'aaa'}}}])
デフォルトのchunk sizeは500なので、500番目でエラーが起こると、501番目のbad_actionは実行されず、結果としてエラーにも返ってこない。502番目のactionが実行されなかったことも、このBulkIndexErrorからだけでは読み取れない。
したがって、「actionsを全て実行した上でエラーを漏れなく集めて何か処理をしたい」といったときは対策が必要。
方法1. chunk sizeを増やす(actionsをchunk_size以下にする)
helpers.bulk(es, actions, chunk_size=1000)
circuit_breakerに引っかからないかや、max_chunk_bytesの制約にかからないか注意。
方法2. raise_on_error=Falseにする
helpers.bulk(es, actions, raise_on_error=False)
(500,
[{'index': {'_index': 'test_bulk_api',
'_type': '_doc',
'_id': 'wNGv-IMBQg2WwLVBKlg_',
'status': 400,
'error': {'type': 'mapper_parsing_exception',
'reason': "failed to parse field [date] of type [date] in document with id 'wNGv-IMBQg2WwLVBKlg_'. Preview of field's value: 'aaa'",
'caused_by': {'type': 'illegal_argument_exception',
'reason': 'failed to parse date field [aaa] with format [strict_date_optional_time||epoch_millis]',
'caused_by': {'type': 'date_time_parse_exception',
'reason': 'date_time_parse_exception: Failed to parse with all enclosed parsers'}}}}},
{'index': {'_index': 'test_bulk_api',
'_type': '_doc',
'_id': '1eyv-IMBOaj2lBpBKmt_',
'status': 400,
'error': {'type': 'mapper_parsing_exception',
'reason': "failed to parse field [date] of type [date] in document with id '1eyv-IMBOaj2lBpBKmt_'. Preview of field's value: 'aaa'",
'caused_by': {'type': 'illegal_argument_exception',
'reason': 'failed to parse date field [aaa] with format [strict_date_optional_time||epoch_millis]',
'caused_by': {'type': 'date_time_parse_exception',
'reason': 'Failed to parse with all enclosed parsers'}}}}}])
ラッパー側でtry catchをやって集計して返してくれる形になるので、helpers.bulk
のエラーをハンドリングしたいのであれば、自前でtry catchを書くよりraise_on_error=False
として、こちらを利用するのが良いはず。
とはいえこれもactionを丸ごと返すわけではなく、例えば"version"といったパラメータは返ってこない上に、成功と失敗でまとめられてしまい何番目のactionに対応したエラーか分からないので、個々のactionに対応した結果を確実に得たい場合はelasticsearch.helpers.streaming_bulkを利用する必要がある。
なお、raise_on_errorに似たようなraise_on_exception
というパラメータもあるのですが、役割がよく分かりませんでした。。
raise_on_exception
– ifFalse
then don’t propagate exceptions from call tobulk
and just report the items that failed as failed.
おまけ
max_retries
という、circuit_breaker_exception
などの429エラーの際に自動で再試行してくれるパラメータもあるので、こちらも設定しておくと良いと思います。