LoginSignup
10
4

More than 3 years have passed since last update.

embulk-input-union の紹介

Last updated at Posted at 2020-12-12

この記事はZOZOテクノロジーズ #2 Advent Calendar 2020 17日目の記事です。

はじめに

こんにちは @civitaspo です。先日新しい Embulk Pluginembulk-input-union をリリースしました。この記事ではこの embulk-input-union の使い方や実装内容について紹介しようと思います。

なにするやつか

皆さん SQL の union 句はご存じだと思いますが、この Plugin は複数のデータソースを union して input として利用出来ます。 example を見ると雰囲気が伝わるかと思います。

code.yml
in:
  type: union
  union:
    - in:
        type: file
        path_prefix: ./example/data01.tsv
        parser:
          type: csv
          delimiter: "\t"
          skip_header_lines: 0
          null_string: ""
          columns:
            - { name: id, type: long }
            - { name: description, type: string }
            - { name: name, type: string }
            - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z" }
            - { name: payload, type: json }
          stop_on_invalid_record: true
      filters:
        - type: column
          add_columns:
            - { name: group_name, type: string, default: "data01" }
    - name: example
      in:
        type: file
        path_prefix: ./example/data02.tsv
        parser:
          type: csv
          delimiter: "\t"
          skip_header_lines: 0
          null_string: ""
          columns:
            - { name: id, type: long }
            - { name: description, type: string }
            - { name: name, type: string }
            - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z" }
            - { name: payload, type: json }
          stop_on_invalid_record: true
      filters:
        - type: column
          add_columns:
            - { name: group_name, type: string, default: "data02" }

out:
  type: stdout

type: unionunion というオプションに配列で Embulk の input 及び filter の設定らしき記述がありますね!
これは本当に Embulk の設定で、複数のデータソースから Embulk でそれぞれ filter まで処理した後 union して input として利用出来るのがこのプラグインの特徴です。

この例の記述では

  • ./example/data01.tsv からデータを読み出し filter で group_name というカラムを追加後 data01 という値を入れたもの
  • ./example/data02.tsv からデータを読み出し filter で group_name というカラムを追加後 data02 という値を入れたもの

以上の2つの結果が union されて stdout に出力される設定となります。

実際にこの例を走らせてみます。ちなみに ./example/data01.tsv./example/data02.tsv にはどちらにも同じ下記のデータが入っています。

