9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulk (0.6.2) 軽く調査

Last updated at Posted at 2015-04-20

Embulkはオープンソースのバルクデータ転送ソフトウエアである。
Fluentdと同様、Ruby(JRuby)によって入力、出力、変換等のプラグインを容易に開発可能。

すでにたくさんの記事が公開されているが、まだ絶賛開発中のプロダクトということもあって内容が断片的だったり古かったりといった様子なので、自分なりに現状を調べてみた結果を書いておく。

環境

  • Embulk v0.6.2
  • JRE 1.8.0_40
  • Mac OS X 10.10.3

ドキュメントはこちら
http://www.embulk.org/docs/

インストール、exampleの実行

GitHubのページに記載されている通りのコマンドをそのまま実行すればよい。簡単。

書かれている通りにexampleを実行すると、下記のようなconfig.ymlが出力される。

config.yml
in:
  type: file
  path_prefix: /work/try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
exec: {}
out: {type: stdout}

Embulkのコンフィグは、in, out, filters, exec の4つの大きなセクションからなる。(filtersは省略可能で、上の例では省略されている。)

ドキュメントには、それぞれ下記のように説明されている。

  • in: 入力プラグイン、デコーダ、パーサ を指定
  • out: 出力プラグイン、エンコーダ、フォーマッタ を指定
  • filters: inoutの間に適用するフィルタを指定。省略可。
  • exec: Executorプラグインを指定する。Executorとは、処理を実行するプラグインのことで、たとえば組み込みの"thread executor"や、"Hadoop MapReduce executor"等がある。

エンコーダというのは、要するに圧縮や暗号化のこと。

プラグインの一覧は、ここで見ることができる。

組み込みプラグインの使い方は、ドキュメントに詳しく書かれているのでそちらを参照すること。

プラグインの開発

プラグインは、JavaまたはRubyで書くことができる。

ただし、Javaで書かれた組み込みプラグインなどを拡張するにはJRuby向けブリッジが提供されている必要があって、現在(0.6.2)のところ、Rubyでは書けない部分も存在する。

[0.6.2においてRubyで書けるもの]

  • Rubyで書ける

    • InputPlugin
    • OutputPlugin
    • FilterPlugin
    • ParserPlugin
    • GuessPlugin
  • Rubyで書けない

    • EncoderPlugin / DecoderPlugin
    • FileInputPlugin / FilleOutputPlugin の拡張

現状でも、普通の用途で拡張したくなるポイントはだいたいRubyで拡張できるようになっている。

Rubyでプラグインを書くのは、Fluentdのプラグインを書いたことがあれば、それほど苦労はしないはず。

"next configuration"

embulk run では、-oオプションによって「次のコンフィグファイル」を出力することができる。
これは他のソフトウエアにあまり無い概念で、少し戸惑ったので使い方を書いておく。

embulk run config.yml -o next_config.yml

このように実行した場合、config.ymlと同じ内容に加えて、Inputプラグインが「どこまで実行したか」という情報が、next_config.ymlに出力される。

この情報はInputプラグインの種類によって異なり、たとえばFileInputPluginであれば、last_pathとして「最後にロードしたファイル」が記録される。

next_config.yml
...
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
  last_path: /work/try1/csv/sample_01.csv.gz
exec: {}
out: {type: stdout}

次回実行時には、前回生成されたnext_config.ymlを使って

embulk run next_config.yml

とすれば、前回の処理結果を踏まえて今回の処理を実行する。

FileInputPlugindであれば、last_pathに記載されたファイルよりも「辞書順で後」になるファイルだけが処理される。

また、

embulk run config.yml -o config.yml

このように実行対象のコンフィグとnext configに同じファイルを指定することもできて、こうすれば実行するたびに新着ファイルのみを処理し続けるといったことができる。

設定ファイルが毎回書き換わるというのは何となく気持ち悪い感じもするが、cronで定期的にファイルを見に行く場合はこのようにしなさいと、ドキュメントにも記載されている。

Taskとトランザクション

Embulkは、処理をTaskという単位に分割し、Task単位で並列化とトランザクション制御をしているらしい。

Taskの分割方法はInputPluginによって決められる。
たとえば、FileInputPluginの場合は、1ファイルにつき1つのタスクを生成する。
1ファイルをさらに複数のタスクに分割するembulk-input-filesplitというプラグインもある。

Taskをどうやって実行するかはExecutorが決める。
現在はローカルでマルチスレッド実行(デフォルト)かHadoopクラスタで実行するかという2択だが、今後もっと手軽に分散処理をする方法など追加されるといいなと思う。

Resume State

embulk run config.yml -r resume_state.yml

このように実行すると、異常終了した場合にresume stateが出力される。

何らかのエラーによりembulkが異常終了した場合に、問題修正後同じく-rオプション付きで実行すると、resume stateを読み込んで、トランザクション単位で続きから再開できるらしい。

resume stateが出力されるのはあくまでトランザクションが失敗した場合なので、単一レコードのパースエラーだとか、トランザクションの成否に影響しないようなエラーでは出力されない。

ログ出力

現状ではembulkはすべてのログを標準出力に出すっぽいので、実行結果を監視したいときは

embulk run config.yml -l warn -r resume_state.yml &>> embulk.log

のようにして、出たログファイルを監視すればよさそう。

所感

まだ未実装の機能などはあるが、手軽に使えて、しかも分散処理までできるという、データ処理をする人にとってはかなり強力なツールになりそう。

あとFluentdと同じくプラグインが簡単に書けるし、メインの開発者の方も日本語話者の方なので、これからOSSにコミットしていきたいと思っている自分のような人の足がかりとしても良いんじゃないだろうか。

個人的には下記のFeatureがとても気になっていて、実装されたらぜひ使っていきたいなと思っている。

Command-line (ssh) distributed executor · Issue #30 · embulk/embulk

参考リンク

9
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?