はじめに
embulk-filter-copy という Embulk Plugin を作ってリリースしました。
この記事では、その使い方や実装コンセプトについてまとめようと思います。
なにするやつか
filter plugin に渡ってきたデータを copy して、そのデータを input として別の embulk を実行する plugin です。example を見ると雰囲気が伝わると思います。
in:
type: file
path_prefix: ./example/data.tsv
parser:
type: csv
delimiter: "\t"
skip_header_lines: 0
null_string: ""
columns:
- { name: id, type: long }
- { name: description, type: string }
- { name: name, type: string }
- { name: payload, type: json}
stop_on_invalid_record: true
filters:
- type: copy
config:
exec:
max_threads: 8
filters:
- type: remove_columns
remove: ["id"]
out:
type: stdout
out:
type: stdout
type: copy
の下あたりに embulk の config っぽい記述がありますね!
例えば、処理するデータが以下のようなデータだったとすると (4行目の3カラム目は空文字)
0 c20ef94602 c212c89f91 {"a":0,"b":"99"}
1 330a9fc33a e25b33b616 {"a":1,"b":"a9"}
2 707b3b7588 90823c6a1f {"a":2,"b":"96"}
3 8d8288e66f {"a":3,"b":"86"}
4 c54d8b6481 e56a40571c {"a":4,"b":"d2"}
出力は次のようになります。(ログは省略)
0,c20ef94602,c212c89f91,{"a":0,"b":"99"}
1,330a9fc33a,e25b33b616,{"a":1,"b":"a9"}
2,707b3b7588,90823c6a1f,{"a":2,"b":"96"}
3,8d8288e66f,,{"a":3,"b":"86"}
4,c54d8b6481,e56a40571c,{"a":4,"b":"d2"}
c20ef94602,c212c89f91,{"a":0,"b":"99"}
330a9fc33a,e25b33b616,{"a":1,"b":"a9"}
707b3b7588,90823c6a1f,{"a":2,"b":"96"}
8d8288e66f,,{"a":3,"b":"86"}
c54d8b6481,e56a40571c,{"a":4,"b":"d2"}
main の Embulk のstdout
と embulk-filter-copy が実行する Embulk のstdout
(id
カラムが除かれている方) が出力されていることが分かると思います。
このように input 以外の embulk 実行設定を embulk-filter-copy の config に渡すことで embulk-filter-copy に渡ってきたデータを input として embulk を別途起動するプラグインです。
つかいかた
なにするやつか でほぼ説明してしまっていますが、 embulk-filter-copy は以下のオプションを持っています。
-
config
:in:
を除いた embulk の config を書きます。
これら以外に必要なオプションはありません。
利用に際しての注意点
Java8
- EmbulkとJava7 という記事で紹介されていますが 2017-06-09現在 Embulk はまだ Java8 を公式にサポートしていません
- しかし、 embulk-filter-copy は Java8 で実装されています。
- コンパイルも Java7 向けのコンパイルをしていません。(ラムダ式などjava7向けにコンパイルできないJava8の機能を使っています。)
- これは後に紹介する依存ライブラリが Java8 前提で書かれていたことによる制約です。
趣味で書いたから楽しみたかった - Embulk は Runtime を Java8 にすることに関して問題はない(はず)ので、ご利用の際は Java8 による実行をお願いします。
- Embulk の Java8 公式サポート切に願っておりますよろしくお願いいたします
Interface
- Embulk の config を渡す
config
というオプションについては変更することは今後もありません(あったとしても一定期間並行利用できるようにします。) - ただ、実は embulk-filter-copy には紹介したオプション以外のオプションが多数あります。(まだ README にも書いてません)
- これらは内部で利用しているライブラリを細かく制御できるようにするためのものですが、内部実装は利用が進むと変わる可能性があるため、これらオプションについては変わる可能性があることをご認識ください。
エラーハンドリング
- まだ仕様が固まっていません
- 現状は、 embulk-filter-copy 上で動く Embulk が Exception を吐いた場合は問答無用で embulk-filter-copy も RuntimeException を吐く(つまり、 main で動いている Embulk がエラーで死ぬ)ようにしています。
- 今後、例えば
skip_if_embulk_failed
といったオプションを追加して Exception 無視出来るようにしたりするアイデアはありますが、利用が進んで利用者の声を聞きつつ進めようと思っています。 - ので、こうあるべきという意見があれば issue に起票をお願いします
embulk-executor-mapreduce
- embulk-executor-mapreduce 上ではまだ動きません。
その他
- README の NOTE を参照してください。
- まだ細かな配慮が出来ていない部分があるのでお気づきの点があれば issue に起票をお願いします
モチベーション
Embulk Meetup Tokyo #3 で @toyama0919 さんの『クラウドとEmbulkで作った機械学習基盤』という発表を見て、多段かつ並列に Embulk を実行してデータ生成をする複雑なユースケースを知り embulk-filter-copy を実装してみようと思いました。その複雑さについてはスライドや以下の tweet 近辺をご覧になってみてください笑
Embulk と digdag で map reduce やってるかんじかw #embulk
— Civitaspo (@Civitaspo) 16 May 2017
toyamaさんembulkで相当無茶やってる気がするw #embulk
— 夜行性のフレンズ じょーかー (@joker1007) 16 May 2017
2017-06-07 に開催された workflowenginesnight では @frsyuki さんに「ユースケースの幅が広がるので良いですねぇ」とお墨付きの言葉をもらったのでそれも今ではモチベーションになっています。
実装コンセプト
ここからは実装コンセプトについて書いていきます。
言葉遣いについてのおことわり
Embulk と書いたときに embulk-filter-copy が実行した Embulk か main で実行されている Embulk なのかわかりにくいので、ここからは main で実行されている Embulk を main Embulk、 embulk-filter-copy が実行した Embulk を copy Embulk と書くことにします。
embulk-filter-copy は copy Embulk が使う input plugin も実装する
embulk-filter-copy は以下のコードを見ると分かりますが input plugin も実装 されています。
Embulk::JavaPlugin.register_filter(
"copy", "org.embulk.filter.copy.CopyFilterPlugin",
File.expand_path('../../../../classpath', __FILE__))
Embulk::JavaPlugin.register_input(
"internal_forward",
"org.embulk.filter.copy.plugin.InternalForwardInputPlugin",
File.expand_path('../../../../classpath', __FILE__))
これは copy Embulk が使う input plugin で embulk-filter-copy がコピーしたデータを受信するために使っています。
main Embulk から copy Embulk へのデータ送信はネットワーク経由で行う
main Embulk から copy Embulk へのデータ送信はネットワーク経由で行います。これは、将来的に別のサーバーで copy Embulk を起動することを考えているためです。Embulk はサーバーリソースを限界まで使うよう設計されているため copy Embulk は別のサーバーで動くのが良いだろうと考えています。
embulk-executor-mapreduce
そのため、embulk-executor-mapreduce 上で動かすことも将来的に出来ると考えています。copy Embulk の input task が動くサーバーが分からなければ embulk-filter-copy がデータを送信出来ないのでどうやって取得しようか考えているところです。
Forward Protocol
実は、main Embulk から copy Embulk へのデータ送信のために Forward Protocol を利用しています。これは Embulk の実装が MessagePack との親和性が高いことと Forward Protocol を使用したサーバー・クライアント実装の開発が活発に行われていることに依ります。
現実装では以下のライブラリを使用しています。
okumin/influent
- Java で Fluentd の in_forward っぽいことをするライブラリを作りました #fluentd
- Influent ベンチマーク - Part 1 #fluentd
- Influent ベンチマーク - Part 2 #fluentd
- Influent ベンチマーク - Part 3 #fluentd
komamitsu/fluency
実装上ハマったこと(Embulkガチ勢向け)
なお、書いた時点の Embulk のバージョンは v0.8.23 になります。
EmbulkEmbed
embulk-filter-copy の内部で copy Embulk を実行するために利用しているクラスが EmbulkEmbed になります。このクラスは Embulk の実行をコントロールしているクラスで embulk run
や emburk guess
、 embulk preview
は JRuby からこのクラスを呼んで実行されています。
このクラスが非常に厄介でコンストラクタが package-local にしか公開されていないので当然 embulk-filter-copy の内部からは呼べません。また、このクラスのインスタンスを生成するための Builder クラス(Bootstrap
) は JRuby から呼び出されること(というか依存関係の解決を JRuby 内で行うこと)を想定しており、 public
に公開されているメソッドを普通に使って EmbulkEmbed をビルドすると依存関係が解決されていない状態の EmbulkEmbed が作られてしまいます。
そこで、なんとか依存関係が解決された状態の EmbulkEmbed を生成したく考えた結果が以下のような黒魔術でした。
private EmbulkEmbed newEmbulkEmbed()
{
try {
Constructor<EmbulkEmbed> constructor = EmbulkEmbed.class
.getDeclaredConstructor(ConfigSource.class, LifeCycleInjector.class);
constructor.setAccessible(true);
return constructor.newInstance(null, Exec.getInjector());
}
catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new ConfigException(e);
}
}
Exec.getInjector()
というメソッドで依存関係が詰め込まれたInjector
を取得出来ます。このInjector
を無理矢理 EmbulkEmbed のコンストラクタに詰め込んで生成するというものです。 Builder クラス(Bootstrap
) は JRuby で解決された依存関係を使ってInjector
を生成することを目的としており、このInjector
は Embulk の実行中 Global に一つしか存在しない認識なので、使い回して問題ないだろうということでこのような実装にしてみました。他に良い方法があれば教えていただきたいです。
static 変数
はじめはスモールにネットワーク経由でデータ送信せずにプロセス内でデータ共有すればよいかなぁと static
な変数に Queue
を作ってデータ共有しようと考えていました。しかしながら、Embulk は Plugin 毎に ClassLoader が異なるため、 Plugin を超えて static
な変数の共有が出来ないのでした。(実はコードを全て読み切ったわけではないのでこの説明であってるか不安)
雑
- fluentd へ送信する plugin は別途外出ししても良いかもしれない
Fluentdへのforward機能欲しい #embulk
— Hiroshi Toyama (@toyama0919) 16 May 2017
おわりに
以上になります。
それでは embulk-filter-copy をよろしくお願いいたします。