/example/{data01.tsv,data02.tsv}
0   c20ef94602  c212c89f91  2017-10-24 03:54:35 +0900   {"a":0,"b":"99"}
1   330a9fc33a  e25b33b616  2017-10-22 19:53:31 +0900   {"a":1,"b":"a9"}
2   707b3b7588  90823c6a1f  2017-10-23 23:42:43 +0900   {"a":2,"b":"96"}
3   8d8288e66f      2017-10-22 06:12:13 +0900   {"a":3,"b":"86"}
4   c54d8b6481  e56a40571c  2017-10-23 04:59:16 +0900   {"a":4,"b":"d2"}
❯ embulk run config.yml
2020-12-11 23:16:59.370 +0900: Embulk v0.9.23
2020-12-11 23:17:00.187 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-12-11 23:17:02.263 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/Users/takahiro.nakayama/src/github.com/civitaspo/embulk-input-union/example/Gemfile"
2020-12-11 23:17:02.264 +0900 [INFO] (main): Gem's home and path are being cleared.
2020-12-11 23:17:03.612 +0900 [INFO] (main): Started Embulk v0.9.23
2020-12-11 23:17:03.727 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/union from a load path
2020-12-11 23:17:03.937 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Listing local files at directory './example' filtering filename by prefix 'data01.tsv'
2020-12-11 23:17:03.938 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2020-12-11 23:17:03.940 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Loading files [./example/data01.tsv]
2020-12-11 23:17:04.128 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Loaded plugin embulk-filter-column (0.7.1)
2020-12-11 23:17:04.155 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): Listing local files at directory './example' filtering filename by prefix 'data02.tsv'
2020-12-11 23:17:04.155 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2020-12-11 23:17:04.156 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): Loading files [./example/data02.tsv]
2020-12-11 23:17:04.169 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=24 / output tasks 12 = input tasks 2 * 6
2020-12-11 23:17:04.180 +0900 [INFO] (0001:transaction): {done:  0 / 2, running: 0}
2020-12-11 23:17:04.226 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): Using local thread executor with max_threads=12 / tasks=1
2020-12-11 23:17:04.227 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): Using local thread executor with max_threads=12 / tasks=1
2020-12-11 23:17:04.262 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): {done:  0 / 1, running: 0}
2020-12-11 23:17:04.262 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): {done:  0 / 1, running: 0}
2020-12-11 23:17:04.360 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): {done:  1 / 1, running: 0}
2020-12-11 23:17:04.360 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): {done:  1 / 1, running: 0}
0,c20ef94602,c212c89f91,2017-10-23 18:54:35 +0000,{"a":0,"b":"99"},data02
0,c20ef94602,c212c89f91,2017-10-23 18:54:35 +0000,{"a":0,"b":"99"},data01
1,330a9fc33a,e25b33b616,2017-10-22 10:53:31 +0000,{"a":1,"b":"a9"},data02
1,330a9fc33a,e25b33b616,2017-10-22 10:53:31 +0000,{"a":1,"b":"a9"},data01
2,707b3b7588,90823c6a1f,2017-10-23 14:42:43 +0000,{"a":2,"b":"96"},data02
2,707b3b7588,90823c6a1f,2017-10-23 14:42:43 +0000,{"a":2,"b":"96"},data01
3,8d8288e66f,,2017-10-21 21:12:13 +0000,{"a":3,"b":"86"},data02
3,8d8288e66f,,2017-10-21 21:12:13 +0000,{"a":3,"b":"86"},data01
4,c54d8b6481,e56a40571c,2017-10-22 19:59:16 +0000,{"a":4,"b":"d2"},data02
4,c54d8b6481,e56a40571c,2017-10-22 19:59:16 +0000,{"a":4,"b":"d2"},data01
2020-12-11 23:17:04.365 +0900 [INFO] (0001:transaction): {done:  2 / 2, running: 0}
2020-12-11 23:17:04.365 +0900 [INFO] (0001:transaction): {done:  2 / 2, running: 0}
2020-12-11 23:17:04.406 +0900 [INFO] (main): Committed.
2020-12-11 23:17:04.407 +0900 [INFO] (main): Next config diff: {"in":{"union":[{"in":{"last_path":"./example/data01.tsv"}},{"in":{"last_path":"./example/data02.tsv"}}]},"out":{}}

2つのファイルが読み出され、それぞれ filter 処理で追加した group_name である data01, data02 が付いた状態で出力されていることが確認出来ます。

つかいかた

オプションはシンプルです。 union というオプションしかありません。 union option には union したいデータソースの数だけ配列で Embulk の設定を記述します。
この Embulk の設定は以下の点で少しだけ普通の Embulk の設定とは異なります。

  • out option は記述しても無視される。
  • exec option は Local Executor Plugin 固定、かつ max_threads オプションのみサポート

    • Local Executor Plugin は Embulk の Default の Executor Plugin なので現状困ることはないはずですが、今後有用な Executor Plugin が出てきたら機能拡張するかもしれません。
  • name option が追加されています。

    • ログに出力するときに利用するのみで記述しなくても構いません。
    • デフォルトだと設定値のサマリが名称になります。
      • 上記の実行例で言うと in[file].filters[column] がソレにあたります。

ref. https://github.com/civitaspo/embulk-input-union#configuration

想定ユースケース

異なるミドルウェアからデータを抽出したい

異なるミドルウェアからデータを抽出したくなることがあります。具体例としてはラムダアーキテクチャを採用している場合で、 その瞬間の全てのデータを得るためには speed-layer と batch-layer の両方からデータを抽出する必要があります。このときデータが異なるミドルウェア上に載っていることがあります。そんなとき、 embulk-input-union が役に立ちます。

