Help us understand the problem. What is going on with this article?

Digdag + Embulkで並列数をdigファイルだけで制御する

More than 1 year has passed since last update.

Embulkでデータ転送処理を複数定義していると並列処理を使うと思いますが、同時接続数の都合やメモリの上限で全てを同時に動かすことが現実的ではない事があります。対策として、PythonやRubyを介して、分割した配列を作る方法があります。同僚が過去に投稿した記事でも紹介されています。

これらの方法でも実現できますが、digファイルを見た時に何をするためのワークフローなのかを分かりやすくしたいという気持ちがあり、digファイル内の処理だけで並列数を制御させたのでその方法を紹介します。

方法

下記のように6つのテーブルのデータを最大2並列で転送したいとします。

_export:
  # Concurrency
  parallel_num: 2
  # Set target tables
  target_table_list:
    - { name: table1 }
    - { name: table2 }
    - { name: table3 }
    - { name: table4 }
    - { name: table5 }
    - { name: table6 }

これを外部のPythonやRubyに頼らず分割します。

digファイル内では ${} という記法の中でjavascriptを記述することができるため、これを使います。
並列数を使ってループの回数と、ループ毎に必要な配列を生成できれば実現可能であるため、下記の方針で行くことにしました。

  • loopオペレータで指定回数ループさせる
  • loop内で必要な配列をsliceを用いて作成する

まずはloop数の算出方法です。
これは簡単で、連携テーブル数を並列数で割って切り上げした回数が必ず必要となるため

Math.ceil(target_table_list.length/parallel_num)

で計算できます。

次に並列実行するテーブルリストの作成にはloopオペレータのループ内で用いることができるiという変数を使います。これは0始まりでループ毎にインクリメントされて行く値であるため、その値とsliceを使う事でそのループ内で必要な配列だけ切り出す事ができます。

target_table_list.slice(i*parallel_num,(i+1)*parallel_num)

これで必要なテーブルのリストができました。
あとはこれらを_exportするなりして、embulkの処理につなげましょう。

例えば下記の用なコードで実現できます。

_export:
  # Concurrency
  parallel_num: 2
  # Set target tables
  target_table_list:
    - { name: table1 }
    - { name: table2 }
    - { name: table3 }
    - { name: table4 }
    - { name: table5 }
    - { name: table6 }

+parallel_test:
  _parallel: false
  loop>: ${Math.ceil(target_table_list.length/parallel_num)}
  _do:
    +exec_parallel:
      _parallel: true
      for_each>:
        target_table: ${target_table_list.slice(i*parallel_num,(i+1)*parallel_num)}
      _do:
        +run_embulk:
          sh>: echo ${target_table['name']}

結果

2019-06-10 00:17:48 +0900 [INFO] (0017@[0:default]+test+parallel_test): loop>: 3
2019-06-10 00:17:49 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-0+exec_parallel): for_each>: {target_table=[{"name":"table1"},{"name":"table2"}]}
2019-06-10 00:17:49 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-0+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table2
table2
2019-06-10 00:17:50 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-0+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table1
table1
2019-06-10 00:17:50 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-1+exec_parallel): for_each>: {target_table=[{"name":"table3"},{"name":"table4"}]}
2019-06-10 00:17:51 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-1+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table4
table4
2019-06-10 00:17:51 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-1+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table3
table3
2019-06-10 00:17:51 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-2+exec_parallel): for_each>: {target_table=[{"name":"table5"},{"name":"table6"}]}
2019-06-10 00:17:51 +0900 [INFO] (0018@[0:default]+test+parallel_test^sub+loop-2+exec_parallel^sub+for-0=target_t=1=%7B%22name%22%3A+run_embulk): sh>: echo table6
table6
2019-06-10 00:17:52 +0900 [INFO] (0017@[0:default]+test+parallel_test^sub+loop-2+exec_parallel^sub+for-0=target_t=0=%7B%22name%22%3A+run_embulk): sh>: echo table5
table5
zozotech
70億人のファッションを技術の力で変えていく
https://tech.zozo.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away