EmbulkDay 10

Embulkのリジューム実行/差分実行について

More than 3 years have passed since last update.

Embulk Advent Calendar 2015の10日目の記事になります。Treasure Dataの赤間です。

今日はEmbulkの

* リジューム実行

* 差分実行

について書いてみたいと思います。

どちらも大量のデータをロードする際に使える機能ですが、以下のような違いがあります。

リジューム実行

Embulkの場合成功したタスクの情報をファイルに書き込むことができ、1番目と4番目のタスクが成功したというような情報が残るので、リジューム実行した場合は2番目と3番目のタスクのみを実行するというものです。

-rオプションを使用して実行する

なお頻繁にタスクが失敗してリジューム実行をする必要がないようにプラグイン側(Input/Output/FileInput/FileOutput)でリトライ処理を実装することも大事なポイントです。

その辺りは2日目にEmbulkプラグインのリトライ/例外処理の実装についてという記事を書いたので参考にしてください。

差分実行

Embulkをスケジュールで定期実行している時に前回ロードした分までをスキップして、差分ロードしたい場合に使える

-oオプションを使用して実行する


リジューム実行

まずはリジューム実行についてです。

Embulkの場合は以下のようなコマンドで-r(--resume-state)オプションを使用して実行可能です。

# 初回実行

embulk run config.yml -r resume-state.yml
# コケたところから再実行(resume-state.ymlに成功したタスクの情報が書かれている)
embulk run config.yml -r resume-state.yml
# 諦めてcleanup処理
embulk cleanup config.yml -r resume-state.yml

ロードが一部失敗した場合には、resume-state.ymlに以下のように成功したタスクの情報が保存されます。

これはGoogle Cloud Storageに入っているgzip圧縮されたCSV3ファイルの中の1ファイル(sample_03.csv.tar.gz)のみ他の2ファイルとは違うフォーマットで作成し、CSV Parserのstop_on_invalid_recordをtrueにしてパースエラーが発生した場合は処理を中断するオプションを指定して実行したものです。

(stop_on_invalid_recordはデフォルトではfalseなので通常はパースエラーが発生しても該当行のパースをskipしてwarningを出すだけで処理は中断しません)

ちょっと長いのですがInput(embulk-input-gcs)/Output(stdout)プラグインの設定やparserのスキーマ定義が続いた後、一番下に1番目、2番目のタスクが成功したという情報が書かれています。

exec_task: {transaction_time: '2015-12-10 09:58:00.554 UTC'}

in_task:
DecoderTaskSources:
- {}
DecoderConfigs:
- {type: gzip}
ParserTaskSource:
DefaultTimeZone: UTC
SchemaConfig:
- {name: id, type: long}
- {name: account, type: long}
- {format: '%Y-%m-%d %H:%M:%S', name: time, type: timestamp}
- {format: '%Y%m%d', name: purchase, type: timestamp}
- {name: comment, type: string}
Charset: UTF-8
SkipHeaderLines: 1
Newline: CRLF
DelimiterChar: ','
HeaderLine: true
StopOnInvalidRecord: true
MaxQuotedSizeLimit: 131072
DefaultTimestampFormat: '%Y-%m-%d %H:%M:%S.%N %z'
CommentLineMarker: null
TrimIfNotQuoted: false
QuoteChar: '"'
AllowOptionalColumns: false
NullString: null
AllowExtraColumns: false
EscapeChar: \
FileInputTaskSource:
LastPath: null
P12KeyfileFullpath: null
ApplicationName: Embulk GCS input plugin
Bucket: my-bucket
P12Keyfile: null
PathPrefix: sample
Files: [sample_01.csv.tar.gz, sample_02.csv.tar.gz, sample_03.csv.tar.gz]
ServiceAccountEmail: null
AuthMethod: json_key
JsonKeyfile: {base64: ABCDEFGHIJK}
ParserConfig:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
header_line: true
stop_on_invalid_record: true
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out_task: {TimeZone: UTC}
in_schema:
- {index: 0, name: id, type: long}
- {index: 1, name: account, type: long}
- {index: 2, name: time, type: timestamp}
- {index: 3, name: purchase, type: timestamp}
- {index: 4, name: comment, type: string}
out_schema:
- {index: 0, name: id, type: long}
- {index: 1, name: account, type: long}
- {index: 2, name: time, type: timestamp}
- {index: 3, name: purchase, type: timestamp}
- {index: 4, name: comment, type: string}
in_reports:
- {}
- {}
- null
out_reports:
- {}
- {}
- null