example.yml
in:
  type: union
  union:
    - name: speed-layer
      in:
        type: kafka
        topics:
          - "access-log"
        serialize_format: json
        brokers:
          - "localhost:9092"
        fetch_max_wait_ms: 10000
        columns:
          - {name: id, type: string}
          - {name: time, type: timestamp, format: "%Y-%m-%dT%H:%M:%SZ"}
          - {name: path, type: string}
          - {name: data, type: json}
          - {name: _key, type: string}
          - {name: _partition, type: long}
    - name: batch-layer
      in:
        type: s3
        bucket: my-s3-bucket
        path_prefix: access-logs/d=20201212/
        endpoint: s3-us-west-1.amazonaws.com
        access_key_id: ABCXYZ123ABCXYZ123
        secret_access_key: AbCxYz123aBcXyZ123
        decoders:
          - type: gzip
        parser:
          type: json
          columns:
            - {name: id, type: string}
            - {name: time, type: timestamp, format: "%Y-%m-%dT%H:%M:%SZ"}
            - {name: path, type: string}
            - {name: data, type: json}
            - {name: _key, type: string}
            - {name: _partition, type: long}

out:
  type: stdout

様々な API からデータを抽出し整形して一カ所にまとめたい

様々な API からデータを抽出し整形して一カ所にまとめたくなることがあります。例えば複数の広告事業者を利用していて、そのレポートデータを整形して1つのテーブルにまとめたいといったケースです。そんなときも、 embulk-input-union が役に立ちます。

example.yml
in:
  type: union
  union:
    - name: GoogleAds
      in:
        type: http
        url: ...
        ...snip...
      filters:
        - type: column
          ...snip...
    - name: Facebook Audience Network
      in:
        type: http
        url: ...
        ...snip...
      filters
        - type: column
          ...snip...
        - type: column
          ...snip...
    - name: Twitter Ads
      in:
        type: http
        url: ...
        ...snip...
      filters
        - type: column
          ...snip...
        - type: column
          ...snip...
        - type: column
          ...snip...

out:
  type: mysql
  ...snip...

(具体的な記述ができなくて申し訳ない:pray:)

File の Path をカラムに追加したい

冒頭の例のようなユースケースです。 Embulk では load したファイルの path や名称をデータとして取得出来ません。Hive などではパーティションの key, value を実データには持たず path に保持する仕様なので、パーティションの値が取得出来ずに困る場合があります。そんなときも、 embulk-input-union が役に立ちます。

※ 例は冒頭に書いたので省略します。

実装詳細

ここからはどういう仕組みで embulk-input-union が動いているのか詳解したいと思います。

PipeOutputPlugin

embulk-input-union では定義された設定に対して PipeOutputPlugin という内部 Plugin を Output Plugin として利用するようにしています。この Plugin は読んで字のごとく embulk-input-union で実行された Embulk のデータを本体の Embulk にパイプする役割を持っています。

ReuseOutputLocalExecutorPlugin

embulk-input-union では Executor Plugin も内部実装しています。この Executor Plugin は Embulk の DirectExecutor をベースに Output Plugin が再利用されるような変更を加えたものです。なぜこのような実装が必要なのかは Embulk の Plugin の呼び出し方に理由があります。

Embulk では transactionopen といった Plugin のインターフェースを実装するときにインスタンスの変数に依存した処理を書いてはいけないというのが暗黙知として存在します。コレは Embulk が内部で method を呼び出すときに毎回 Task から deserialize するためです。

今回の embulk-input-union の処理では本体の Embulk から受け取った PageOutput オブジェクトをそのまま PipeOutputPlugin で使い続けないとデータの受け渡しができません。そのため、PipeOutputPlugin が再作成されないような Executor Plugin を利用する必要があり ReuseOutputLocalExecutorPlugin が生まれたのでした。

BreakinBulkLoader

embulk-input-union では内部で Embulk 相当の挙動を実現するため BreakinBulkLoader というクラスを実装しています。このクラスは Embulk の BulkLoader に割り込む(break-in)形で別の Embulk を走らせるためこのような名前を付けています。

具体的にどういう挙動をするのか説明します。まず Embulk の BulkLoader の処理を見てみます。

