LoginSignup
7
1

More than 5 years have passed since last update.

DigdagのstoreパラメータをRuby間で渡すTips

Posted at

前回Digdagのstoreのパラメータを環境ごとにセットアップするTipsを紹介しました。
今回はRubyを使ったDigdagのstoreパラメータ渡しのTipsを紹介します。

やり方

Digdagではrubyのタスク間でHashを渡すことができます。これを利用することで、まとまった情報をタスク間で共有することができます。
一点注意点があり、Digdag storeでkeyがシンボルのhashを渡しても、Digdag paramsで取得したHashのキーが文字列になってしまうということです。
また、もともと文字列をキーとしてHashを作成すればそのままの形でパラメータが渡されます。

> Digdag.env.store(HASH: {key1: 123, key2: 'abc'})

->

> Digdag.env.params['HASH']
=> {'key1' => 123, 'key2' => 'abc'}

サンプル

最終的な構成はGitHubからも確認できます。(https://github.com/katsuyan/digdag_sample_store_hash)

ディレクトリ構成

.
├── digdsag.dig
└── tasks
    └── task.rb

digdag.dig

digdag.dig
timezone: Asia/Tokyo

+task1:
  _retry: 3
  _export:
    docker:
      image: ruby:2.5.1
  rb>: Task.task1
  require: 'tasks/task'

+task2:
  _retry: 3
  _export:
    docker:
      image: ruby:2.5.1
  rb>: Task.task2
  require: 'tasks/task'

task.rb

task.rb
require 'yaml'

class Task
  def task1
    hash = {str: 'aaa', num: 123}
    Digdag.env.store(HASH: hash)
  end

  def task2
    hash = Digdag.env.params['HASH']
    puts '---hash---'
    p hash
    puts '---str---'
    p hash['str']
    puts '---b---'
    p hash['num']
  end
end

実行の流れ

1. digdagからTask.task1を呼び出す

+task1:
  _retry: 3
  _export:
    docker:
      image: ruby:2.5.1
  rb>: Task.task1
  require: 'tasks/task'

2. task.rbのTask.task1を実行

def task1
  hash = {str: 'aaa', num: 123}
  Digdag.env.store(HASH: hash)
end

これにより、HASH{str: 'aaa', num: 123}が格納されます。

3. digdagからTask.task2を呼び出す

+task2:
  _retry: 3
  _export:
    docker:
      image: ruby:2.5.1
  rb>: Task.task2
  require: 'tasks/task'

4. task.rbのTask.task2を実行

def task2
  hash = Digdag.env.params['HASH']
  puts '---hash---'
  p hash
  puts '---str---'
  p hash['str']
  puts '---b---'
  p hash['num']
end

Digdag.env.paramsを利用してHASHに格納された値を取り出します。
この時、格納時にkeyとしてsymbolを利用したものは文字列に変換されているので注意が必要です。

実行結果

$ digdag run digdsag.dig
2018-12-10 20:56:42 +0900: Digdag v0.9.31
2018-12-10 20:56:44 +0900 [WARN] (main): Reusing the last session time 2018-12-04T00:00:00+09:00.
2018-12-10 20:56:44 +0900 [INFO] (main): Using session /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900.
2018-12-10 20:56:44 +0900 [INFO] (main): Starting a new session project id=1 workflow name=digdsag session_time=2018-12-04T00:00:00+09:00
2018-12-10 20:56:45 +0900 [INFO] (0018@[0:default]+digdsag+task1): rb>: Task.task1
2018-12-10 20:56:48 +0900 [INFO] (0018@[0:default]+digdsag+task2): rb>: Task.task2
---hash---
{"str"=>"aaa", "num"=>123}
---str---
"aaa"
---b---
123
Success. Task state is saved at /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

実行結果から実際にhashのキーが文字列に変換されていることがわかります。
また、文字列をキーとしてHashの値の取り出しに成功しています。

タスクの並列実行

タスク間でのhash渡しを利用することで、parallel実行を簡単に実現することができます。
HashのListを作成することで、各Hashごとにparallelでタスクを簡単に実行することができます。

サンプル

ディレクトリ構成

.
├── digdag2.dig
└── tasks
    └── task2.rb

digdag.dig

digdag2.dig
timezone: Asia/Tokyo

+task1:
  _retry: 3
  _export:
    docker:
      image: ruby:2.5.1
  rb>: Task2.task1
  require: 'tasks/task2'

+parallel_task:
  for_each>:
    HASH: ${HASH_LIST}
  _parallel: true
  _do:
    +task2:
      _retry: 3
      _export:
        docker:
          image: ruby:2.5.1
      rb>: Task2.task2
      require: 'tasks/task2'

task2.rb

task2.rb
require 'yaml'

class Task2
  def task1
    hash_list = [{str: 'aaa', num: 123}, {str: 'bbb', num: 456}]
    Digdag.env.store(HASH_LIST: hash_list)
  end

  def task2
    p Digdag.env.params['HASH']
  end
end

実行の流れ

1. digdagからTask2.task1を呼び出す

+task1:
  _retry: 3
  _export:
    docker:
      image: ruby:2.5.1
  rb>: Task2.task1
  require: 'tasks/task2'

2. task.rbのTask2.task1を実行

def task1
  hash_list = [{str: 'aaa', num: 12}, {str: 'bbb', num: 34}, {str: 'ccc', num: 56}, {str: 'ddd', num: 78}]
  Digdag.env.store(HASH_LIST: hash_list)
end

これにより、HASH_LIST

[{str: 'aaa', num: 12}, {str: 'bbb', num: 34}, {str: 'ccc', num: 56}, {str: 'ddd', num: 78}]

が格納されます。

3. digdagから配列の中身のhashの値ごとにTask2.task2を呼び出す

+parallel_task:
  for_each>:
    HASH: ${HASH_LIST}
  _parallel: true
  _do:
    +task2:
      _retry: 3
      _export:
        docker:
          image: ruby:2.5.1
      rb>: Task2.task2
      require: 'tasks/task2'

HASH_LIST の中身のHashごとにHASHに値が格納され、それぞれの値の状態でTask2.task2が並列に実行されます。
_parallel をfalseにすることで直列実行することも可能です。

4. task.rbのTask.task2を実行

def task2
  p Digdag.env.params['HASH']
end

実行結果

$ digdag run digdag2.dig
2018-12-10 21:01:09 +0900: Digdag v0.9.31
2018-12-10 21:01:10 +0900 [WARN] (main): Reusing the last session time 2018-12-04T00:00:00+09:00.
2018-12-10 21:01:10 +0900 [INFO] (main): Using session /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900.
2018-12-10 21:01:10 +0900 [INFO] (main): Starting a new session project id=1 workflow name=digdag2 session_time=2018-12-04T00:00:00+09:00
2018-12-10 21:01:11 +0900 [INFO] (0018@[0:default]+digdag2+task1): rb>: Task2.task1
2018-12-10 21:01:13 +0900 [INFO] (0018@[0:default]+digdag2+parallel_task): for_each>: {HASH=[{"str":"aaa","num":12},{"str":"bbb","num":34},{"str":"ccc","num":56},{"str":"ddd","num":78}]}
2018-12-10 21:01:14 +0900 [INFO] (0018@[0:default]+digdag2+parallel_task^sub+for-0=HASH=0=%7B%22str%22%3A%22+task2): rb>: Task2.task2
2018-12-10 21:01:14 +0900 [INFO] (0020@[0:default]+digdag2+parallel_task^sub+for-0=HASH=1=%7B%22str%22%3A%22+task2): rb>: Task2.task2
2018-12-10 21:01:14 +0900 [INFO] (0022@[0:default]+digdag2+parallel_task^sub+for-0=HASH=3=%7B%22str%22%3A%22+task2): rb>: Task2.task2
2018-12-10 21:01:14 +0900 [INFO] (0021@[0:default]+digdag2+parallel_task^sub+for-0=HASH=2=%7B%22str%22%3A%22+task2): rb>: Task2.task2
{"str"=>"bbb", "num"=>34}
{"str"=>"ddd", "num"=>78}
{"str"=>"aaa", "num"=>12}
{"str"=>"ccc", "num"=>56}
Success. Task state is saved at /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

実際に配列の中身がランダムに出力されており、並列に実行されたことがわかります。

まとめ

DigdagではRubyのタスク間でまとまったデータを渡すこときにHashをそのままりようできます。ただし、symbolで格納したキーは文字列に変換されてしまうことに注意しましょう。また、Hashを利用したパラメータ渡しを利用し、並列処理を簡単に実現できることを紹介しました。

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1