前回Digdagのstoreのパラメータを環境ごとにセットアップするTipsを紹介しました。
今回はRubyを使ったDigdagのstoreパラメータ渡しのTipsを紹介します。
やり方
Digdagではrubyのタスク間でHashを渡すことができます。これを利用することで、まとまった情報をタスク間で共有することができます。
一点注意点があり、Digdag storeでkeyがシンボルのhashを渡しても、Digdag paramsで取得したHashのキーが文字列になってしまうということです。
また、もともと文字列をキーとしてHashを作成すればそのままの形でパラメータが渡されます。
> Digdag.env.store(HASH: {key1: 123, key2: 'abc'})
->
> Digdag.env.params['HASH']
=> {'key1' => 123, 'key2' => 'abc'}
サンプル
最終的な構成はGitHubからも確認できます。(https://github.com/katsuyan/digdag_sample_store_hash)
ディレクトリ構成
.
├── digdsag.dig
└── tasks
└── task.rb
digdag.dig
timezone: Asia/Tokyo
+task1:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task.task1
require: 'tasks/task'
+task2:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task.task2
require: 'tasks/task'
task.rb
require 'yaml'
class Task
def task1
hash = {str: 'aaa', num: 123}
Digdag.env.store(HASH: hash)
end
def task2
hash = Digdag.env.params['HASH']
puts '---hash---'
p hash
puts '---str---'
p hash['str']
puts '---b---'
p hash['num']
end
end
実行の流れ
1. digdagからTask.task1を呼び出す
+task1:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task.task1
require: 'tasks/task'
2. task.rbのTask.task1を実行
def task1
hash = {str: 'aaa', num: 123}
Digdag.env.store(HASH: hash)
end
これにより、HASH
に{str: 'aaa', num: 123}
が格納されます。
3. digdagからTask.task2を呼び出す
+task2:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task.task2
require: 'tasks/task'
4. task.rbのTask.task2を実行
def task2
hash = Digdag.env.params['HASH']
puts '---hash---'
p hash
puts '---str---'
p hash['str']
puts '---b---'
p hash['num']
end
Digdag.env.params
を利用してHASH
に格納された値を取り出します。
この時、格納時にkeyとしてsymbolを利用したものは文字列に変換されているので注意が必要です。
実行結果
$ digdag run digdsag.dig
2018-12-10 20:56:42 +0900: Digdag v0.9.31
2018-12-10 20:56:44 +0900 [WARN] (main): Reusing the last session time 2018-12-04T00:00:00+09:00.
2018-12-10 20:56:44 +0900 [INFO] (main): Using session /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900.
2018-12-10 20:56:44 +0900 [INFO] (main): Starting a new session project id=1 workflow name=digdsag session_time=2018-12-04T00:00:00+09:00
2018-12-10 20:56:45 +0900 [INFO] (0018@[0:default]+digdsag+task1): rb>: Task.task1
2018-12-10 20:56:48 +0900 [INFO] (0018@[0:default]+digdsag+task2): rb>: Task.task2
---hash---
{"str"=>"aaa", "num"=>123}
---str---
"aaa"
---b---
123
Success. Task state is saved at /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
実行結果から実際にhashのキーが文字列に変換されていることがわかります。
また、文字列をキーとしてHashの値の取り出しに成功しています。
タスクの並列実行
タスク間でのhash渡しを利用することで、parallel実行を簡単に実現することができます。
HashのListを作成することで、各Hashごとにparallelでタスクを簡単に実行することができます。
サンプル
ディレクトリ構成
.
├── digdag2.dig
└── tasks
└── task2.rb
digdag.dig
timezone: Asia/Tokyo
+task1:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task2.task1
require: 'tasks/task2'
+parallel_task:
for_each>:
HASH: ${HASH_LIST}
_parallel: true
_do:
+task2:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task2.task2
require: 'tasks/task2'
task2.rb
require 'yaml'
class Task2
def task1
hash_list = [{str: 'aaa', num: 123}, {str: 'bbb', num: 456}]
Digdag.env.store(HASH_LIST: hash_list)
end
def task2
p Digdag.env.params['HASH']
end
end
実行の流れ
1. digdagからTask2.task1を呼び出す
+task1:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task2.task1
require: 'tasks/task2'
2. task.rbのTask2.task1を実行
def task1
hash_list = [{str: 'aaa', num: 12}, {str: 'bbb', num: 34}, {str: 'ccc', num: 56}, {str: 'ddd', num: 78}]
Digdag.env.store(HASH_LIST: hash_list)
end
これにより、HASH_LIST
に
[{str: 'aaa', num: 12}, {str: 'bbb', num: 34}, {str: 'ccc', num: 56}, {str: 'ddd', num: 78}]
が格納されます。
3. digdagから配列の中身のhashの値ごとにTask2.task2を呼び出す
+parallel_task:
for_each>:
HASH: ${HASH_LIST}
_parallel: true
_do:
+task2:
_retry: 3
_export:
docker:
image: ruby:2.5.1
rb>: Task2.task2
require: 'tasks/task2'
HASH_LIST
の中身のHashごとにHASH
に値が格納され、それぞれの値の状態でTask2.task2が並列に実行されます。
_parallel
をfalseにすることで直列実行することも可能です。
4. task.rbのTask.task2を実行
def task2
p Digdag.env.params['HASH']
end
実行結果
$ digdag run digdag2.dig
2018-12-10 21:01:09 +0900: Digdag v0.9.31
2018-12-10 21:01:10 +0900 [WARN] (main): Reusing the last session time 2018-12-04T00:00:00+09:00.
2018-12-10 21:01:10 +0900 [INFO] (main): Using session /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900.
2018-12-10 21:01:10 +0900 [INFO] (main): Starting a new session project id=1 workflow name=digdag2 session_time=2018-12-04T00:00:00+09:00
2018-12-10 21:01:11 +0900 [INFO] (0018@[0:default]+digdag2+task1): rb>: Task2.task1
2018-12-10 21:01:13 +0900 [INFO] (0018@[0:default]+digdag2+parallel_task): for_each>: {HASH=[{"str":"aaa","num":12},{"str":"bbb","num":34},{"str":"ccc","num":56},{"str":"ddd","num":78}]}
2018-12-10 21:01:14 +0900 [INFO] (0018@[0:default]+digdag2+parallel_task^sub+for-0=HASH=0=%7B%22str%22%3A%22+task2): rb>: Task2.task2
2018-12-10 21:01:14 +0900 [INFO] (0020@[0:default]+digdag2+parallel_task^sub+for-0=HASH=1=%7B%22str%22%3A%22+task2): rb>: Task2.task2
2018-12-10 21:01:14 +0900 [INFO] (0022@[0:default]+digdag2+parallel_task^sub+for-0=HASH=3=%7B%22str%22%3A%22+task2): rb>: Task2.task2
2018-12-10 21:01:14 +0900 [INFO] (0021@[0:default]+digdag2+parallel_task^sub+for-0=HASH=2=%7B%22str%22%3A%22+task2): rb>: Task2.task2
{"str"=>"bbb", "num"=>34}
{"str"=>"ddd", "num"=>78}
{"str"=>"aaa", "num"=>12}
{"str"=>"ccc", "num"=>56}
Success. Task state is saved at /Users/katsuya.tajima/tmp/digdag_sample_store_hash/.digdag/status/20181204T000000+0900 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
実際に配列の中身がランダムに出力されており、並列に実行されたことがわかります。
まとめ
DigdagではRubyのタスク間でまとまったデータを渡すこときにHashをそのままりようできます。ただし、symbolで格納したキーは文字列に変換されてしまうことに注意しましょう。また、Hashを利用したパラメータ渡しを利用し、並列処理を簡単に実現できることを紹介しました。