目標
Digdagの公式サイトのドキュメントのOperatorsの翻訳+α
DigdagのRubyを使ってRailsにバッチを作るまでが最後の目標
http://docs.digdag.io/operators/workflow_control.html
目次
Getting started
Architecture
Concepts
Workflow definition
Scheduling workflow
Operators
Command reference
Language API -Ruby
Digdagで環境毎に設定値を変える(RubyOnRails)
Digdagを用いてRubyOnRails環境でバッチ実装
Operators
Workflow control operators
call>: Call another workflow
timezone: Asia/Tokyo
+step1:
call>: another_workflow.dig
+step2:
call>: common/shared_workflow.dig
+step1:
sh>: echo hi! another_workflow.dig
+step1:
sh>: echo hi! ./common/shared_workflow.dig
$ digdag run workflow1.dig --rerun
2020-07-12 13:38:54 +0900 [INFO] (0017@[0:default]+workflow1+step1): call>: another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO] (0017@[0:default]+workflow1+step1^sub+step1): sh>: echo hi! another_workflow.dig
hi! another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO] (0017@[0:default]+workflow1+step2): call>: common/shared_workflow.dig
2020-07-12 13:38:55 +0900 [INFO] (0017@[0:default]+workflow1+step2^sub+step1): sh>: echo hi! ./common/shared_workflow.dig
hi! ./common/shared_workflow.dig
■call>: FILE
FILEにはワークフロー定義ファイルへのパスが入ります。
ファイル名は.digで終わる必要があります。
呼び出されたワークフローがサブディレクトリにある場合、ワークフローはサブディレクトリを作業ディレクトリとして使用します。
例)タスクにはcall>:common/called_workflow.dig
が定義されている。呼び出されたワークフローでquerys/data.sql
ファイルを参照した場合は../queries/data.sql
で参照する。
call>: another_workfloww.dig
http_call>: Call workflow fetched by HTTP
http_call>
オペレーターは、HTTP要求を作成し、応答本文をワークフローとして解析それをサブタスクとして埋め込みます。call>オペレーターに似ています。違いは、別のワークフローがHTTPからフェッチされることです。
この演算子は、返されたContent-Typeヘッダーに基づいて応答本文を解析します。 Content-Typeを設定する必要があり、次の値がサポートされています。
application/json
: 応答をJSONとして解析します。
application/x-yaml
: 返された本文をそのまま使用します。
適切なContent-Typeヘッダーが返されない場合は、content_type_overrideオプションを使用します。
Options
content_type_override:サーバーから返されたContent-Type応答ヘッダーをオーバーライドします。このオプションは、サーバーが適切なContent-Typeを返さないが、text/plain
やapplication/octet-stream
などの一般的な値を返す場合に役立ちます。
http_call>: https://api.example.com/foobar
content_type_override: application/x-yaml
require>: Depends on another workflow
require>
オペレーターは、別のワークフローの完了を要求します。
このオペレーターはcall>
オペレーターに似ていますが、このオペレーターは、既に実行されている場合、またはこのワークフローの同じセッション時間に実行されている場合、他のワークフローを開始しません。
ワークフローが実行中または新しく開始された場合、このオペレーターはワークフローが完了するまで待機します。さらにrequireオペレーターは別のプロジェクトのワークフローを開始することができます。
+step1:
require>: another_workflow
+step2:
sh>: echo step2
$ digdag run workflow1.dig --rerun
2020-07-12 14:55:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): require>: another_workflow
2020-07-12 14:55:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 14:55:34 +0900 [INFO] (0017@[0:default]+another_workflow+step2): sh>: echo step2
step2
Options
■project_id: project_id
■project_name: project_name
project_id
またはproject_name
を設定することで、別のプロジェクトのワークフローを開始できます。プロジェクトが存在しない場合、タスクは失敗します。 project_idとproject_nameの両方を設定した場合、タスクは失敗します。
project_id: 12345
require>: another_project_wf
project_name: another_project
■rerun_on: none, failed, all (default: none)
もし依存するワークフローの試行が存在したらrerun_on
はrequire>
を実際開始するかどうかコントロールします。
none: 試行がすでに存在する場合、ワークフローを開始しません。
failed: 試行が存在し、その結果が成功しない場合、ワークフローを開始します。
all: require>
試行の結果に関係なくワークフローを開始します。
■ignore_failure:BOOLEAN
依存ワークフローがデフォルトでエラーで終了した場合、このオペレーターは失敗します。
ただし、ignore_failure:trueが設定されている場合、ワークフローがエラーで終了した場合でも、このオペーレーターは成功します。
require>: another_workflow
ignore_failure: true
■params:MAP
このオペレーターはrequireに設定されたワークフローにパラメーターを渡します。
パ別のワークフローには渡しません。
+step1:
require>: another_workflow
params:
param_name1: hello
+step2:
sh>: echo step2:${param_name1}
$ digdag run workflow1.dig --rerun
2020-07-12 15:19:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): require>: another_workflow
2020-07-12 15:19:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 15:19:34 +0900 [INFO] (0017@[0:default]+another_workflow+step2): sh>: echo step2:hello
step2:hello
loop>: Repeat tasks
loop>オペレーターは、サブタスクを複数回実行します。
このオペレーターは、サブタスクの$ {i}変数をエクスポートします。その値は0から始まります。たとえば、countが3の場合、タスクはi = 0、i = 1、およびi = 2で実行されます。
+repeat:
loop>: 7
_do:
+step1:
echo>: ${moment(session_time).add(i, 'days')} is ${i} days later than ${session_date}
+step2:
echo>: ${moment(session_time).add(i, 'hours')} is ${i} hours later than ${session_local_time}.
$ digdag run workflow1.dig --rerun
2020-07-12 16:19:04 +0900 [INFO] (0017@[0:default]+workflow1+repeat): loop>: 7
2020-07-12 16:19:05 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-0+step1): echo>: "2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
"2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
2020-07-12 16:19:06 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-0+step2): echo>: "2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
"2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:06 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-1+step1): echo>: "2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
"2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-1+step2): echo>: "2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
"2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:07 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-2+step1): echo>: "2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
"2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-2+step2): echo>: "2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
"2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-3+step1): echo>: "2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
"2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
2020-07-12 16:19:08 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-3+step2): echo>: "2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
"2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-4+step1): echo>: "2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
"2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-4+step2): echo>: "2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
"2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:09 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-5+step1): echo>: "2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
"2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-5+step2): echo>: "2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
"2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:10 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-6+step1): echo>: "2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
"2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
2020-07-12 16:19:10 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-6+step2): echo>: "2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.
"2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.
Options
■_parallel:BOOLEAN
タスクを並列に実行
_parallel:true
■_do: TASKS
: loop内で実行されるタスク
for_each>: Repeat tasks for values
for_each>
オペレーターは変数セットを使ってサブタスクを複数実行する
+repeat:
for_each>:
fruit: [apple, orange]
verb: [eat, throw]
_do:
echo>: ${verb} ${fruit}
$ digdag run workflow1.dig --rerun
2020-07-12 16:27:00 +0900 [INFO] (0017@[0:default]+workflow1+repeat): for_each>: {fruit=[apple, orange], verb=[eat, throw]}
2020-07-12 16:27:01 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=0=eat): echo>: eat apple
eat apple
2020-07-12 16:27:01 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=1=throw): echo>: throw apple
throw apple
2020-07-12 16:27:01 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=0=eat): echo>: eat orange
eat orange
2020-07-12 16:27:02 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=1=throw): echo>: throw orange
throw orange
Options
■for_each>: VARIABLES
キーのループで使用される変数:[値、値、...]構文。
変数は、オブジェクトまたはJSON文字列です。
for_each>: {i: [1, 2, 3]}
for_each>: {i: '[1, 2, 3]'}
■_parallel:BOOLEAN
反復処理のタスク後並列に実行
■_do:TASKS
実行されるタスク
for_range>: Repeat tasks for a range
for_range>
オペレーターは、変数のセットを使用してサブタスクを複数回実行します。
このオペレーターは、サブタスクの${range.from}
、${range.to}
、および${range.index}
変数をエクスポートします。インデックスは0から始まります。
+repeat:
for_range>:
from: 10
to: 50
step: 10
_do:
echo>: processing from ${range.from} to ${range.to}.
$ digdag run workflow1.dig --rerun
2020-07-12 16:47:17 +0900 [INFO] (0017@[0:default]+workflow1+repeat): for_range>: {from=10, to=50, step=10}
2020-07-12 16:47:18 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=10&to=20): echo>: processing from 10 to 20.
processing from 10 to 20.
2020-07-12 16:47:18 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=20&to=30): echo>: processing from 20 to 30.
processing from 20 to 30.
2020-07-12 16:47:18 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=30&to=40): echo>: processing from 30 to 40.
processing from 30 to 40.
2020-07-12 16:47:19 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=40&to=50): echo>: processing from 40 to 50.
processing from 40 to 50.
Options
■for_range>:
slices: 反復をslicesで指定した数で分割して実行
for_range>:
from: 0
to: 10
slices: 3
# this repeats tasks for 3 times (size of a slice is computed automatically):
# * {range.from: 0, range.to: 4, range.index: 0}
# * {range.from: 4, range.to: 8, range.index: 1}
# * {range.from: 8, range.to: 10, range.index: 2}
_do:
echo>: from ${range.from} to ${range.to}
■_parallel:BOOLEAN
反復処理のタスク後並列に実行
■_do:TASKS
実行されるタスク
if>: Conditional execution
tureの場合_do
のサブタスクを実行する
falseの場合_else_do
のサブタスクを実行
+run_if_param_is_false:
if>: ${param}
_do:
echo>: ${param} == true
_else_do:
echo>: ${param} == false
$ digdag run workflow1.dig --rerun -p param=true
2020-07-12 17:01:32 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false): if>: true
2020-07-12 17:01:33 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false^sub): echo>: true == true
true == true
$ digdag run workflow1.dig --rerun -p param=false
2020-07-12 17:01:14 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false): if>: false
2020-07-12 17:01:15 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false^sub): echo>: false == false
false == false
fail>: Makes the workflow failed
検証に失敗した場合実行される
+fail_if_too_few:
if>: ${count < 10}
_do:
fail>: count is less than 10!
$ digdag run workflow1.dig --rerun -p count=11
2020-07-12 17:05:52 +0900: Digdag v0.9.41
2020-07-12 17:05:54 +0900 [WARN] (main): Reusing the last session time 2020-07-11T15:00:00+00:00.
2020-07-12 17:05:54 +0900 [INFO] (main): Using session /Users/akira/Desktop/ruby/sample/workflows/.digdag/status/20200711T150000+0000.
2020-07-12 17:05:54 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow1 session_time=2020-07-11T15:00:00+00:00
2020-07-12 17:05:55 +0900 [INFO] (0017@[0:default]+workflow1+fail_if_too_few): if>: false
$ digdag run workflow1.dig --rerun -p count=9
2020-07-12 17:05:46 +0900 [INFO] (0017@[0:default]+workflow1+fail_if_too_few): if>: true
2020-07-12 17:05:47 +0900 [INFO] (0017@[0:default]+workflow1+fail_if_too_few^sub): fail>: count is less than 10!
2020-07-12 17:05:47 +0900 [ERROR] (0017@[0:default]+workflow1+fail_if_too_few^sub): Task +workflow1+fail_if_too_few^sub failed.
count is less than 10!
2020-07-12 17:05:47 +0900 [INFO] (0017@[0:default]+workflow1^failure-alert): type: notify
error:
* +workflow1+fail_if_too_few^sub:
count is less than 10!
echo>: Shows a message
メッセージ出力
+say_hello:
echo>: Hello world!