Help us understand the problem. What is going on with this article?

Digdagで大きいパラメータを登録すると後続の処理が重くなる

この記事はZOZOテクノロジーズ #5 Advent Calendar 2019 7日目の記事になります。

また、今年は全部で5つのAdvent Calendarが公開されています。

ZOZOテクノロジーズ #1 Advent Calendar 2019
ZOZOテクノロジーズ #2 Advent Calendar 2019
ZOZOテクノロジーズ #3 Advent Calendar 2019
ZOZOテクノロジーズ #4 Advent Calendar 2019

概要

Digdagのstoreパラメータに大きな値を入れたときに後続のジョブがすべて遅くなってしまうと言うことがあったので紹介します。
考えてみれば大きな値を毎回DBから取り出してDigdag側でパース等しているので重くなるのは当たり前なのですが。

そこで今回は実際にどんな感じで遅くなるのかを紹介します。また、その回方法を紹介します。

計測

まずは、storeパラメータの大きさによってどれくらい処理のスピードが落ちるのか計測してみます。
以下のようなワークフローを利用してそれぞれ時間を計測しました。

計測用のコード

はじめにstoreに値を登録するRubyスクリプトを作成します。
aが100個並んだの文字列のリストを作成します。長さは10/100/1000/10000のものを用意します。

tasks/task.rb

class Task
  def store10
    list = ['a' * 100] * 10
    Digdag.env.store(LIST: list)
  end

  def store100
    list = ['a' * 100] * 100
    Digdag.env.store(LIST: list)
  end

  def store1000
    list = ['a' * 100] * 1000
    Digdag.env.store(LIST: list)
  end

  def store10000
    list = ['a' * 100] * 10000
    Digdag.env.store(LIST: list)
  end
end

続いて上記のスクリプトを実行後に"hello"を出力するようなワークフローを作成します。
XXXXには10/100/1000/10000の値が入ります。

storeXXXX.dig

+task1:
  _export:
    docker:
      image: ruby
  rb>: Task.storeXXXX
  require: 'tasks/task'

+task2:
  echo>: 'hello'

実行結果

$ time digdag run store10.dig --rerun
2019-12-05 11:40:38 +0900: Digdag v0.9.39
(略)
2019-12-05 11:40:40 +0900 [INFO] (0018@[0:default]+store10+task1): rb>: Task.store10
2019-12-05 11:40:41 +0900 [INFO] (0018@[0:default]+store10+task2): echo>: hello
hello
Success. Task state is saved at /Users/katsuya.tajima/src/github.com/katsuyan/digdag-example/digdag-store/.digdag/status/20191205T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
digdag run store10.dig --rerun  3.90s user 0.38s system 106% cpu 4.020 total
$ time digdag run store100.dig --rerun
2019-12-05 11:40:44 +0900: Digdag v0.9.39
(略)
2019-12-05 11:40:46 +0900 [INFO] (0018@[0:default]+store100+task1): rb>: Task.store100
2019-12-05 11:40:48 +0900 [INFO] (0018@[0:default]+store100+task2): echo>: hello
hello
Success. Task state is saved at /Users/katsuya.tajima/src/github.com/katsuyan/digdag-example/digdag-store/.digdag/status/20191205T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
digdag run store100.dig --rerun  3.98s user 0.38s system 108% cpu 3.996 total
$ time digdag run store1000.dig --rerun
2019-12-05 11:40:50 +0900: Digdag v0.9.39
(略)
2019-12-05 11:40:52 +0900 [INFO] (0018@[0:default]+store1000+task1): rb>: Task.store1000
2019-12-05 11:40:55 +0900 [INFO] (0018@[0:default]+store1000+task2): echo>: hello
hello
Success. Task state is saved at /Users/katsuya.tajima/src/github.com/katsuyan/digdag-example/digdag-store/.digdag/status/20191205T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
digdag run store1000.dig --rerun  5.01s user 0.42s system 113% cpu 4.795 total
$ time digdag run store10000.dig --rerun
2019-12-05 11:40:58 +0900: Digdag v0.9.39
(略)
2019-12-05 11:41:00 +0900 [INFO] (0018@[0:default]+store10000+task1): rb>: Task.store10000
2019-12-05 11:42:29 +0900 [INFO] (0018@[0:default]+store10000+task2): echo>: hello
hello
Success. Task state is saved at /Users/katsuya.tajima/src/github.com/katsuyan/digdag-example/digdag-store/.digdag/status/20191205T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
digdag run store10000.dig --rerun  92.45s user 1.50s system 102% cpu 1:31.28 total

