Embulkとは
embulkはバッチ処理でデータのインプットから加工、出力を行うツール。
プラグインが多数作られており様々な入力、様々な加工、様々な出力を行うことができる。
たとえば入出力はDBだったりファイルだったりS3等のストレージや外部DBサービスも利用できる。
embulk-output-bibquery
は出力を担当するプラグインで、名前の通りbigqueryへの一貫した出力を行うことができる。
bigqueryへの出力時はデータを一度Google Cloud Storageに配置してからbigqueryへのロードが行われているが、詳細はソースを読むべきです( https://github.com/embulk/embulk-output-bigquery )。
リトライされない問題
ソースコードを読むと、Broken pipeによるネットワーク切断はリトライ対象となっており設定に指定した回数までは異常終了されずにリトライするはずであったが、現システムにおいて何故かリトライされずに異常終了となってしまっていた。週2,3のペースで。
(そもそもほぼ毎晩処理中にBroken pipeが発生する環境もよくないのだが)
Broken pipeはファイアーウォールによる切断でリトライすれば成功する可能性の高いものなので通常はリトライされるべきです。ということでちょっと詳細に調査していきました。
問題点
ソースコード
if ['Broken pipe', 'Connection reset', 'Connection timed out'].include?(e.message)
上記にマッチした場合にリトライされるようになっている。
システム状況
e.message -> 'Broken pipe (Write failed)'
要するにエラーメッセージで判断しようとしているけど、Broken pipeなのに予期していないメッセージなので文字列マッチでリトライ対象に判断されなくなっている。
ちなみに
ruby久々過ぎてちゃんと調べたことを書いておく。
if ['Broken pipe', 'Connection reset', 'Connection timed out'].include?(e.message)
これは配列に対する include? メソッドになっていて、 「配列の要素に引数が含まれているか」を判断しているため、文字列は完全一致で判断される。
文字列に対する include? メソッドは「文字列に引数の文字列が含まれているか」なので部分一致。
修正内容
if ['broken pipe', 'connection reset', 'connection timed out'].any? {|item| e.message.downcase.include?(item)}
あくまでリトライの判断でクリティカルな処理ではないので、広めに判断できるようにしてみた。
any? は配列の要素それぞれに続く {}
ブロックの処理を行い、いずれかが true
なら true
を返す。
大文字小文字もゆるく判断してくれる方が都合がいいので小文字にととのえてe.messageにもdowncase。
この対応でちゃんと Broken pipe (Write failed)
エラーに対してリトライされるようになりました。
ただし
こういった外部ライブラリの修正は embulk gem install
したモジュールに手を加えているので本来ならバッドパターンです。
なぜプルリク送らなかったの?と言われれば、環境依存ぽく、Java側まで見に行く余裕がなかったので・・・・