LoginSignup
14
6

More than 5 years have passed since last update.

embulk-filter-copy の紹介

Last updated at Posted at 2017-06-09

はじめに

embulk-filter-copy という Embulk Plugin を作ってリリースしました。
この記事では、その使い方や実装コンセプトについてまとめようと思います。

なにするやつか

filter plugin に渡ってきたデータを copy して、そのデータを input として別の embulk を実行する plugin です。example を見ると雰囲気が伝わると思います。

config.yml
in:
  type: file
  path_prefix: ./example/data.tsv
  parser:
    type: csv
    delimiter: "\t"
    skip_header_lines: 0
    null_string: ""
    columns:
      - { name: id, type: long }
      - { name: description, type: string }
      - { name: name, type: string }
      - { name: payload, type: json}
    stop_on_invalid_record: true
filters:
  - type: copy
    config:
      exec:
        max_threads: 8
      filters:
        - type: remove_columns
          remove: ["id"]
      out:
        type: stdout
out:
  type: stdout

type: copy の下あたりに embulk の config っぽい記述がありますね!
例えば、処理するデータが以下のようなデータだったとすると (4行目の3カラム目は空文字)

example/data.tsv
0   c20ef94602  c212c89f91  {"a":0,"b":"99"}
1   330a9fc33a  e25b33b616  {"a":1,"b":"a9"}
2   707b3b7588  90823c6a1f  {"a":2,"b":"96"}
3   8d8288e66f      {"a":3,"b":"86"}
4   c54d8b6481  e56a40571c  {"a":4,"b":"d2"}

出力は次のようになります。(ログは省略)

0,c20ef94602,c212c89f91,{"a":0,"b":"99"}
1,330a9fc33a,e25b33b616,{"a":1,"b":"a9"}
2,707b3b7588,90823c6a1f,{"a":2,"b":"96"}
3,8d8288e66f,,{"a":3,"b":"86"}
4,c54d8b6481,e56a40571c,{"a":4,"b":"d2"}
c20ef94602,c212c89f91,{"a":0,"b":"99"}
330a9fc33a,e25b33b616,{"a":1,"b":"a9"}
707b3b7588,90823c6a1f,{"a":2,"b":"96"}
8d8288e66f,,{"a":3,"b":"86"}
c54d8b6481,e56a40571c,{"a":4,"b":"d2"}

main の Embulk のstdoutembulk-filter-copy が実行する Embulk のstdout(idカラムが除かれている方) が出力されていることが分かると思います。

このように input 以外の embulk 実行設定を embulk-filter-copy の config に渡すことで embulk-filter-copy に渡ってきたデータを input として embulk を別途起動するプラグインです。

つかいかた

なにするやつか でほぼ説明してしまっていますが、 embulk-filter-copy は以下のオプションを持っています。

これら以外に必要なオプションはありません。

利用に際しての注意点

Java8

  • EmbulkとJava7 という記事で紹介されていますが 2017-06-09現在 Embulk はまだ Java8 を公式にサポートしていません:scream:
  • しかし、 embulk-filter-copy は Java8 で実装されています。
  • コンパイルも Java7 向けのコンパイルをしていません。(ラムダ式などjava7向けにコンパイルできないJava8の機能を使っています。)
  • これは後に紹介する依存ライブラリが Java8 前提で書かれていたことによる制約です。 趣味で書いたから楽しみたかった
  • Embulk は Runtime を Java8 にすることに関して問題はない(はず)ので、ご利用の際は Java8 による実行をお願いします。
  • Embulk の Java8 公式サポート切に願っております:pray:よろしくお願いいたします:pray::pray::pray:

Interface

  • Embulk の config を渡す config というオプションについては変更することは今後もありません:thumbsup:(あったとしても一定期間並行利用できるようにします。)
  • ただ、実は embulk-filter-copy には紹介したオプション以外のオプションが多数あります。(まだ README にも書いてません)
  • これらは内部で利用しているライブラリを細かく制御できるようにするためのものですが、内部実装は利用が進むと変わる可能性があるため、これらオプションについては変わる可能性があることをご認識ください。

エラーハンドリング

  • まだ仕様が固まっていません:poop:
  • 現状は、 embulk-filter-copy 上で動く Embulk が Exception を吐いた場合は問答無用で embulk-filter-copy も RuntimeException を吐く(つまり、 main で動いている Embulk がエラーで死ぬ)ようにしています。
  • 今後、例えば skip_if_embulk_failed といったオプションを追加して Exception 無視出来るようにしたりするアイデアはありますが、利用が進んで利用者の声を聞きつつ進めようと思っています。
  • ので、こうあるべきという意見があれば issue に起票をお願いします:pray:

embulk-executor-mapreduce

その他

  • README の NOTE を参照してください。
  • まだ細かな配慮が出来ていない部分があるのでお気づきの点があれば issue に起票をお願いします:pray:

モチベーション

