対象バージョンは v0.8.6
間違っているところがあったら指摘お願いします
用語の整理
まずは用語の整理をします
用語 | 説明 |
---|---|
Transaction | Embulkにおける1つのバルクロード処理全体を表す概念 |
Task | トランザクションを分割した仕事の単位。タスクの単位で並列度の制御が行われる。タスクの数だけプラグインのインスタンスが作られる。 |
Page | 複数行をまとめたデータの塊で、プラグインはページ単位でデータを受け取り処理する |
LocalExecutorプラグイン | デフォルトの Executor プラグインで、Embulk を実行したホストのリソースを使用してバルクロード処理を行う。その他の Executor プラグインとしては MapReduce Executor プラグインが存在し、そちらではHadoop上でバルクロード処理を実行することができたりする。 |
DirectExecutor | LocalExecutorプラグインの機能で、InputタスクとOutputタスクを同じスレッドで処理する |
ScatterExecutor | LocalExecutorプラグインの機能で、1つのInputタスクを、それぞれ別のスレッドで動く(かもしれない)複数のOutputタスクに分散して、FIlter/Outputプラグインの処理を行う。v0.8で導入された。 |
Inputタスク | Inputプラグインのタスク。FileInputPlugin ベースのプラグイン(input-file や embulk-input-s3)の場合、Inputタスク数はファイル数になる。InputPlugin ベースのプラグイン(input-jdbc など)の場合は 1 となる |
Outputタスク | Filter、Outputプラグインのタスク。 |
max_threads | 最大並列数を制御する LocalExecutorプラグインのパラメータ。デフォルトはCPUコア数の2倍。設定ファイルで指定することもできるが、embulk コマンドのオプション -X max_threads で指定することもできる。1つのスレッドで複数のタスクが動作しうる(同時にではない)。 |
min_output_tasks | 最小Outputタスク数を制御する LocalExecutorプラグインのパラメータ。デフォルトはCPUコア数。Inputタスク数が min_output_tasks よりも小さい場合は、Outputタスク数が min_output_tasks になるように ScatterExecutor が使われる。設定ファイルで指定することもできるが、embulk コマンドのオプション -X min_output_tasks で指定することもできる。 |
min_output_tasks, max_threads の指定方法
See http://www.embulk.org/docs/built-in.html#id32
計算式と具体例の整理
計算式の整理
inputTaskCount = FileInputプラグインの場合ファイル数、Inputプラグインの場合1
ScatterExecutor を使うかどうかの判定
scatter? = inputTaskCount < min_output_tasks
ScatterExecutor の場合
scatterCount = (min_output_tasks + inputTaskCount - 1) / inputTaskCount
outputTaskCount = inputTaskCount * scatterCount
inputThreadCount = max(max_threads / scatterCount, 1)
outputThreadCount = min(max_threads, outputTaskCount) *変数として定義されているわけではない
DirectExecutor の場合
outputTaskCount = inputTaskCount
inputThreadCount = max_threads
outputThreadCount = max_threads *正確には input と output で同じスレッドを利用する
具体例) InputTaskCount=1, min_output_tasks=2, max_threads=4
InputTaskCountが1になるのは、FileInputPluginで入力ファイルが1つの場合と、FileInputPluginではないInputPluginの場合(MySQLなどのデータベースからの入力など)。
ScatterExecutorが利用され、Outputの並列数が2になる。
max_threads 4 としているのだが、OutputTask 数は2 なので、Output Thread は2個しか使われていない
NOTE: newCachedThreadPool が使われているので、Output Thread3, 4 は実際は存在すらしていない
(追記) ThreadPool からスレッドを取り出してくるため、Output Task 1 が使用するスレッドが常に Output Thread1 になるとは限らないという点で、この図は語弊がある。気が向いたら直したい。
具体例) InputTaskCount=1, min_output_tasks=8, max_threads=4
max_threads=4 なのに、Output Thread が8つになるパターン。BUGか?
min_output_tasks > max_threads になるような指定は辞めよう。
具体例) InputTaskCount=2, min_output_tasks=4, max_threads=4
FileInputPluginで入力ファイルを2つにすると、InputTaskCountは2となる。Input Thread が 2つとなり、並列で読み込まれる。
Output Task 数が 4 になるように Scatterされ、4つのOutput Thread に分散されている。
具体例) InputTaskCount=2, min_output_tasks=8, max_threads=4
InputThreadの数が減り、並列性が落ちるパターン。Input Task1 が終わってから Input Task2 の処理が始まる。
1つのOutputThreadで、2つのOutputタスクが動くパターンでもある。
性能が落ちているので min_output_tasks > max_threads になるような指定は辞めよう。
その他の具体例
スプレッドシートにまとめているのでご参照ください > https://docs.google.com/spreadsheets/d/1VEzeAWkhBbCel4BahZIR6QFmYoONu3F2KfcuKFashwQ/edit?usp=sharing
疑問もしくは意見の整理
Q. DirectExecutor の場合に、newFixedThreadPool で maxThreads 分スレッドを作っているが、min(inputTaskCount, maxThreads) 作れば十分なのではないか?
大きな問題ではないので気にしていないだけだろうか
Q. FileOutput で試すと ScatterExecutor で想定される値よりも少ない並列度になるのだが、なぜか? embulk-output-bigquery なら問題なかった
例えば、入力ファイル2、-X min_output_tasks=4 -X max_threads=4 で、FileOutputPlugin で ThreadID をログに出したところ、4 ではなく 2個のスレッドしか使われていなかった
再現方法)
git clone https://github.com/sonots/embulk
git checkout -b thread_log origin/thread_log
./gradlew cli
pkg/embulk-0.8.6.jar run -l trace -X page_size=64 -X min_output_tasks=4 -X max_threads=4 example/two_files.yml
Q. min_output_tasks > max_threads になると、挙動が微妙
禁止にしても良いような?