46
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulk の LocalExecutor プラグインの振る舞いについて整理

Last updated at Posted at 2016-03-19

対象バージョンは v0.8.6
間違っているところがあったら指摘お願いします :pray:

用語の整理

まずは用語の整理をします

用語 説明
Transaction Embulkにおける1つのバルクロード処理全体を表す概念
Task トランザクションを分割した仕事の単位。タスクの単位で並列度の制御が行われる。タスクの数だけプラグインのインスタンスが作られる。
Page 複数行をまとめたデータの塊で、プラグインはページ単位でデータを受け取り処理する
LocalExecutorプラグイン デフォルトの Executor プラグインで、Embulk を実行したホストのリソースを使用してバルクロード処理を行う。その他の Executor プラグインとしては MapReduce Executor プラグインが存在し、そちらではHadoop上でバルクロード処理を実行することができたりする。
DirectExecutor LocalExecutorプラグインの機能で、InputタスクとOutputタスクを同じスレッドで処理する
ScatterExecutor LocalExecutorプラグインの機能で、1つのInputタスクを、それぞれ別のスレッドで動く(かもしれない)複数のOutputタスクに分散して、FIlter/Outputプラグインの処理を行う。v0.8で導入された。
Inputタスク Inputプラグインのタスク。FileInputPlugin ベースのプラグイン(input-file や embulk-input-s3)の場合、Inputタスク数はファイル数になる。InputPlugin ベースのプラグイン(input-jdbc など)の場合は 1 となる
Outputタスク Filter、Outputプラグインのタスク。
max_threads 最大並列数を制御する LocalExecutorプラグインのパラメータ。デフォルトはCPUコア数の2倍。設定ファイルで指定することもできるが、embulk コマンドのオプション -X max_threads で指定することもできる。1つのスレッドで複数のタスクが動作しうる(同時にではない)。
min_output_tasks 最小Outputタスク数を制御する LocalExecutorプラグインのパラメータ。デフォルトはCPUコア数。Inputタスク数が min_output_tasks よりも小さい場合は、Outputタスク数が min_output_tasks になるように ScatterExecutor が使われる。設定ファイルで指定することもできるが、embulk コマンドのオプション -X min_output_tasks で指定することもできる。

min_output_tasks, max_threads の指定方法

See http://www.embulk.org/docs/built-in.html#id32

計算式と具体例の整理

計算式の整理

inputTaskCount = FileInputプラグインの場合ファイル数、Inputプラグインの場合1

ScatterExecutor を使うかどうかの判定

scatter? = inputTaskCount < min_output_tasks

ScatterExecutor の場合

scatterCount = (min_output_tasks + inputTaskCount - 1) / inputTaskCount
outputTaskCount = inputTaskCount * scatterCount
inputThreadCount = max(max_threads / scatterCount, 1)
outputThreadCount = min(max_threads, outputTaskCount) *変数として定義されているわけではない

DirectExecutor の場合

outputTaskCount = inputTaskCount
inputThreadCount = max_threads
outputThreadCount = max_threads *正確には input と output で同じスレッドを利用する

ref. https://github.com/embulk/embulk/blob/a2eaa4a79791eb2c0fa6865584ac7f91a4dab9ec/embulk-core/src/main/java/org/embulk/exec/LocalExecutorPlugin.java

具体例) InputTaskCount=1, min_output_tasks=2, max_threads=4

InputTaskCountが1になるのは、FileInputPluginで入力ファイルが1つの場合と、FileInputPluginではないInputPluginの場合(MySQLなどのデータベースからの入力など)。

ScatterExecutorが利用され、Outputの並列数が2になる。

max_threads 4 としているのだが、OutputTask 数は2 なので、Output Thread は2個しか使われていない

NOTE: newCachedThreadPool が使われているので、Output Thread3, 4 は実際は存在すらしていない

(追記) ThreadPool からスレッドを取り出してくるため、Output Task 1 が使用するスレッドが常に Output Thread1 になるとは限らないという点で、この図は語弊がある。気が向いたら直したい。

image

具体例) InputTaskCount=1, min_output_tasks=8, max_threads=4

max_threads=4 なのに、Output Thread が8つになるパターン。BUGか?

min_output_tasks > max_threads になるような指定は辞めよう。

image

具体例) InputTaskCount=2, min_output_tasks=4, max_threads=4

FileInputPluginで入力ファイルを2つにすると、InputTaskCountは2となる。Input Thread が 2つとなり、並列で読み込まれる。

Output Task 数が 4 になるように Scatterされ、4つのOutput Thread に分散されている。

image

具体例) InputTaskCount=2, min_output_tasks=8, max_threads=4

InputThreadの数が減り、並列性が落ちるパターン。Input Task1 が終わってから Input Task2 の処理が始まる。

1つのOutputThreadで、2つのOutputタスクが動くパターンでもある。

性能が落ちているので min_output_tasks > max_threads になるような指定は辞めよう。

image

その他の具体例

スプレッドシートにまとめているのでご参照ください > https://docs.google.com/spreadsheets/d/1VEzeAWkhBbCel4BahZIR6QFmYoONu3F2KfcuKFashwQ/edit?usp=sharing

疑問もしくは意見の整理

Q. DirectExecutor の場合に、newFixedThreadPool で maxThreads 分スレッドを作っているが、min(inputTaskCount, maxThreads) 作れば十分なのではないか?

大きな問題ではないので気にしていないだけだろうか

Q. FileOutput で試すと ScatterExecutor で想定される値よりも少ない並列度になるのだが、なぜか? embulk-output-bigquery なら問題なかった

例えば、入力ファイル2、-X min_output_tasks=4 -X max_threads=4 で、FileOutputPlugin で ThreadID をログに出したところ、4 ではなく 2個のスレッドしか使われていなかった

再現方法)

git clone https://github.com/sonots/embulk
git checkout -b thread_log origin/thread_log
./gradlew cli
pkg/embulk-0.8.6.jar run -l trace -X page_size=64 -X min_output_tasks=4 -X max_threads=4 example/two_files.yml

Q. min_output_tasks > max_threads になると、挙動が微妙

禁止にしても良いような?

46
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
46
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?