10
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

embulk-input-union の紹介

Last updated at Posted at 2020-12-12

この記事はZOZOテクノロジーズ #2 Advent Calendar 2020 17日目の記事です。

はじめに

こんにちは @civitaspo です。先日新しい Embulk Pluginembulk-input-union をリリースしました。この記事ではこの embulk-input-union の使い方や実装内容について紹介しようと思います。

なにするやつか

皆さん SQL の union 句はご存じだと思いますが、この Plugin は複数のデータソースを union して input として利用出来ます。 example を見ると雰囲気が伝わるかと思います。

code.yml
in:
  type: union
  union:
    - in:
        type: file
        path_prefix: ./example/data01.tsv
        parser:
          type: csv
          delimiter: "\t"
          skip_header_lines: 0
          null_string: ""
          columns:
            - { name: id, type: long }
            - { name: description, type: string }
            - { name: name, type: string }
            - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z" }
            - { name: payload, type: json }
          stop_on_invalid_record: true
      filters:
        - type: column
          add_columns:
            - { name: group_name, type: string, default: "data01" }
    - name: example
      in:
        type: file
        path_prefix: ./example/data02.tsv
        parser:
          type: csv
          delimiter: "\t"
          skip_header_lines: 0
          null_string: ""
          columns:
            - { name: id, type: long }
            - { name: description, type: string }
            - { name: name, type: string }
            - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z" }
            - { name: payload, type: json }
          stop_on_invalid_record: true
      filters:
        - type: column
          add_columns:
            - { name: group_name, type: string, default: "data02" }

out:
  type: stdout

type: unionunion というオプションに配列で Embulk の input 及び filter の設定らしき記述がありますね!
これは本当に Embulk の設定で、複数のデータソースから Embulk でそれぞれ filter まで処理した後 union して input として利用出来るのがこのプラグインの特徴です。

この例の記述では

  • ./example/data01.tsv からデータを読み出し filter で group_name というカラムを追加後 data01 という値を入れたもの
  • ./example/data02.tsv からデータを読み出し filter で group_name というカラムを追加後 data02 という値を入れたもの

以上の2つの結果が union されて stdout に出力される設定となります。

実際にこの例を走らせてみます。ちなみに ./example/data01.tsv./example/data02.tsv にはどちらにも同じ下記のデータが入っています。

