起きた状況
以前書いた
elasticsearchのlogstashで同一IDのデータを更新/マージする方法(updateとdoc_as_upsert)
でデータ処理をしていたところ、
タグを付与する更新処理がされていないデータが出てきた。
更新処理用のCSVはlogstashで読み込まれているが、一部が反映されていない。
また、Grafanaでタグの付与されないデータでグラフ化し、10秒で表示更新して、
タグの付与状況をリアルタイムでずっと眺めていると、付与自体も結構遅いデータがある。
最初はelasticsearchのキューのrejectを疑った
fluentd -> Elasticsearch 大量データ転送でトラブル
の「原因2: 処理キューが閾値を超えた場合にリクエストを破棄する」とか
Elasticsearch はデフォルトで閾値を超えたキューを破棄する設定となっています。 例えば index の作成は 50 個まで、bulk 処理は 20 個までといった具合。
メモリの消費を抑え適切に管理するための設定ではあるのですが、 この設定により作られるハズの index が作成されなかったり、 投入されるハズの record が作成されなかったりします。
es_rejected_execution_exceptionが出たのでthread_pool.bulk.queue_sizeを増やした
es_rejected_execution_exceptionはElasticsearchが持っているスレッドプールのキューサイズを超えるbulk requestがきたので、それをrejectした、つまりログがElasticsearchに一部入らなかったということです。
とかの情報にあるような、キューのrejectを疑った。
でも、ログ見てもrejectはないし、
Elasticsearchのcat thread pool
に出てるキューの確認コマンド打ってもrejectの痕跡はない
curl -XGET 'localhost:9200/_cat/thread_pool?pretty'
versionのconflict
logstashの方にはなかったけど、elasticsearchの方を見ていたら、
retry_on_conflict=>1
というのが目についた。
ちょっと他の部分を覚えていないのがよくないのだが、
elasticのディスカッションの下記にあったのような感じだった
Version conflict, document already exists (current version [1])
[2018-07-09T15:10:44.971-0400][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>409, :action=>["update", {:_id=>"f4:4d:30:60:8a:31", :_index=>"state_mac", :_type=>"state", :_routing=>nil, :_retry_on_conflict=>1}, 2018-07-09T19:09:45.000Z %{host} %{message}], :response=>{"update"=>{"_index"=>"state_mac", "_type"=>"state", "_id"=>"f4:4d:30:60:8a:31", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[state][f4:4d:30:60:8a:31]: version conflict, document already exists (current version [1])", "index_uuid"=>"huFaDcR5RgeG92F5S8F9kw", "shard"=>"2", "index"=>"state_mac"}}}}
API
最初はconflictを無視して強制上書きするオプションを探した。
APIだと、下記のような情報が見つかる
Update By Query APIに記述がある。
If you want to simply count version conflicts, and not cause the _update_by_query to abort, you can set conflicts=proceed on the url or "conflicts": "proceed" in the request body.
Reindexが登場!とelasticのブログで日本語でもあった
先ほどと同じように、この処理中にドキュメントを更新しようとすると、失敗しますが、そのような場合には、失敗した場所からリトライするような機能もあります。もしあなたが、bananasタグがあった場合にchocolateタグを追加し、同時にアップデートするようなアプリケーションを作っている場合、_update_by_queryでバージョンの矛盾を安全に無視できます。conflicts=proceedと設定します。これはバージョンの矛盾を数え、アップデートを継続します。
ただ、logstashのelasticsearchのoutputプラグインでは上記を行う方法がないようだった。
logstashの場合の対処法
logstashのelasticsearchのプラグインには、retry_on_conflictというオプションがある。
デフォルトが1なので、
これを増やすと、conflict時にリトライが行われ、ちゃんと更新される確率が上がる。
retry_on_conflict
retry_on_conflict
・Value type is number
・Default value is 1
The number of times Elasticsearch should internally retry an update/upserted document See the partial updates for more info
上記に関連して、下記2つの設定もある。
デフォルトは、リトライは2秒、4秒、8秒と倍々していき、64秒を上限にretry_on_conflictの回数を繰り返す。
retry_initial_interval
・Value type is number
・Default value is 2
Set initial interval in seconds between bulk retries. Doubled on each retry up to retry_max_interval
retry_max_interval
・Value type is number
・Default value is 64
Set max interval in seconds between bulk retries.
logstashの書き方サンプル
elasticsearchのlogstashで同一IDのデータを更新/マージする方法(updateとdoc_as_upsert)のUpdate設定にretry_on_conflictで5回のリトライを追加したサンプル
output {
elasticsearch {
hosts => ["testserver:9200"]
index => "testindex"
document_id => "%{hogeid}"
doc_as_upsert => true
action => "update"
retry_on_conflict => 5
}
}
参考:elasticのドキュメントでこんなのもあった
Updates and Conflicts
その他
logstashのretry_on_conflictに加えて、下記も行った。
元データの更新を行うタグ付与の対象が、
過去5分の追加データ全てで、複数回タグ付与が行われ、conflictが起きやすい状況だった。
タグ付与の対象を、まだタグの付与されていないデータに限定するようにクエリ変更も行った。
logstashのリトライと更新のconflictが起きやすいデータ更新を見直したことで、conflictによるデータ欠落は亡くなった。