Embulk Meetup Tokyo #3@toyama0919 さんの『クラウドとEmbulkで作った機械学習基盤』という発表を見て、多段かつ並列に Embulk を実行してデータ生成をする複雑なユースケースを知り embulk-filter-copy を実装してみようと思いました。その複雑さについてはスライドや以下の tweet 近辺をご覧になってみてください笑




2017-06-07 に開催された workflowenginesnight では @frsyuki さんに「ユースケースの幅が広がるので良いですねぇ」とお墨付きの言葉をもらったのでそれも今ではモチベーションになっています。

実装コンセプト

ここからは実装コンセプトについて書いていきます。

言葉遣いについてのおことわり
Embulk と書いたときに embulk-filter-copy が実行した Embulk か main で実行されている Embulk なのかわかりにくいので、ここからは main で実行されている Embulk を main Embulk、 embulk-filter-copy が実行した Embulk を copy Embulk と書くことにします。

embulk-filter-copy は copy Embulk が使う input plugin も実装する

embulk-filter-copy は以下のコードを見ると分かりますが input plugin も実装 されています。

lib/embulk/filter/copy.rb
Embulk::JavaPlugin.register_filter(
  "copy", "org.embulk.filter.copy.CopyFilterPlugin",
  File.expand_path('../../../../classpath', __FILE__))

Embulk::JavaPlugin.register_input(
    "internal_forward",
    "org.embulk.filter.copy.plugin.InternalForwardInputPlugin",
    File.expand_path('../../../../classpath', __FILE__))

これは copy Embulk が使う input pluginembulk-filter-copy がコピーしたデータを受信するために使っています。

main Embulk から copy Embulk へのデータ送信はネットワーク経由で行う

main Embulk から copy Embulk へのデータ送信はネットワーク経由で行います。これは、将来的に別のサーバーで copy Embulk を起動することを考えているためです。Embulk はサーバーリソースを限界まで使うよう設計されているため copy Embulk は別のサーバーで動くのが良いだろうと考えています。

embulk-executor-mapreduce

そのため、embulk-executor-mapreduce 上で動かすことも将来的に出来ると考えています。copy Embulk の input task が動くサーバーが分からなければ embulk-filter-copy がデータを送信出来ないのでどうやって取得しようか考えているところです。

Forward Protocol

実は、main Embulk から copy Embulk へのデータ送信のために Forward Protocol を利用しています。これは Embulk の実装が MessagePack との親和性が高いことと Forward Protocol を使用したサーバー・クライアント実装の開発が活発に行われていることに依ります。

現実装では以下のライブラリを使用しています。

okumin/influent
komamitsu/fluency

実装上ハマったこと(Embulkガチ勢向け)

なお、書いた時点の Embulk のバージョンは v0.8.23 になります。

EmbulkEmbed

embulk-filter-copy の内部で copy Embulk を実行するために利用しているクラスが EmbulkEmbed になります。このクラスは Embulk の実行をコントロールしているクラスで embulk runemburk guessembulk preview は JRuby からこのクラスを呼んで実行されています。

このクラスが非常に厄介でコンストラクタが package-local にしか公開されていないので当然 embulk-filter-copy の内部からは呼べません。また、このクラスのインスタンスを生成するための Builder クラス(Bootstrap) は JRuby から呼び出されること(というか依存関係の解決を JRuby 内で行うこと)を想定しており、 public に公開されているメソッドを普通に使って EmbulkEmbed をビルドすると依存関係が解決されていない状態の EmbulkEmbed が作られてしまいます。

そこで、なんとか依存関係が解決された状態の EmbulkEmbed を生成したく考えた結果が以下のような黒魔術でした。

EmbulkExecutorService.java
    private EmbulkEmbed newEmbulkEmbed()
    {
        try {
            Constructor<EmbulkEmbed> constructor = EmbulkEmbed.class
                    .getDeclaredConstructor(ConfigSource.class, LifeCycleInjector.class);
            constructor.setAccessible(true);
            return constructor.newInstance(null, Exec.getInjector());
        }
        catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new ConfigException(e);
        }
    }

Exec.getInjector()というメソッドで依存関係が詰め込まれたInjectorを取得出来ます。このInjectorを無理矢理 EmbulkEmbed のコンストラクタに詰め込んで生成するというものです。 Builder クラス(Bootstrap) は JRuby で解決された依存関係を使ってInjectorを生成することを目的としており、このInjectorは Embulk の実行中 Global に一つしか存在しない認識なので、使い回して問題ないだろうということでこのような実装にしてみました。他に良い方法があれば教えていただきたいです。

static 変数

はじめはスモールにネットワーク経由でデータ送信せずにプロセス内でデータ共有すればよいかなぁと static な変数に Queue を作ってデータ共有しようと考えていました。しかしながら、Embulk は Plugin 毎に ClassLoader が異なるため、 Plugin を超えて static な変数の共有が出来ないのでした。(実はコードを全て読み切ったわけではないのでこの説明であってるか不安)

  • fluentd へ送信する plugin は別途外出ししても良いかもしれない

おわりに

以上になります。
それでは embulk-filter-copy をよろしくお願いいたします。

14
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
6