./example/{data01.tsv,data02.tsv}
0	c20ef94602	c212c89f91	2017-10-24 03:54:35 +0900	{"a":0,"b":"99"}
1	330a9fc33a	e25b33b616	2017-10-22 19:53:31 +0900	{"a":1,"b":"a9"}
2	707b3b7588	90823c6a1f	2017-10-23 23:42:43 +0900	{"a":2,"b":"96"}
3	8d8288e66f		2017-10-22 06:12:13 +0900	{"a":3,"b":"86"}
4	c54d8b6481	e56a40571c	2017-10-23 04:59:16 +0900	{"a":4,"b":"d2"}
```

```
❯ embulk run config.yml
2020-12-11 23:16:59.370 +0900: Embulk v0.9.23
2020-12-11 23:17:00.187 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-12-11 23:17:02.263 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/Users/takahiro.nakayama/src/github.com/civitaspo/embulk-input-union/example/Gemfile"
2020-12-11 23:17:02.264 +0900 [INFO] (main): Gem's home and path are being cleared.
2020-12-11 23:17:03.612 +0900 [INFO] (main): Started Embulk v0.9.23
2020-12-11 23:17:03.727 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/union from a load path
2020-12-11 23:17:03.937 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Listing local files at directory './example' filtering filename by prefix 'data01.tsv'
2020-12-11 23:17:03.938 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2020-12-11 23:17:03.940 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Loading files [./example/data01.tsv]
2020-12-11 23:17:04.128 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Loaded plugin embulk-filter-column (0.7.1)
2020-12-11 23:17:04.155 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): Listing local files at directory './example' filtering filename by prefix 'data02.tsv'
2020-12-11 23:17:04.155 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2020-12-11 23:17:04.156 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): Loading files [./example/data02.tsv]
2020-12-11 23:17:04.169 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=24 / output tasks 12 = input tasks 2 * 6
2020-12-11 23:17:04.180 +0900 [INFO] (0001:transaction): {done:  0 / 2, running: 0}
2020-12-11 23:17:04.226 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): Using local thread executor with max_threads=12 / tasks=1
2020-12-11 23:17:04.227 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): Using local thread executor with max_threads=12 / tasks=1
2020-12-11 23:17:04.262 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): {done:  0 / 1, running: 0}
2020-12-11 23:17:04.262 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): {done:  0 / 1, running: 0}
2020-12-11 23:17:04.360 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): {done:  1 / 1, running: 0}
2020-12-11 23:17:04.360 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): {done:  1 / 1, running: 0}
0,c20ef94602,c212c89f91,2017-10-23 18:54:35 +0000,{"a":0,"b":"99"},data02
0,c20ef94602,c212c89f91,2017-10-23 18:54:35 +0000,{"a":0,"b":"99"},data01
1,330a9fc33a,e25b33b616,2017-10-22 10:53:31 +0000,{"a":1,"b":"a9"},data02
1,330a9fc33a,e25b33b616,2017-10-22 10:53:31 +0000,{"a":1,"b":"a9"},data01
2,707b3b7588,90823c6a1f,2017-10-23 14:42:43 +0000,{"a":2,"b":"96"},data02
2,707b3b7588,90823c6a1f,2017-10-23 14:42:43 +0000,{"a":2,"b":"96"},data01
3,8d8288e66f,,2017-10-21 21:12:13 +0000,{"a":3,"b":"86"},data02
3,8d8288e66f,,2017-10-21 21:12:13 +0000,{"a":3,"b":"86"},data01
4,c54d8b6481,e56a40571c,2017-10-22 19:59:16 +0000,{"a":4,"b":"d2"},data02
4,c54d8b6481,e56a40571c,2017-10-22 19:59:16 +0000,{"a":4,"b":"d2"},data01
2020-12-11 23:17:04.365 +0900 [INFO] (0001:transaction): {done:  2 / 2, running: 0}
2020-12-11 23:17:04.365 +0900 [INFO] (0001:transaction): {done:  2 / 2, running: 0}
2020-12-11 23:17:04.406 +0900 [INFO] (main): Committed.
2020-12-11 23:17:04.407 +0900 [INFO] (main): Next config diff: {"in":{"union":[{"in":{"last_path":"./example/data01.tsv"}},{"in":{"last_path":"./example/data02.tsv"}}]},"out":{}}
```

2つのファイルが読み出され、それぞれ filter 処理で追加した `group_name` である `data01`, `data02` が付いた状態で出力されていることが確認出来ます。

# つかいかた

オプションはシンプルです。 `union` というオプションしかありません。 `union` option には union したいデータソースの数だけ配列で [Embulk の設定](https://www.embulk.org/docs/built-in.html#embulk-configuration-file-format)を記述します。
この [Embulk の設定](https://www.embulk.org/docs/built-in.html#embulk-configuration-file-format)は以下の点で少しだけ普通の [Embulk の設定](https://www.embulk.org/docs/built-in.html#embulk-configuration-file-format)とは異なります。

* `out` option は記述しても無視される。
* `exec` option は [Local Executor Plugin](https://www.embulk.org/docs/built-in.html#local-executor-plugin) 固定、かつ `max_threads` オプションのみサポート
    * Local Executor Plugin は Embulk の Default の Executor Plugin なので現状困ることはないはずですが、今後有用な Executor Plugin が出てきたら機能拡張するかもしれません。

* `name` option が追加されています。
    * ログに出力するときに利用するのみで記述しなくても構いません。
    * デフォルトだと設定値のサマリが名称になります。
        * 上記の実行例で言うと `in[file].filters[column]` がソレにあたります。

ref. https://github.com/civitaspo/embulk-input-union#configuration

# 想定ユースケース

## 異なるミドルウェアからデータを抽出したい

異なるミドルウェアからデータを抽出したくなることがあります。具体例としては[ラムダアーキテクチャ](https://docs.microsoft.com/en-us/azure/architecture/data-guide/big-data/#lambda-architecture)を採用している場合で、 その瞬間の全てのデータを得るためには speed-layer と batch-layer の両方からデータを抽出する必要があります。このときデータが異なるミドルウェア上に載っていることがあります。そんなとき、 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が役に立ちます。

```example.yml
in:
  type: union
  union:
    - name: speed-layer
      in:
        type: kafka
        topics:
          - "access-log"
        serialize_format: json
        brokers:
          - "localhost:9092"
        fetch_max_wait_ms: 10000
        columns:
          - {name: id, type: string}
          - {name: time, type: timestamp, format: "%Y-%m-%dT%H:%M:%SZ"}
          - {name: path, type: string}
          - {name: data, type: json}
          - {name: _key, type: string}
          - {name: _partition, type: long}
    - name: batch-layer
      in:
        type: s3
        bucket: my-s3-bucket
        path_prefix: access-logs/d=20201212/
        endpoint: s3-us-west-1.amazonaws.com
        access_key_id: ABCXYZ123ABCXYZ123
        secret_access_key: AbCxYz123aBcXyZ123
        decoders:
          - type: gzip
        parser:
          type: json
          columns:
            - {name: id, type: string}
            - {name: time, type: timestamp, format: "%Y-%m-%dT%H:%M:%SZ"}
            - {name: path, type: string}
            - {name: data, type: json}
            - {name: _key, type: string}
            - {name: _partition, type: long}

