Embulkはオープンソースのバルクデータ転送ソフトウエアである。
Fluentdと同様、Ruby(JRuby)によって入力、出力、変換等のプラグインを容易に開発可能。
すでにたくさんの記事が公開されているが、まだ絶賛開発中のプロダクトということもあって内容が断片的だったり古かったりといった様子なので、自分なりに現状を調べてみた結果を書いておく。
環境
- Embulk v0.6.2
- JRE 1.8.0_40
- Mac OS X 10.10.3
ドキュメントはこちら
http://www.embulk.org/docs/
インストール、exampleの実行
GitHubのページに記載されている通りのコマンドをそのまま実行すればよい。簡単。
書かれている通りにexampleを実行すると、下記のようなconfig.ymlが出力される。
in:
type: file
path_prefix: /work/try1/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
skip_header_lines: 1
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
exec: {}
out: {type: stdout}
Embulkのコンフィグは、in
, out
, filters
, exec
の4つの大きなセクションからなる。(filters
は省略可能で、上の例では省略されている。)
ドキュメントには、それぞれ下記のように説明されている。
-
in
: 入力プラグイン、デコーダ、パーサ を指定 -
out
: 出力プラグイン、エンコーダ、フォーマッタ を指定 -
filters
:in
とout
の間に適用するフィルタを指定。省略可。 -
exec
: Executorプラグインを指定する。Executorとは、処理を実行するプラグインのことで、たとえば組み込みの"thread executor"や、"Hadoop MapReduce executor"等がある。
エンコーダというのは、要するに圧縮や暗号化のこと。
プラグインの一覧は、ここで見ることができる。
組み込みプラグインの使い方は、ドキュメントに詳しく書かれているのでそちらを参照すること。
プラグインの開発
プラグインは、JavaまたはRubyで書くことができる。
ただし、Javaで書かれた組み込みプラグインなどを拡張するにはJRuby向けブリッジが提供されている必要があって、現在(0.6.2)のところ、Rubyでは書けない部分も存在する。
[0.6.2においてRubyで書けるもの]
-
Rubyで書ける
- InputPlugin
- OutputPlugin
- FilterPlugin
- ParserPlugin
- GuessPlugin
-
Rubyで書けない
- EncoderPlugin / DecoderPlugin
- FileInputPlugin / FilleOutputPlugin の拡張
現状でも、普通の用途で拡張したくなるポイントはだいたいRubyで拡張できるようになっている。
Rubyでプラグインを書くのは、Fluentdのプラグインを書いたことがあれば、それほど苦労はしないはず。
"next configuration"
embulk run では、-o
オプションによって「次のコンフィグファイル」を出力することができる。
これは他のソフトウエアにあまり無い概念で、少し戸惑ったので使い方を書いておく。
embulk run config.yml -o next_config.yml
このように実行した場合、config.ymlと同じ内容に加えて、Inputプラグインが「どこまで実行したか」という情報が、next_config.ymlに出力される。
この情報はInputプラグインの種類によって異なり、たとえばFileInputPluginであれば、last_path
として「最後にロードしたファイル」が記録される。
...
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
last_path: /work/try1/csv/sample_01.csv.gz
exec: {}
out: {type: stdout}
次回実行時には、前回生成されたnext_config.ymlを使って
embulk run next_config.yml
とすれば、前回の処理結果を踏まえて今回の処理を実行する。
FileInputPlugindであれば、last_path
に記載されたファイルよりも「辞書順で後」になるファイルだけが処理される。
また、
embulk run config.yml -o config.yml
このように実行対象のコンフィグとnext configに同じファイルを指定することもできて、こうすれば実行するたびに新着ファイルのみを処理し続けるといったことができる。
設定ファイルが毎回書き換わるというのは何となく気持ち悪い感じもするが、cronで定期的にファイルを見に行く場合はこのようにしなさいと、ドキュメントにも記載されている。
Taskとトランザクション
Embulkは、処理をTaskという単位に分割し、Task単位で並列化とトランザクション制御をしているらしい。
Taskの分割方法はInputPluginによって決められる。
たとえば、FileInputPluginの場合は、1ファイルにつき1つのタスクを生成する。
1ファイルをさらに複数のタスクに分割するembulk-input-filesplit
というプラグインもある。
Taskをどうやって実行するかはExecutorが決める。
現在はローカルでマルチスレッド実行(デフォルト)かHadoopクラスタで実行するかという2択だが、今後もっと手軽に分散処理をする方法など追加されるといいなと思う。
Resume State
embulk run config.yml -r resume_state.yml
このように実行すると、異常終了した場合にresume stateが出力される。
何らかのエラーによりembulkが異常終了した場合に、問題修正後同じく-r
オプション付きで実行すると、resume stateを読み込んで、トランザクション単位で続きから再開できるらしい。
resume stateが出力されるのはあくまでトランザクションが失敗した場合なので、単一レコードのパースエラーだとか、トランザクションの成否に影響しないようなエラーでは出力されない。
ログ出力
現状ではembulkはすべてのログを標準出力に出すっぽいので、実行結果を監視したいときは
embulk run config.yml -l warn -r resume_state.yml &>> embulk.log
のようにして、出たログファイルを監視すればよさそう。
所感
まだ未実装の機能などはあるが、手軽に使えて、しかも分散処理までできるという、データ処理をする人にとってはかなり強力なツールになりそう。
あとFluentdと同じくプラグインが簡単に書けるし、メインの開発者の方も日本語話者の方なので、これからOSSにコミットしていきたいと思っている自分のような人の足がかりとしても良いんじゃないだろうか。
個人的には下記のFeatureがとても気になっていて、実装されたらぜひ使っていきたいなと思っている。
Command-line (ssh) distributed executor · Issue #30 · embulk/embulk