Embulkでデータ転送処理を複数定義していると並列処理を使うと思いますが、同時接続数の都合やメモリの上限で全てを同時に動かすことが現実的ではない事があります。対策として、PythonやRubyを介して、分割した配列を作る方法があります。同僚が過去に投稿した記事でも紹介されています。
これらの方法でも実現できますが、digファイルを見た時に何をするためのワークフローなのかを分かりやすくしたいという気持ちがあり、digファイル内の処理だけで並列数を制御させたのでその方法を紹介します。
方法
下記のように6つのテーブルのデータを最大2並列で転送したいとします。
_export:
# Concurrency
parallel_num: 2
# Set target tables
target_table_list:
- { name: table1 }
- { name: table2 }
- { name: table3 }
- { name: table4 }
- { name: table5 }
- { name: table6 }
これを外部のPythonやRubyに頼らず分割します。
digファイル内では ${}
という記法の中でjavascriptを記述することができるため、これを使います。
並列数を使ってループの回数と、ループ毎に必要な配列を生成できれば実現可能であるため、下記の方針で行くことにしました。
- loopオペレータで指定回数ループさせる
- loop内で必要な配列をsliceを用いて作成する
まずはloop数の算出方法です。
これは簡単で、連携テーブル数を並列数で割って切り上げした回数が必ず必要となるため
Math.ceil(target_table_list.length/parallel_num)
で計算できます。
次に並列実行するテーブルリストの作成にはloopオペレータのループ内で用いることができるi
という変数を使います。これは0始まりでループ毎にインクリメントされて行く値であるため、その値とsliceを使う事でそのループ内で必要な配列だけ切り出す事ができます。
target_table_list.slice(i*parallel_num,(i+1)*parallel_num)
これで必要なテーブルのリストができました。
あとはこれらを_exportするなりして、embulkの処理につなげましょう。
例えば下記の用なコードで実現できます。
_export:
# Concurrency
parallel_num: 2
# Set target tables
target_table_list:
- { name: table1 }
- { name: table2 }
- { name: table3 }
- { name: table4 }
- { name: table5 }
- { name: table6 }
+parallel_test:
_parallel: false
loop>: ${Math.ceil(target_table_list.length/parallel_num)}
_do:
+exec_parallel:
_parallel: true
for_each>:
target_table: ${target_table_list.slice(i*parallel_num,(i+1)*parallel_num)}
_do:
+run_embulk:
sh>: echo ${target_table['name']}
結果
2019-06-10 00:17:48 +0900 [INFO] (0017@[0:default]+test+parallel_test): loop>: 3
2019-06-10 00:17:49 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-0+exec_parallel): for_each>: {target_table=[{"name":"table1"},{"name":"table2"}]}
2019-06-10 00:17:49 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-0+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table2
table2
2019-06-10 00:17:50 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-0+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table1
table1
2019-06-10 00:17:50 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-1+exec_parallel): for_each>: {target_table=[{"name":"table3"},{"name":"table4"}]}
2019-06-10 00:17:51 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-1+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table4
table4
2019-06-10 00:17:51 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-1+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table3
table3
2019-06-10 00:17:51 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-2+exec_parallel): for_each>: {target_table=[{"name":"table5"},{"name":"table6"}]}
2019-06-10 00:17:51 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-2+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table6
table6
2019-06-10 00:17:52 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-2+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table5
table5