全てのタスクが失敗してしまった場合には、resume-state.ymlは作成されません。

リジューム実行を諦めて中間データを掃除したい場合にはrunではなくcleanupを実行します。

リジューム処理はInput/FileInputプラグインであれば特に何もしなくても対応できます。

Output/FileOutputプラグインの場合は1タスクのrunが冪等であれば対応できます。

例としてOutputプラグインであるembulk-output-mysqlではmodeオプションで以下の6つのモードを実装しています。

* insert

* insert_direct (※)

* truncate_insert

* merge

* merge_direct (※)

* replace

このうち(※)を付けた*_directは、タスクがリジュームされるとデータが2回ロードされてしまう可能性があるためリジューム実行に対応できません。

その他のmodeは全タスクが成功して最後にトランザクションがcommitされるまでは何もデータを書き込んだことにならないので、リジュームすることが可能という形になっています。


差分実行

差分実行はembulkをcronでスケジュール実行していたりする場合に使えます。

実行する手順はScheduled bulk data loading to Elasticsearch + Kibana 4 from CSV filesのチュートリアルにもあるのですが...

0 * * * * embulk run /path/to/next-config.yml -o /path/to/next-config.yml

というように-o(--output)オプションを使用して実行します。

こちらはInput/FileInputプラグイン側の対応が必要となります。

プラグイン側での実装がどうなっているのかを、比較的対応しやすいオブジェクトストレージからのInputを例に見てみます。

下のコードはAWS S3のバケットからファイルの一覧をprefixを指定して取得するメソッドです(元のコードはここ)。

1. 最初はlastKeyがnullになっています。差分実行した場合にはconfig.ymlのlast_pathオプションにファイルパスが入ってくるのでそれがメソッドの引数で渡されます。

2. ループの中でol.getNextMarker()で次のファイルパスが取得できるのでこれをlastKeyに代入

3. 以下ol.getNextMarker()の戻り値がnullになるまで繰り返し

public static void listS3FilesByPrefix(FileList.Builder builder, AmazonS3Client client, String bucketName, String prefix, Optional<String> lastPath)

{
// 最初はnull、last_pathを指定して差分実行された場合はconfig.ymlのlast_pathの値がメソッドの引数経由で渡される
String lastKey = lastPath.orNull();
do {
ListObjectsRequest req = new ListObjectsRequest(bucketName, prefix, lastKey, null, 1024);
ObjectListing ol = client.listObjects(req);
for (S3ObjectSummary s : ol.getObjectSummaries()) {
if (s.getSize() > 0) {
builder.add(s.getKey(), s.getSize());
if (!builder.needsMore()) {
return;
}
}
}
// AWS SDKを使ってnextMarkerを取得しlastKeyに代入
lastKey = ol.getNextMarker();
} while(lastKey != null);
}

AWSのSDKの場合はol.getNextMarker()の戻り値がString、それもバケット内のファイルパスというとても素直な実装なので外からInjectするのも簡単なのですが、他サービスのオブジェクトストレージのSDKの場合は謎のオブジェクトが返ってきてユーザが外からInjectできなかったりします。

そういった場合は差分実行に対応できないということになります。


まとめ

Embulk実行基盤の話などは今後知見が貯まっていくと思いますが、大量のデータに対してバルクロードを実行した際に手動調査&リトライ等を避けるためにリジューム実行や差分実行は是非活用したいところです。

来週12/15(火)のEmbulk Meetup #2で私もTreasure DataのEmbulk実行基盤周り等の話をさせて頂こうかと思っています。

まだ定員には余裕があるので是非ご参加下さい。

その前日12/14(月)のWorkflow Hacks! #1でも関連する話は出ると思うのでご参加をお待ちしています。