out:
  type: stdout
```

## 様々な API からデータを抽出し整形して一カ所にまとめたい

様々な API からデータを抽出し整形して一カ所にまとめたくなることがあります。例えば複数の広告事業者を利用していて、そのレポートデータを整形して1つのテーブルにまとめたいといったケースです。そんなときも、 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が役に立ちます。

```example.yml
in:
  type: union
  union:
    - name: GoogleAds
      in:
        type: http
        url: ...
        ...snip...
      filters:
        - type: column
          ...snip...
    - name: Facebook Audience Network
      in:
        type: http
        url: ...
        ...snip...
      filters
        - type: column
          ...snip...
        - type: column
          ...snip...
    - name: Twitter Ads
      in:
        type: http
        url: ...
        ...snip...
      filters
        - type: column
          ...snip...
        - type: column
          ...snip...
        - type: column
          ...snip...

out:
  type: mysql
  ...snip...
```

(具体的な記述ができなくて申し訳ない:pray:)

## File の Path をカラムに追加したい

冒頭の例のようなユースケースです。 Embulk では load したファイルの path や名称をデータとして取得出来ません。Hive などではパーティションの key, value を実データには持たず path に保持する仕様なので、パーティションの値が取得出来ずに困る場合があります。そんなときも、 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が役に立ちます。

※ 例は冒頭に書いたので省略します。

# 実装詳細

ここからはどういう仕組みで [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が動いているのか詳解したいと思います。

## [PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala)

[embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) では定義された設定に対して [PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala) という内部 Plugin を Output Plugin として利用するようにしています。この Plugin は読んで字のごとく [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) で実行された Embulk のデータを本体の Embulk にパイプする役割を持っています。

## [ReuseOutputLocalExecutorPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/ReuseOutputLocalExecutorPlugin.scala)

[embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) では Executor Plugin も内部実装しています。この Executor Plugin は Embulk の [DirectExecutor](https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/LocalExecutorPlugin.java#L143-L192) をベースに Output Plugin が再利用されるような変更を加えたものです。なぜこのような実装が必要なのかは Embulk の Plugin の呼び出し方に理由があります。

Embulk では `transaction` や `open` といった Plugin のインターフェースを実装するときにインスタンスの変数に依存した処理を書いてはいけないというのが暗黙知として存在します。コレは Embulk が内部で method を呼び出すときに毎回 Task から deserialize するためです。

今回の [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) の処理では本体の Embulk から受け取った `PageOutput` オブジェクトをそのまま [PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala) で使い続けないとデータの受け渡しができません。そのため、[PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala) が再作成されないような Executor Plugin を利用する必要があり [ReuseOutputLocalExecutorPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/ReuseOutputLocalExecutorPlugin.scala) が生まれたのでした。


## [BreakinBulkLoader](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala)

[embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) では内部で Embulk 相当の挙動を実現するため [BreakinBulkLoader](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala) というクラスを実装しています。このクラスは Embulk の [BulkLoader](https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java) に割り込む(break-in)形で別の Embulk を走らせるためこのような名前を付けています。

具体的にどういう挙動をするのか説明します。まず Embulk の [BulkLoader](https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java) の処理を見てみます。

```BulkLoader.java
    private ExecutionResult doRun(ConfigSource config) {
        final BulkLoaderTask task = config.loadConfig(BulkLoaderTask.class);

        final ExecutorPlugin exec = newExecutorPlugin(task);
        final ProcessPluginSet plugins = new ProcessPluginSet(task);

        final LoaderState state = newLoaderState(logger, plugins);
        state.setTransactionStage(TransactionStage.INPUT_BEGIN);
        try {
            ConfigDiff inputConfigDiff = plugins.getInputPlugin().transaction(task.getInputConfig(), new InputPlugin.Control() {
                public List<TaskReport> run(final TaskSource inputTask, final Schema inputSchema, final int inputTaskCount) {
                    state.setInputTaskSource(inputTask);
                    state.setTransactionStage(TransactionStage.FILTER_BEGIN);
                    Filters.transaction(plugins.getFilterPlugins(), task.getFilterConfigs(), inputSchema, new Filters.Control() {
                        public void run(final List<TaskSource> filterTasks, final List<Schema> schemas) {
                            state.setSchemas(schemas);
                            state.setFilterTaskSources(filterTasks);
                            state.setTransactionStage(TransactionStage.EXECUTOR_BEGIN);
                            exec.transaction(task.getExecConfig(), last(schemas), inputTaskCount, new ExecutorPlugin.Control() {
                                public void transaction(final Schema executorSchema, final int outputTaskCount, final ExecutorPlugin.Executor executor) {
                                    state.setExecutorSchema(executorSchema);
                                    state.setTransactionStage(TransactionStage.OUTPUT_BEGIN);
                                    @SuppressWarnings("checkstyle:LineLength")
                                    ConfigDiff outputConfigDiff = plugins.getOutputPlugin().transaction(task.getOutputConfig(), executorSchema, outputTaskCount, new OutputPlugin.Control() {
                                        public List<TaskReport> run(final TaskSource outputTask) {
                                            state.setOutputTaskSource(outputTask);
                                            state.initialize(inputTaskCount, outputTaskCount);
                                            state.setTransactionStage(TransactionStage.RUN);

                                            if (!state.isAllTasksCommitted()) {  // inputTaskCount == 0
                                                execute(task, executor, state);
                                            }

                                            if (!state.isAllTasksCommitted()) {
                                                throw new RuntimeException(String.format("%d input tasks and %d output tasks failed",
                                                            state.countUncommittedInputTasks(), state.countUncommittedOutputTasks()));
                                            }

                                            state.setTransactionStage(TransactionStage.OUTPUT_COMMIT);
                                            return state.getAllOutputTaskReports();
                                        }
                                    });
                                    state.setOutputConfigDiff(outputConfigDiff);
                                    state.setTransactionStage(TransactionStage.EXECUTOR_COMMIT);
                                }
                            });
                            state.setTransactionStage(TransactionStage.FILTER_COMMIT);
                        }
                    });
                    state.setTransactionStage(TransactionStage.INPUT_COMMIT);
                    return state.getAllInputTaskReports();
                }
            });
            state.setInputConfigDiff(inputConfigDiff);
            state.setTransactionStage(TransactionStage.CLEANUP);

            cleanupCommittedTransaction(config, state);

            return state.buildExecuteResult();

        } catch (Throwable ex) {
            if (isSkippedTransaction(ex)) {
                ConfigDiff configDiff = ((SkipTransactionException) ex).getConfigDiff();
                return state.buildExecuteResultOfSkippedExecution(configDiff);
            } else if (state.isAllTasksCommitted() && state.isAllTransactionsCommitted()) {
                // ignore the exception
                return state.buildExecuteResultWithWarningException(ex);
            }
            throw state.buildPartialExecuteException(ex, Exec.session());
        }
    }