BulkLoader.java
    private ExecutionResult doRun(ConfigSource config) {
        final BulkLoaderTask task = config.loadConfig(BulkLoaderTask.class);

        final ExecutorPlugin exec = newExecutorPlugin(task);
        final ProcessPluginSet plugins = new ProcessPluginSet(task);

        final LoaderState state = newLoaderState(logger, plugins);
        state.setTransactionStage(TransactionStage.INPUT_BEGIN);
        try {
            ConfigDiff inputConfigDiff = plugins.getInputPlugin().transaction(task.getInputConfig(), new InputPlugin.Control() {
                public List<TaskReport> run(final TaskSource inputTask, final Schema inputSchema, final int inputTaskCount) {
                    state.setInputTaskSource(inputTask);
                    state.setTransactionStage(TransactionStage.FILTER_BEGIN);
                    Filters.transaction(plugins.getFilterPlugins(), task.getFilterConfigs(), inputSchema, new Filters.Control() {
                        public void run(final List<TaskSource> filterTasks, final List<Schema> schemas) {
                            state.setSchemas(schemas);
                            state.setFilterTaskSources(filterTasks);
                            state.setTransactionStage(TransactionStage.EXECUTOR_BEGIN);
                            exec.transaction(task.getExecConfig(), last(schemas), inputTaskCount, new ExecutorPlugin.Control() {
                                public void transaction(final Schema executorSchema, final int outputTaskCount, final ExecutorPlugin.Executor executor) {
                                    state.setExecutorSchema(executorSchema);
                                    state.setTransactionStage(TransactionStage.OUTPUT_BEGIN);
                                    @SuppressWarnings("checkstyle:LineLength")
                                    ConfigDiff outputConfigDiff = plugins.getOutputPlugin().transaction(task.getOutputConfig(), executorSchema, outputTaskCount, new OutputPlugin.Control() {
                                        public List<TaskReport> run(final TaskSource outputTask) {
                                            state.setOutputTaskSource(outputTask);
                                            state.initialize(inputTaskCount, outputTaskCount);
                                            state.setTransactionStage(TransactionStage.RUN);

                                            if (!state.isAllTasksCommitted()) {  // inputTaskCount == 0
                                                execute(task, executor, state);
                                            }

                                            if (!state.isAllTasksCommitted()) {
                                                throw new RuntimeException(String.format("%d input tasks and %d output tasks failed",
                                                            state.countUncommittedInputTasks(), state.countUncommittedOutputTasks()));
                                            }

                                            state.setTransactionStage(TransactionStage.OUTPUT_COMMIT);
                                            return state.getAllOutputTaskReports();
                                        }
                                    });
                                    state.setOutputConfigDiff(outputConfigDiff);
                                    state.setTransactionStage(TransactionStage.EXECUTOR_COMMIT);
                                }
                            });
                            state.setTransactionStage(TransactionStage.FILTER_COMMIT);
                        }
                    });
                    state.setTransactionStage(TransactionStage.INPUT_COMMIT);
                    return state.getAllInputTaskReports();
                }
            });
            state.setInputConfigDiff(inputConfigDiff);
            state.setTransactionStage(TransactionStage.CLEANUP);

            cleanupCommittedTransaction(config, state);

            return state.buildExecuteResult();

        } catch (Throwable ex) {
            if (isSkippedTransaction(ex)) {
                ConfigDiff configDiff = ((SkipTransactionException) ex).getConfigDiff();
                return state.buildExecuteResultOfSkippedExecution(configDiff);
            } else if (state.isAllTasksCommitted() && state.isAllTransactionsCommitted()) {
                // ignore the exception
                return state.buildExecuteResultWithWarningException(ex);
            }
            throw state.buildPartialExecuteException(ex, Exec.session());
        }
    }

ref. https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java#L498-L568

上記の処理をざっくりまとめると以下のような処理を行っています。

  1. Input Plugin の transaction を start
  2. Filter Plugins の transaction を start
  3. Executor Plugin の transaction を start
  4. Output Plugin の transaction を start
  5. Output Plugin の openPageOutput を開く
  6. Filter Plugins の open で Output Plugin の PageOutput につなぐ形で PageOutput を開く
  7. Input Plugin の run でデータソースから Filter Plugins の PageOutput へデータを投入
  8. Output Plugin の transaction を finish
  9. Executor Plugin の transaction を finish
  10. Filter Plugins の transaction を finish
  11. Input Plugin の transaction を finish

この処理に BreakinBulkLoader は割り込んで別の Embulk の処理を入れていきます。具体的には以下のコードです。

