この記事はZOZOテクノロジーズ #2 Advent Calendar 2020 17日目の記事です。
はじめに
こんにちは @civitaspo です。先日新しい Embulk Plugin の embulk-input-union をリリースしました。この記事ではこの embulk-input-union の使い方や実装内容について紹介しようと思います。
なにするやつか
皆さん SQL の union 句はご存じだと思いますが、この Plugin は複数のデータソースを union して input として利用出来ます。 example を見ると雰囲気が伝わるかと思います。
in:
type: union
union:
- in:
type: file
path_prefix: ./example/data01.tsv
parser:
type: csv
delimiter: "\t"
skip_header_lines: 0
null_string: ""
columns:
- { name: id, type: long }
- { name: description, type: string }
- { name: name, type: string }
- { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z" }
- { name: payload, type: json }
stop_on_invalid_record: true
filters:
- type: column
add_columns:
- { name: group_name, type: string, default: "data01" }
- name: example
in:
type: file
path_prefix: ./example/data02.tsv
parser:
type: csv
delimiter: "\t"
skip_header_lines: 0
null_string: ""
columns:
- { name: id, type: long }
- { name: description, type: string }
- { name: name, type: string }
- { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z" }
- { name: payload, type: json }
stop_on_invalid_record: true
filters:
- type: column
add_columns:
- { name: group_name, type: string, default: "data02" }
out:
type: stdout
type: union
の union
というオプションに配列で Embulk の input 及び filter の設定らしき記述がありますね!
これは本当に Embulk の設定で、複数のデータソースから Embulk でそれぞれ filter まで処理した後 union して input として利用出来るのがこのプラグインの特徴です。
この例の記述では
-
./example/data01.tsv
からデータを読み出し filter でgroup_name
というカラムを追加後data01
という値を入れたもの -
./example/data02.tsv
からデータを読み出し filter でgroup_name
というカラムを追加後data02
という値を入れたもの
以上の2つの結果が union されて stdout に出力される設定となります。
実際にこの例を走らせてみます。ちなみに ./example/data01.tsv
と ./example/data02.tsv
にはどちらにも同じ下記のデータが入っています。
0 c20ef94602 c212c89f91 2017-10-24 03:54:35 +0900 {"a":0,"b":"99"}
1 330a9fc33a e25b33b616 2017-10-22 19:53:31 +0900 {"a":1,"b":"a9"}
2 707b3b7588 90823c6a1f 2017-10-23 23:42:43 +0900 {"a":2,"b":"96"}
3 8d8288e66f 2017-10-22 06:12:13 +0900 {"a":3,"b":"86"}
4 c54d8b6481 e56a40571c 2017-10-23 04:59:16 +0900 {"a":4,"b":"d2"}
```
```
❯ embulk run config.yml
2020-12-11 23:16:59.370 +0900: Embulk v0.9.23
2020-12-11 23:17:00.187 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2020-12-11 23:17:02.263 +0900 [INFO] (main): BUNDLE_GEMFILE is being set: "/Users/takahiro.nakayama/src/github.com/civitaspo/embulk-input-union/example/Gemfile"
2020-12-11 23:17:02.264 +0900 [INFO] (main): Gem's home and path are being cleared.
2020-12-11 23:17:03.612 +0900 [INFO] (main): Started Embulk v0.9.23
2020-12-11 23:17:03.727 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/union from a load path
2020-12-11 23:17:03.937 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Listing local files at directory './example' filtering filename by prefix 'data01.tsv'
2020-12-11 23:17:03.938 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2020-12-11 23:17:03.940 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Loading files [./example/data01.tsv]
2020-12-11 23:17:04.128 +0900 [INFO] (0001:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#transaction): Loaded plugin embulk-filter-column (0.7.1)
2020-12-11 23:17:04.155 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): Listing local files at directory './example' filtering filename by prefix 'data02.tsv'
2020-12-11 23:17:04.155 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2020-12-11 23:17:04.156 +0900 [INFO] (0001:transaction[ad5c2a8]:union[1]:example:#transaction): Loading files [./example/data02.tsv]
2020-12-11 23:17:04.169 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=24 / output tasks 12 = input tasks 2 * 6
2020-12-11 23:17:04.180 +0900 [INFO] (0001:transaction): {done: 0 / 2, running: 0}
2020-12-11 23:17:04.226 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): Using local thread executor with max_threads=12 / tasks=1
2020-12-11 23:17:04.227 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): Using local thread executor with max_threads=12 / tasks=1
2020-12-11 23:17:04.262 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): {done: 0 / 1, running: 0}
2020-12-11 23:17:04.262 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): {done: 0 / 1, running: 0}
2020-12-11 23:17:04.360 +0900 [INFO] (0016:transaction[ad5c2a8]:union[1]:example:#run): {done: 1 / 1, running: 0}
2020-12-11 23:17:04.360 +0900 [INFO] (0015:transaction[ad5c2a8]:union[0]:in[file].filters[column]:#run): {done: 1 / 1, running: 0}
0,c20ef94602,c212c89f91,2017-10-23 18:54:35 +0000,{"a":0,"b":"99"},data02
0,c20ef94602,c212c89f91,2017-10-23 18:54:35 +0000,{"a":0,"b":"99"},data01
1,330a9fc33a,e25b33b616,2017-10-22 10:53:31 +0000,{"a":1,"b":"a9"},data02
1,330a9fc33a,e25b33b616,2017-10-22 10:53:31 +0000,{"a":1,"b":"a9"},data01
2,707b3b7588,90823c6a1f,2017-10-23 14:42:43 +0000,{"a":2,"b":"96"},data02
2,707b3b7588,90823c6a1f,2017-10-23 14:42:43 +0000,{"a":2,"b":"96"},data01
3,8d8288e66f,,2017-10-21 21:12:13 +0000,{"a":3,"b":"86"},data02
3,8d8288e66f,,2017-10-21 21:12:13 +0000,{"a":3,"b":"86"},data01
4,c54d8b6481,e56a40571c,2017-10-22 19:59:16 +0000,{"a":4,"b":"d2"},data02
4,c54d8b6481,e56a40571c,2017-10-22 19:59:16 +0000,{"a":4,"b":"d2"},data01
2020-12-11 23:17:04.365 +0900 [INFO] (0001:transaction): {done: 2 / 2, running: 0}
2020-12-11 23:17:04.365 +0900 [INFO] (0001:transaction): {done: 2 / 2, running: 0}
2020-12-11 23:17:04.406 +0900 [INFO] (main): Committed.
2020-12-11 23:17:04.407 +0900 [INFO] (main): Next config diff: {"in":{"union":[{"in":{"last_path":"./example/data01.tsv"}},{"in":{"last_path":"./example/data02.tsv"}}]},"out":{}}
```
2つのファイルが読み出され、それぞれ filter 処理で追加した `group_name` である `data01`, `data02` が付いた状態で出力されていることが確認出来ます。
# つかいかた
オプションはシンプルです。 `union` というオプションしかありません。 `union` option には union したいデータソースの数だけ配列で [Embulk の設定](https://www.embulk.org/docs/built-in.html#embulk-configuration-file-format)を記述します。
この [Embulk の設定](https://www.embulk.org/docs/built-in.html#embulk-configuration-file-format)は以下の点で少しだけ普通の [Embulk の設定](https://www.embulk.org/docs/built-in.html#embulk-configuration-file-format)とは異なります。
* `out` option は記述しても無視される。
* `exec` option は [Local Executor Plugin](https://www.embulk.org/docs/built-in.html#local-executor-plugin) 固定、かつ `max_threads` オプションのみサポート
* Local Executor Plugin は Embulk の Default の Executor Plugin なので現状困ることはないはずですが、今後有用な Executor Plugin が出てきたら機能拡張するかもしれません。
* `name` option が追加されています。
* ログに出力するときに利用するのみで記述しなくても構いません。
* デフォルトだと設定値のサマリが名称になります。
* 上記の実行例で言うと `in[file].filters[column]` がソレにあたります。
ref. https://github.com/civitaspo/embulk-input-union#configuration
# 想定ユースケース
## 異なるミドルウェアからデータを抽出したい
異なるミドルウェアからデータを抽出したくなることがあります。具体例としては[ラムダアーキテクチャ](https://docs.microsoft.com/en-us/azure/architecture/data-guide/big-data/#lambda-architecture)を採用している場合で、 その瞬間の全てのデータを得るためには speed-layer と batch-layer の両方からデータを抽出する必要があります。このときデータが異なるミドルウェア上に載っていることがあります。そんなとき、 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が役に立ちます。
```example.yml
in:
type: union
union:
- name: speed-layer
in:
type: kafka
topics:
- "access-log"
serialize_format: json
brokers:
- "localhost:9092"
fetch_max_wait_ms: 10000
columns:
- {name: id, type: string}
- {name: time, type: timestamp, format: "%Y-%m-%dT%H:%M:%SZ"}
- {name: path, type: string}
- {name: data, type: json}
- {name: _key, type: string}
- {name: _partition, type: long}
- name: batch-layer
in:
type: s3
bucket: my-s3-bucket
path_prefix: access-logs/d=20201212/
endpoint: s3-us-west-1.amazonaws.com
access_key_id: ABCXYZ123ABCXYZ123
secret_access_key: AbCxYz123aBcXyZ123
decoders:
- type: gzip
parser:
type: json
columns:
- {name: id, type: string}
- {name: time, type: timestamp, format: "%Y-%m-%dT%H:%M:%SZ"}
- {name: path, type: string}
- {name: data, type: json}
- {name: _key, type: string}
- {name: _partition, type: long}
out:
type: stdout
```
## 様々な API からデータを抽出し整形して一カ所にまとめたい
様々な API からデータを抽出し整形して一カ所にまとめたくなることがあります。例えば複数の広告事業者を利用していて、そのレポートデータを整形して1つのテーブルにまとめたいといったケースです。そんなときも、 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が役に立ちます。
```example.yml
in:
type: union
union:
- name: GoogleAds
in:
type: http
url: ...
...snip...
filters:
- type: column
...snip...
- name: Facebook Audience Network
in:
type: http
url: ...
...snip...
filters
- type: column
...snip...
- type: column
...snip...
- name: Twitter Ads
in:
type: http
url: ...
...snip...
filters
- type: column
...snip...
- type: column
...snip...
- type: column
...snip...
out:
type: mysql
...snip...
```
(具体的な記述ができなくて申し訳ない:pray:)
## File の Path をカラムに追加したい
冒頭の例のようなユースケースです。 Embulk では load したファイルの path や名称をデータとして取得出来ません。Hive などではパーティションの key, value を実データには持たず path に保持する仕様なので、パーティションの値が取得出来ずに困る場合があります。そんなときも、 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が役に立ちます。
※ 例は冒頭に書いたので省略します。
# 実装詳細
ここからはどういう仕組みで [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が動いているのか詳解したいと思います。
## [PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala)
[embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) では定義された設定に対して [PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala) という内部 Plugin を Output Plugin として利用するようにしています。この Plugin は読んで字のごとく [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) で実行された Embulk のデータを本体の Embulk にパイプする役割を持っています。
## [ReuseOutputLocalExecutorPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/ReuseOutputLocalExecutorPlugin.scala)
[embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) では Executor Plugin も内部実装しています。この Executor Plugin は Embulk の [DirectExecutor](https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/LocalExecutorPlugin.java#L143-L192) をベースに Output Plugin が再利用されるような変更を加えたものです。なぜこのような実装が必要なのかは Embulk の Plugin の呼び出し方に理由があります。
Embulk では `transaction` や `open` といった Plugin のインターフェースを実装するときにインスタンスの変数に依存した処理を書いてはいけないというのが暗黙知として存在します。コレは Embulk が内部で method を呼び出すときに毎回 Task から deserialize するためです。
今回の [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) の処理では本体の Embulk から受け取った `PageOutput` オブジェクトをそのまま [PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala) で使い続けないとデータの受け渡しができません。そのため、[PipeOutputPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/PipeOutputPlugin.scala) が再作成されないような Executor Plugin を利用する必要があり [ReuseOutputLocalExecutorPlugin](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/plugin/ReuseOutputLocalExecutorPlugin.scala) が生まれたのでした。
## [BreakinBulkLoader](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala)
[embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) では内部で Embulk 相当の挙動を実現するため [BreakinBulkLoader](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala) というクラスを実装しています。このクラスは Embulk の [BulkLoader](https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java) に割り込む(break-in)形で別の Embulk を走らせるためこのような名前を付けています。
具体的にどういう挙動をするのか説明します。まず Embulk の [BulkLoader](https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java) の処理を見てみます。
```BulkLoader.java
private ExecutionResult doRun(ConfigSource config) {
final BulkLoaderTask task = config.loadConfig(BulkLoaderTask.class);
final ExecutorPlugin exec = newExecutorPlugin(task);
final ProcessPluginSet plugins = new ProcessPluginSet(task);
final LoaderState state = newLoaderState(logger, plugins);
state.setTransactionStage(TransactionStage.INPUT_BEGIN);
try {
ConfigDiff inputConfigDiff = plugins.getInputPlugin().transaction(task.getInputConfig(), new InputPlugin.Control() {
public List<TaskReport> run(final TaskSource inputTask, final Schema inputSchema, final int inputTaskCount) {
state.setInputTaskSource(inputTask);
state.setTransactionStage(TransactionStage.FILTER_BEGIN);
Filters.transaction(plugins.getFilterPlugins(), task.getFilterConfigs(), inputSchema, new Filters.Control() {
public void run(final List<TaskSource> filterTasks, final List<Schema> schemas) {
state.setSchemas(schemas);
state.setFilterTaskSources(filterTasks);
state.setTransactionStage(TransactionStage.EXECUTOR_BEGIN);
exec.transaction(task.getExecConfig(), last(schemas), inputTaskCount, new ExecutorPlugin.Control() {
public void transaction(final Schema executorSchema, final int outputTaskCount, final ExecutorPlugin.Executor executor) {
state.setExecutorSchema(executorSchema);
state.setTransactionStage(TransactionStage.OUTPUT_BEGIN);
@SuppressWarnings("checkstyle:LineLength")
ConfigDiff outputConfigDiff = plugins.getOutputPlugin().transaction(task.getOutputConfig(), executorSchema, outputTaskCount, new OutputPlugin.Control() {
public List<TaskReport> run(final TaskSource outputTask) {
state.setOutputTaskSource(outputTask);
state.initialize(inputTaskCount, outputTaskCount);
state.setTransactionStage(TransactionStage.RUN);
if (!state.isAllTasksCommitted()) { // inputTaskCount == 0
execute(task, executor, state);
}
if (!state.isAllTasksCommitted()) {
throw new RuntimeException(String.format("%d input tasks and %d output tasks failed",
state.countUncommittedInputTasks(), state.countUncommittedOutputTasks()));
}
state.setTransactionStage(TransactionStage.OUTPUT_COMMIT);
return state.getAllOutputTaskReports();
}
});
state.setOutputConfigDiff(outputConfigDiff);
state.setTransactionStage(TransactionStage.EXECUTOR_COMMIT);
}
});
state.setTransactionStage(TransactionStage.FILTER_COMMIT);
}
});
state.setTransactionStage(TransactionStage.INPUT_COMMIT);
return state.getAllInputTaskReports();
}
});
state.setInputConfigDiff(inputConfigDiff);
state.setTransactionStage(TransactionStage.CLEANUP);
cleanupCommittedTransaction(config, state);
return state.buildExecuteResult();
} catch (Throwable ex) {
if (isSkippedTransaction(ex)) {
ConfigDiff configDiff = ((SkipTransactionException) ex).getConfigDiff();
return state.buildExecuteResultOfSkippedExecution(configDiff);
} else if (state.isAllTasksCommitted() && state.isAllTransactionsCommitted()) {
// ignore the exception
return state.buildExecuteResultWithWarningException(ex);
}
throw state.buildPartialExecuteException(ex, Exec.session());
}
}
```
ref. https://github.com/embulk/embulk/blob/v0.9.23/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java#L498-L568
上記の処理をざっくりまとめると以下のような処理を行っています。
1. Input Plugin の `transaction` を start
2. Filter Plugins の `transaction` を start
3. Executor Plugin の `transaction` を start
4. Output Plugin の `transaction` を start
5. Output Plugin の `open` で `PageOutput` を開く
6. Filter Plugins の `open` で Output Plugin の `PageOutput` につなぐ形で `PageOutput` を開く
7. Input Plugin の `run` でデータソースから Filter Plugins の `PageOutput` へデータを投入
8. Output Plugin の `transaction` を finish
9. Executor Plugin の `transaction` を finish
10. Filter Plugins の `transaction` を finish
11. Input Plugin の `transaction` を finish
この処理に [BreakinBulkLoader](https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala) は割り込んで別の Embulk の処理を入れていきます。具体的には以下のコードです。
```BreakinBulkLoader.scala
def transaction(f: Schema => Unit): Unit = {
ThreadNameContext.switch(s"$loaderName:#transaction") {
ctxt: ThreadNameContext =>
try {
runInput {
runFilters {
// NOTE: This "f" calls "control.run" that is defined in UnionInputPlugin.
// And then "control.run" calls UnionInputPlugin#run. This means
// BreakinBulkLoader#run is called inside "f". So the plugins
// executions order become the same as the Embulk BulkLoader.
// ref. https://github.com/embulk/embulk/blob/c532e7c084ef7041914ec6b119522f6cb7dcf8e8/embulk-core/src/main/java/org/embulk/exec/BulkLoader.java#L498-L568
ctxt.switch { _ => f(lastFilterSchema) }
}
}
}
catch {
// NOTE: BreakinBulkLoader does not catch SkipTransactionException
// because this exception should be handled in the original
// BulkLoader to stop the output plugin ingesting.
// NOTE: BreakinBulkLoader catch only the exception wrapped by
// BreakinBulkLoader.Exception that has this BreakinBulkLoader's
// name, because any other exceptions are not thrown by this
// BreakinBulkLoader.
// NOTE: BreakinBulkLoader allows to suppress exceptions only when
// the all tasks and all transactions are committed.
case ex: BreakinBulkLoader.Exception
if (ex.name == loaderName && state.isAllTasksCommitted && state.isAllTransactionsCommitted) =>
logger.warn(
s"Threw exception on the stage: ${ex.transactionStage.map(_.name()).getOrElse("None")}," +
s" but all tasks and transactions are committed.",
ex
)
}
}
}
def run(schema: Schema, output: PageOutput): Unit = {
ThreadNameContext.switch(s"$loaderName:#run") { _ =>
val outputPlugin = PipeOutputPlugin(output)
val executorPlugin = ReuseOutputLocalExecutorPlugin(outputPlugin)
try {
runExecutor(executorPlugin, schema) {
(executor: ExecutorPlugin.Executor) =>
runOutput(outputPlugin) {
execute(executor)
}
}
}
catch {
// NOTE: Wrap the exception by BreakinBulkLoader.Exception
// in order to identify this exception thrown by this
// BreakinBulkLoader.
case ex: Throwable => throw buildException(ex)
}
}
}
```
ref. https://github.com/civitaspo/embulk-input-union/blob/0.0.1/src/main/scala/pro/civitaspo/embulk/input/union/BreakinBulkLoader.scala#L114-L169
上記コードの `transaction` method 及び `run` method が本体の Embulk における Input Plugin の `transaction` と `run` のタイミングで呼ばれます。そのため、先ほどの処理まとめに追記すると以下のような処理になります。便宜上 [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が走らせる Embulk を union Embulk と呼びます。
1. Input Plugin の `transaction` を start
1. union Embulk: Input Plugin の `transaction` を start
2. union Embulk: Filter Plugins の `transaction` を start
3. ...(定義した数だけ処理を繰り返す)
2. Filter Plugins の `transaction` を start
3. Executor Plugin の `transaction` を start
4. Output Plugin の `transaction` を start
5. Output Plugin の `open` で `PageOutput` を開く
6. Filter Plugins の `open` で Output Plugin の `PageOutput` につなぐ形で `PageOutput` を開く
7. Input Plugin の `run` でデータソースから Filter Plugins の `PageOutput` へデータを投入
1. union Embulk: Executor Plugin の `transaction` を start
2. union Embulk: Output Plugin の `transaction` を start
3. union Embulk: Output Plugin の `open` で `PageOutput` を開く
6. union Embulk: Filter Plugins の `open` で Output Plugin の `PageOutput` につなぐ形で `PageOutput` を開く
7. union Embulk: Input Plugin の `run` でデータソースから Filter Plugins の `PageOutput` へデータを投入
8. union Embulk: Output Plugin の `transaction` を finish
9. union Embulk: Executor Plugin の `transaction` を finish
10. ...(定義した数だけ処理を繰り返す)
8. Output Plugin の `transaction` を finish
9. Executor Plugin の `transaction` を finish
10. Filter Plugins の `transaction` を finish
11. Input Plugin の `transaction` を finish
12. union Embulk: Filter Plugins の `transaction` を finish
11. union Embulk: Input Plugin の `transaction` を finish
12. ...(定義した数だけ処理を繰り返す)
このように [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) が実行する Embulk は本体の処理に割り込む形で実行されています。
# おわりに
この記事では [embulk\-input\-union](https://github.com/civitaspo/embulk-input-union) の使い方と内部実装の詳細を説明しました。是非皆さんも使ってみてください。