```

ref. https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java#L498-L568

上記の処理をざっくりまとめると以下のような処理を行っています。

1. Input Plugin の `transaction` を start
2. Filter Plugins の `transaction` を start
3. Executor Plugin の `transaction` を start
4. Output Plugin の `transaction` を start
5. Output Plugin の `open` で `PageOutput` を開く
6. Filter Plugins の `open` で Output Plugin の `PageOutput` につなぐ形で `PageOutput` を開く
7. Input Plugin の `run` でデータソースから Filter Plugins の `PageOutput` へデータを投入
8. Output Plugin の `transaction` を finish
9. Executor Plugin の `transaction` を finish
10. Filter Plugins の `transaction` を finish 
11. Input Plugin の `transaction` を finish

この処理に [BreakinBulkLoader](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala) は割り込んで別の Embulk の処理を入れていきます。具体的には以下のコードです。

```BreakinBulkLoader.scala
  def transaction(f: Schema => Unit): Unit = {
    ThreadNameContext.switch(s"$loaderName:#transaction") {
      ctxt: ThreadNameContext =>
        try {
          runInput {
            runFilters {
              // NOTE: This "f" calls "control.run" that is defined in UnionInputPlugin.
              //       And then "control.run" calls UnionInputPlugin#run. This means
              //       BreakinBulkLoader#run is called inside "f". So the plugins
              //       executions order become the same as the Embulk BulkLoader.
              //       ref. https://github.com/embulk/embulk/blob/c532e7c084ef7041914ec6b119522f6cb7dcf8e8/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java#L498-L568
              ctxt.switch { _ => f(lastFilterSchema) }
            }
          }
        }
        catch {
          // NOTE: BreakinBulkLoader does not catch SkipTransactionException
          //       because this exception should be handled in the original
          //       BulkLoader to stop the output plugin ingesting.
          // NOTE: BreakinBulkLoader catch only the exception wrapped by
          //       BreakinBulkLoader.Exception that has this BreakinBulkLoader's
          //       name, because any other exceptions are not thrown by this
          //       BreakinBulkLoader.
          // NOTE: BreakinBulkLoader allows to suppress exceptions only when
          //       the all tasks and all transactions are committed.
          case ex: BreakinBulkLoader.Exception
              if (ex.name == loaderName && state.isAllTasksCommitted && state.isAllTransactionsCommitted) =>
            logger.warn(
              s"Threw exception on the stage: ${ex.transactionStage.map(_.name()).getOrElse("None")}," +
                s" but all tasks and transactions are committed.",
              ex
            )
        }
    }
  }

  def run(schema: Schema, output: PageOutput): Unit = {
    ThreadNameContext.switch(s"$loaderName:#run") { _ =>
      val outputPlugin = PipeOutputPlugin(output)
      val executorPlugin = ReuseOutputLocalExecutorPlugin(outputPlugin)
      try {
        runExecutor(executorPlugin, schema) {
          (executor: ExecutorPlugin.Executor) =>
            runOutput(outputPlugin) {
              execute(executor)
            }
        }
      }
      catch {
        // NOTE: Wrap the exception by BreakinBulkLoader.Exception
        //       in order to identify this exception thrown by this
        //       BreakinBulkLoader.
        case ex: Throwable => throw buildException(ex)
      }
    }
  }