BreakinBulkLoader.scala
  def transaction(f: Schema => Unit): Unit = {
    ThreadNameContext.switch(s"$loaderName:#transaction") {
      ctxt: ThreadNameContext =>
        try {
          runInput {
            runFilters {
              // NOTE: This "f" calls "control.run" that is defined in UnionInputPlugin.
              //       And then "control.run" calls UnionInputPlugin#run. This means
              //       BreakinBulkLoader#run is called inside "f". So the plugins
              //       executions order become the same as the Embulk BulkLoader.
              //       ref. https://github.com/embulk/embulk/blob/c532e7c084ef7041914ec6b119522f6cb7dcf8e8/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java#L498-L568
              ctxt.switch { _ => f(lastFilterSchema) }
            }
          }
        }
        catch {
          // NOTE: BreakinBulkLoader does not catch SkipTransactionException
          //       because this exception should be handled in the original
          //       BulkLoader to stop the output plugin ingesting.
          // NOTE: BreakinBulkLoader catch only the exception wrapped by
          //       BreakinBulkLoader.Exception that has this BreakinBulkLoader's
          //       name, because any other exceptions are not thrown by this
          //       BreakinBulkLoader.
          // NOTE: BreakinBulkLoader allows to suppress exceptions only when
          //       the all tasks and all transactions are committed.
          case ex: BreakinBulkLoader.Exception
              if (ex.name == loaderName && state.isAllTasksCommitted && state.isAllTransactionsCommitted) =>
            logger.warn(
              s"Threw exception on the stage: ${ex.transactionStage.map(_.name()).getOrElse("None")}," +
                s" but all tasks and transactions are committed.",
              ex
            )
        }
    }
  }

  def run(schema: Schema, output: PageOutput): Unit = {
    ThreadNameContext.switch(s"$loaderName:#run") { _ =>
      val outputPlugin = PipeOutputPlugin(output)
      val executorPlugin = ReuseOutputLocalExecutorPlugin(outputPlugin)
      try {
        runExecutor(executorPlugin, schema) {
          (executor: ExecutorPlugin.Executor) =>
            runOutput(outputPlugin) {
              execute(executor)
            }
        }
      }
      catch {
        // NOTE: Wrap the exception by BreakinBulkLoader.Exception
        //       in order to identify this exception thrown by this
        //       BreakinBulkLoader.
        case ex: Throwable => throw buildException(ex)
      }
    }
  }

ref. https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala#L114-L169

上記コードの transaction method 及び run method が本体の Embulk における Input Plugin の transactionrun のタイミングで呼ばれます。そのため、先ほどの処理まとめに追記すると以下のような処理になります。便宜上 embulk-input-union が走らせる Embulk を union Embulk と呼びます。

  1. Input Plugin の transaction を start
    1. union Embulk: Input Plugin の transaction を start
    2. union Embulk: Filter Plugins の transaction を start
    3. ...(定義した数だけ処理を繰り返す)
  2. Filter Plugins の transaction を start
  3. Executor Plugin の transaction を start
  4. Output Plugin の transaction を start
  5. Output Plugin の openPageOutput を開く
  6. Filter Plugins の open で Output Plugin の PageOutput につなぐ形で PageOutput を開く
  7. Input Plugin の run でデータソースから Filter Plugins の PageOutput へデータを投入
    1. union Embulk: Executor Plugin の transaction を start
    2. union Embulk: Output Plugin の transaction を start
    3. union Embulk: Output Plugin の openPageOutput を開く
    4. union Embulk: Filter Plugins の open で Output Plugin の PageOutput につなぐ形で PageOutput を開く
    5. union Embulk: Input Plugin の run でデータソースから Filter Plugins の PageOutput へデータを投入
    6. union Embulk: Output Plugin の transaction を finish
    7. union Embulk: Executor Plugin の transaction を finish
    8. ...(定義した数だけ処理を繰り返す)
  8. Output Plugin の transaction を finish
  9. Executor Plugin の transaction を finish
  10. Filter Plugins の transaction を finish
  11. Input Plugin の transaction を finish
    1. union Embulk: Filter Plugins の transaction を finish
    2. union Embulk: Input Plugin の transaction を finish
    3. ...(定義した数だけ処理を繰り返す)

このように embulk-input-union が実行する Embulk は本体の処理に割り込む形で実行されています。

おわりに

この記事では embulk-input-union の使い方と内部実装の詳細を説明しました。是非皆さんも使ってみてください。

10
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
4