11
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Digdag + Embulkで並列数をdigファイルだけで制御する

Last updated at Posted at 2019-06-09

Embulkでデータ転送処理を複数定義していると並列処理を使うと思いますが、同時接続数の都合やメモリの上限で全てを同時に動かすことが現実的ではない事があります。対策として、PythonやRubyを介して、分割した配列を作る方法があります。同僚が過去に投稿した記事でも紹介されています。

これらの方法でも実現できますが、digファイルを見た時に何をするためのワークフローなのかを分かりやすくしたいという気持ちがあり、digファイル内の処理だけで並列数を制御させたのでその方法を紹介します。

方法

下記のように6つのテーブルのデータを最大2並列で転送したいとします。

_export:
  # Concurrency
  parallel_num: 2
  # Set target tables
  target_table_list:
    - { name: table1 }
    - { name: table2 }
    - { name: table3 }
    - { name: table4 }
    - { name: table5 }
    - { name: table6 }

これを外部のPythonやRubyに頼らず分割します。

digファイル内では ${} という記法の中でjavascriptを記述することができるため、これを使います。
並列数を使ってループの回数と、ループ毎に必要な配列を生成できれば実現可能であるため、下記の方針で行くことにしました。

  • loopオペレータで指定回数ループさせる
  • loop内で必要な配列をsliceを用いて作成する

まずはloop数の算出方法です。
これは簡単で、連携テーブル数を並列数で割って切り上げした回数が必ず必要となるため

Math.ceil(target_table_list.length/parallel_num)

で計算できます。

次に並列実行するテーブルリストの作成にはloopオペレータのループ内で用いることができるiという変数を使います。これは0始まりでループ毎にインクリメントされて行く値であるため、その値とsliceを使う事でそのループ内で必要な配列だけ切り出す事ができます。

target_table_list.slice(i*parallel_num,(i+1)*parallel_num)

これで必要なテーブルのリストができました。
あとはこれらを_exportするなりして、embulkの処理につなげましょう。

例えば下記の用なコードで実現できます。

_export:
  # Concurrency
  parallel_num: 2
  # Set target tables
  target_table_list:
    - { name: table1 }
    - { name: table2 }
    - { name: table3 }
    - { name: table4 }
    - { name: table5 }
    - { name: table6 }

+parallel_test:
  _parallel: false
  loop>: ${Math.ceil(target_table_list.length/parallel_num)}
  _do:
    +exec_parallel:
      _parallel: true
      for_each>:
        target_table: ${target_table_list.slice(i*parallel_num,(i+1)*parallel_num)}
      _do:
        +run_embulk:
          sh>: echo ${target_table['name']}

結果

2019-06-10 00:17:48 +0900 [INFO] (0017@[0:default]+test+parallel_test): loop>: 3
2019-06-10 00:17:49 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-0+exec_parallel): for_each>: {target_table=[{"name":"table1"},{"name":"table2"}]}
2019-06-10 00:17:49 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-0+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table2
table2
2019-06-10 00:17:50 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-0+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table1
table1
2019-06-10 00:17:50 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-1+exec_parallel): for_each>: {target_table=[{"name":"table3"},{"name":"table4"}]}
2019-06-10 00:17:51 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-1+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table4
table4
2019-06-10 00:17:51 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-1+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table3
table3
2019-06-10 00:17:51 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-2+exec_parallel): for_each>: {target_table=[{"name":"table5"},{"name":"table6"}]}
2019-06-10 00:17:51 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-2+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table6
table6
2019-06-10 00:17:52 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-2+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table5
table5
11
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?