```

ref. https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala#L114-L169

上記コードの `transaction` method 及び `run` method が本体の Embulk における Input Plugin の `transaction` と `run` のタイミングで呼ばれます。そのため、先ほどの処理まとめに追記すると以下のような処理になります。便宜上 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が走らせる Embulk を union Embulk と呼びます。

1. Input Plugin の `transaction` を start
    1. union Embulk: Input Plugin の `transaction` を start
    2. union Embulk: Filter Plugins の `transaction` を start
    3. ...(定義した数だけ処理を繰り返す)
2. Filter Plugins の `transaction` を start
3. Executor Plugin の `transaction` を start
4. Output Plugin の `transaction` を start
5. Output Plugin の `open` で `PageOutput` を開く
6. Filter Plugins の `open` で Output Plugin の `PageOutput` につなぐ形で `PageOutput` を開く
7. Input Plugin の `run` でデータソースから Filter Plugins の `PageOutput` へデータを投入
    1. union Embulk: Executor Plugin の `transaction` を start
    2. union Embulk: Output Plugin の `transaction` を start
    3. union Embulk: Output Plugin の `open` で `PageOutput` を開く
    6. union Embulk: Filter Plugins の `open` で Output Plugin の `PageOutput` につなぐ形で `PageOutput` を開く
    7. union Embulk: Input Plugin の `run` でデータソースから Filter Plugins の `PageOutput` へデータを投入
    8. union Embulk: Output Plugin の `transaction` を finish
    9. union Embulk: Executor Plugin の `transaction` を finish
    10. ...(定義した数だけ処理を繰り返す)
8. Output Plugin の `transaction` を finish
9. Executor Plugin の `transaction` を finish
10. Filter Plugins の `transaction` を finish 
11. Input Plugin の `transaction` を finish
    12. union Embulk: Filter Plugins の `transaction` を finish 
    11. union Embulk: Input Plugin の `transaction` を finish
    12. ...(定義した数だけ処理を繰り返す)

このように [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が実行する Embulk は本体の処理に割り込む形で実行されています。

# おわりに

この記事では [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) の使い方と内部実装の詳細を説明しました。是非皆さんも使ってみてください。
10
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?