以上の結果からリストの長さが10000のときに明らかにtask2の実行時間が伸びていることがわかります。
今回は、遅くなるということを示したかったのでどれくらいでどのくらい遅くなるのかということは示していません。

パラメータの値が大きくなるケース

それでも、どうしても大きいリストをstoreに入れて利用したいという場合があると思います。
例えばBigQueryからパラメータをとってきて、それをfor_eachオペレータで並列処理させたいということがありました。
このときは以下のようなデータ構造でsotreに5000件ほどのパラメータを入れていました。

[
  {
    id: 1,
    text: '文字列'
  },
  ()
]

このやり方だと後続の処理がすごーく遅くなってしまったので、そこで使った回避策を紹介します。

回避策

プライマリーキーの利用

上記のようなケースでは、事前にKVSやRDBなどに後続で利用したいパラメータをintなど比較的小さいPKと一緒に登録します。そしてstoreにはPKのリストのみ登録してやることで大きさを減らすことができます。

# BigQueryからid/textを取得しKVSに登録
+get_bigquery_and_set_kvs:
  _export:
    docker:
      image: ruby
  rb>: Task.get_bigquery_and_set_kvs 
  require: 'tasks/task'

# PKでループを回しKVSからtextを取得し出力する
+loop
  for_each>:
    PK: ${PK_LIST}
  _parallel: true
  _do:

    # PKを利用してKVSからtextを取得
    +get_text_from_kvs
      _export:
        docker:
          image: ruby
      rb>: Task.get_text_from_kvs
      require: 'tasks/task'

    # textを出力
    +print_text
      _export:
        docker:
          image: ruby
      rb>: Task.print_text 
      require: 'tasks/task'

storeの上書き

以上のような方法をとったとしてもPKのリストは大きくなってしまいます。その状態で都度PKのリストを参照してしまうため、後続のジョブすべての処理が結局遅くなってしまいます。そこですでに必要ない大きいパラメータを上書きすることで回避することができます。以下のようにfor_eachでPKのリストを渡したあとに、後続の処理ではPKのリストを空文字列で上書きしてやると後続への影響をなくすことができます。

+loop
  for_each>:
    PK: ${PK_LIST}
  _parallel: true
  _do:

    _export:
      PK: ''

    # PKを利用してKVSからtextを取得
    +get_text_from_kvs
      _export:
        docker:
          image: ruby
      rb>: Task.get_text_from_kvs
      require: 'tasks/task'

実験

実際にsotreを上書きすることで、処理が早くなるか実験してみます。
計測時に示した10000件のワークフローに、task2のタイミングでLISTを空文字で上書きしています。

ワークフロー

+task1:
  _export:
    docker:
      image: ruby
  rb>: Task.task10000
  require: 'tasks/task'

+task2:
  _export:
    LIST: ''
  sh>: echo 'task2!!'

結果

$ time digdag run store10000_update_list.dig --rerun
2019-12-05 12:30:02 +0900: Digdag v0.9.39
()
2019-12-05 12:30:04 +0900 [INFO] (0018@[0:default]+store10000_update_list+task1): rb>: Task.store10000
2019-12-05 12:30:06 +0900 [INFO] (0018@[0:default]+store10000_update_list+task2): sh>: echo 'task2!!'
task2!!
Success. Task state is saved at /Users/katsuya.tajima/src/github.com/katsuyan/digdag-example/digdag-store/.digdag/status/20191205T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
digdag run store10000_update_list.dig --rerun 4.06s user 0.40s system 107% cpu 4.162 total

以上の結果から上書きしてやることで全体の処理が早く終わることがわかりました。

まとめ

今回はDigdagのパラメータにでかい値を登録すると後続のジョブが遅くなることについて紹介しました。
また、その回避策についてPKを利用する方法とstoreを上書きする方法について紹介しました。

ただし、そもそもでかい値をstoreに登録する時点でDigdagの用途とかけ離れてしまっているのかなとも感じたので、それをふまえて参考にしていただければと思います。

今日から3日連続Digdagネタです。明日は「digdag-myjdbc-pluginを作ろうとした話」になります。

katsuyan
エンジニアしてます。RubyとかGoとかClojureとかが好きです。DigDagとかEmbulkとかも好きです。
http://zakiran.hatenablog.com/
zozotech
70億人のファッションを技術の力で変えていく
https://tech.zozo.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away