1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Digdag公式ドキュメントからDigdagを学ぶ-Operators①Workflow control operators

Last updated at Posted at 2020-07-13

目標

Digdagの公式サイトのドキュメントのOperatorsの翻訳+α
DigdagのRubyを使ってRailsにバッチを作るまでが最後の目標
http://docs.digdag.io/operators/workflow_control.html

目次

Getting started
Architecture
Concepts
Workflow definition
Scheduling workflow
Operators
Command reference
Language API -Ruby
Digdagで環境毎に設定値を変える(RubyOnRails)
Digdagを用いてRubyOnRails環境でバッチ実装

Operators

Workflow control operators

call>: Call another workflow

workfolw1.dig
timezone: Asia/Tokyo

+step1:
  call>: another_workflow.dig
+step2:
  call>: common/shared_workflow.dig
another_workflow.dig
+step1:
    sh>: echo hi! another_workflow.dig
/common/shared_workflow.dig
+step1:
    sh>: echo hi! ./common/shared_workflow.dig
結果
$ digdag run workflow1.dig --rerun
2020-07-12 13:38:54 +0900 [INFO] (0017@[0:default]+workflow1+step1): call>: another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO] (0017@[0:default]+workflow1+step1^sub+step1): sh>: echo hi! another_workflow.dig
hi! another_workflow.dig
2020-07-12 13:38:54 +0900 [INFO] (0017@[0:default]+workflow1+step2): call>: common/shared_workflow.dig
2020-07-12 13:38:55 +0900 [INFO] (0017@[0:default]+workflow1+step2^sub+step1): sh>: echo hi! ./common/shared_workflow.dig
hi! ./common/shared_workflow.dig

call>: FILE
FILEにはワークフロー定義ファイルへのパスが入ります。
ファイル名は.digで終わる必要があります。
呼び出されたワークフローがサブディレクトリにある場合、ワークフローはサブディレクトリを作業ディレクトリとして使用します。
例)タスクにはcall>:common/called_workflow.digが定義されている。呼び出されたワークフローでquerys/data.sqlファイルを参照した場合は../queries/data.sqlで参照する。

call>: another_workfloww.dig

http_call>: Call workflow fetched by HTTP

http_call> オペレーターは、HTTP要求を作成し、応答本文をワークフローとして解析それをサブタスクとして埋め込みます。call>オペレーターに似ています。違いは、別のワークフローがHTTPからフェッチされることです。

この演算子は、返されたContent-Typeヘッダーに基づいて応答本文を解析します。 Content-Typeを設定する必要があり、次の値がサポートされています。

application/json: 応答をJSONとして解析します。
application/x-yaml: 返された本文をそのまま使用します。
適切なContent-Typeヘッダーが返されない場合は、content_type_overrideオプションを使用します。

Options
content_type_override:サーバーから返されたContent-Type応答ヘッダーをオーバーライドします。このオプションは、サーバーが適切なContent-Typeを返さないが、text/plainapplication/octet-streamなどの一般的な値を返す場合に役立ちます。

http_call>: https://api.example.com/foobar
content_type_override: application/x-yaml

require>: Depends on another workflow

require> オペレーターは、別のワークフローの完了を要求します。
このオペレーターはcall>オペレーターに似ていますが、このオペレーターは、既に実行されている場合、またはこのワークフローの同じセッション時間に実行されている場合、他のワークフローを開始しません。
ワークフローが実行中または新しく開始された場合、このオペレーターはワークフローが完了するまで待機します。さらにrequireオペレーターは別のプロジェクトのワークフローを開始することができます。

workflow1.dig
+step1:
  require>: another_workflow
another_workflow.dig
+step2:
  sh>: echo step2
実行結果
$ digdag run workflow1.dig --rerun
2020-07-12 14:55:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): require>: another_workflow
2020-07-12 14:55:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 14:55:34 +0900 [INFO] (0017@[0:default]+another_workflow+step2): sh>: echo step2
step2

Options
project_id: project_id
project_name: project_name
project_idまたはproject_nameを設定することで、別のプロジェクトのワークフローを開始できます。プロジェクトが存在しない場合、タスクは失敗します。 project_idとproject_nameの両方を設定した場合、タスクは失敗します。

another_project_wf
project_id: 12345
require>: another_project_wf
project_name: another_project

rerun_on: none, failed, all (default: none)
もし依存するワークフローの試行が存在したらrerun_onrequire>を実際開始するかどうかコントロールします。
none: 試行がすでに存在する場合、ワークフローを開始しません。
failed: 試行が存在し、その結果が成功しない場合、ワークフローを開始します。
all: require>試行の結果に関係なくワークフローを開始します。

ignore_failure:BOOLEAN
依存ワークフローがデフォルトでエラーで終了した場合、このオペレーターは失敗します。
ただし、ignore_failure:trueが設定されている場合、ワークフローがエラーで終了した場合でも、このオペーレーターは成功します。

require>: another_workflow
ignore_failure: true

params:MAP
このオペレーターはrequireに設定されたワークフローにパラメーターを渡します。
パ別のワークフローには渡しません。

workflow1.dig
+step1:
  require>: another_workflow
  params:
    param_name1: hello
another_workflow
+step2:
  sh>: echo step2:${param_name1}
実行結果
$ digdag run workflow1.dig --rerun
2020-07-12 15:19:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): require>: another_workflow
2020-07-12 15:19:34 +0900 [INFO] (0017@[0:default]+workflow1+step1): Starting a new session project id=1 workflow name=another_workflow session_time=2020-07-11T15:00:00+00:00
2020-07-12 15:19:34 +0900 [INFO] (0017@[0:default]+another_workflow+step2): sh>: echo step2:hello
step2:hello

loop>: Repeat tasks

loop>オペレーターは、サブタスクを複数回実行します。

このオペレーターは、サブタスクの$ {i}変数をエクスポートします。その値は0から始まります。たとえば、countが3の場合、タスクはi = 0、i = 1、およびi = 2で実行されます。

workflow1.dig
+repeat:
  loop>: 7
  _do:
    +step1:
      echo>: ${moment(session_time).add(i, 'days')} is ${i} days later than ${session_date}
    +step2:
      echo>: ${moment(session_time).add(i, 'hours')} is ${i} hours later than ${session_local_time}.
結果
$ digdag run workflow1.dig --rerun

2020-07-12 16:19:04 +0900 [INFO] (0017@[0:default]+workflow1+repeat): loop>: 7
2020-07-12 16:19:05 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-0+step1): echo>: "2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
"2020-07-11T15:00:00.000Z" is 0 days later than 2020-07-11
2020-07-12 16:19:06 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-0+step2): echo>: "2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
"2020-07-11T15:00:00.000Z" is 0 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:06 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-1+step1): echo>: "2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
"2020-07-12T15:00:00.000Z" is 1 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-1+step2): echo>: "2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
"2020-07-11T16:00:00.000Z" is 1 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:07 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-2+step1): echo>: "2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
"2020-07-13T15:00:00.000Z" is 2 days later than 2020-07-11
2020-07-12 16:19:07 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-2+step2): echo>: "2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
"2020-07-11T17:00:00.000Z" is 2 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-3+step1): echo>: "2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
"2020-07-14T15:00:00.000Z" is 3 days later than 2020-07-11
2020-07-12 16:19:08 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-3+step2): echo>: "2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
"2020-07-11T18:00:00.000Z" is 3 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:08 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-4+step1): echo>: "2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
"2020-07-15T15:00:00.000Z" is 4 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-4+step2): echo>: "2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
"2020-07-11T19:00:00.000Z" is 4 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:09 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-5+step1): echo>: "2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
"2020-07-16T15:00:00.000Z" is 5 days later than 2020-07-11
2020-07-12 16:19:09 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-5+step2): echo>: "2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
"2020-07-11T20:00:00.000Z" is 5 hours later than 2020-07-11 15:00:00.
2020-07-12 16:19:10 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-6+step1): echo>: "2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
"2020-07-17T15:00:00.000Z" is 6 days later than 2020-07-11
2020-07-12 16:19:10 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+loop-6+step2): echo>: "2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.
"2020-07-11T21:00:00.000Z" is 6 hours later than 2020-07-11 15:00:00.

Options
_parallel:BOOLEAN
タスクを並列に実行
_parallel:true
_do: TASKS: loop内で実行されるタスク

for_each>: Repeat tasks for values

for_each> オペレーターは変数セットを使ってサブタスクを複数実行する

workflow1.rb
+repeat:
  for_each>:
    fruit: [apple, orange]
    verb: [eat, throw]
  _do:
    echo>: ${verb} ${fruit}

結果
$ digdag run workflow1.dig --rerun
2020-07-12 16:27:00 +0900 [INFO] (0017@[0:default]+workflow1+repeat): for_each>: {fruit=[apple, orange], verb=[eat, throw]}
2020-07-12 16:27:01 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=0=eat): echo>: eat apple
eat apple
2020-07-12 16:27:01 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=0=apple&1=verb=1=throw): echo>: throw apple
throw apple
2020-07-12 16:27:01 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=0=eat): echo>: eat orange
eat orange
2020-07-12 16:27:02 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+for-0=fruit=1=orange&1=verb=1=throw): echo>: throw orange
throw orange

Options
for_each>: VARIABLES
キーのループで使用される変数:[値、値、...]構文。
変数は、オブジェクトまたはJSON文字列です。

例1
for_each>: {i: [1, 2, 3]}
例2
for_each>: {i: '[1, 2, 3]'}

_parallel:BOOLEAN
反復処理のタスク後並列に実行

_do:TASKS
実行されるタスク

for_range>: Repeat tasks for a range

for_range> オペレーターは、変数のセットを使用してサブタスクを複数回実行します。

このオペレーターは、サブタスクの${range.from}${range.to} 、および${range.index} 変数をエクスポートします。インデックスは0から始まります。

workflow1.dig
+repeat:
  for_range>:
    from: 10
    to: 50
    step: 10
  _do:
    echo>: processing from ${range.from} to ${range.to}.
結果
$ digdag run workflow1.dig --rerun
2020-07-12 16:47:17 +0900 [INFO] (0017@[0:default]+workflow1+repeat): for_range>: {from=10, to=50, step=10}
2020-07-12 16:47:18 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=10&to=20): echo>: processing from 10 to 20.
processing from 10 to 20.
2020-07-12 16:47:18 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=20&to=30): echo>: processing from 20 to 30.
processing from 20 to 30.
2020-07-12 16:47:18 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=30&to=40): echo>: processing from 30 to 40.
processing from 30 to 40.
2020-07-12 16:47:19 +0900 [INFO] (0017@[0:default]+workflow1+repeat^sub+range-from=40&to=50): echo>: processing from 40 to 50.
processing from 40 to 50.

Options
for_range>:
slices: 反復をslicesで指定した数で分割して実行

for_range>:
  from: 0
  to: 10
  slices: 3
  # this repeats tasks for 3 times (size of a slice is computed automatically):
  #  * {range.from: 0, range.to: 4, range.index: 0}
  #  * {range.from: 4, range.to: 8, range.index: 1}
  #  * {range.from: 8, range.to: 10, range.index: 2}
_do:
  echo>: from ${range.from} to ${range.to}

_parallel:BOOLEAN
反復処理のタスク後並列に実行

_do:TASKS
実行されるタスク

if>: Conditional execution

tureの場合_doのサブタスクを実行する
falseの場合_else_doのサブタスクを実行

workflow1.dig
+run_if_param_is_false:
  if>: ${param}
  _do:
    echo>: ${param} == true
  _else_do:
    echo>: ${param} == false
param_true
$ digdag run workflow1.dig --rerun -p param=true
2020-07-12 17:01:32 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false): if>: true
2020-07-12 17:01:33 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false^sub): echo>: true == true
true == true
param_false
$ digdag run workflow1.dig --rerun -p param=false
2020-07-12 17:01:14 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false): if>: false
2020-07-12 17:01:15 +0900 [INFO] (0017@[0:default]+workflow1+run_if_param_is_false^sub): echo>: false == false
false == false

fail>: Makes the workflow failed

検証に失敗した場合実行される

+fail_if_too_few:
  if>: ${count < 10}
  _do:
    fail>: count is less than 10!
count_11
$ digdag run workflow1.dig --rerun -p count=11
2020-07-12 17:05:52 +0900: Digdag v0.9.41
2020-07-12 17:05:54 +0900 [WARN] (main): Reusing the last session time 2020-07-11T15:00:00+00:00.
2020-07-12 17:05:54 +0900 [INFO] (main): Using session /Users/akira/Desktop/ruby/sample/workflows/.digdag/status/20200711T150000+0000.
2020-07-12 17:05:54 +0900 [INFO] (main): Starting a new session project id=1 workflow name=workflow1 session_time=2020-07-11T15:00:00+00:00
2020-07-12 17:05:55 +0900 [INFO] (0017@[0:default]+workflow1+fail_if_too_few): if>: false

count_9
$ digdag run workflow1.dig --rerun -p count=9
2020-07-12 17:05:46 +0900 [INFO] (0017@[0:default]+workflow1+fail_if_too_few): if>: true
2020-07-12 17:05:47 +0900 [INFO] (0017@[0:default]+workflow1+fail_if_too_few^sub): fail>: count is less than 10!
2020-07-12 17:05:47 +0900 [ERROR] (0017@[0:default]+workflow1+fail_if_too_few^sub): Task +workflow1+fail_if_too_few^sub failed.
count is less than 10!
2020-07-12 17:05:47 +0900 [INFO] (0017@[0:default]+workflow1^failure-alert): type: notify
error: 
  * +workflow1+fail_if_too_few^sub:
    count is less than 10!

echo>: Shows a message

メッセージ出力

+say_hello:
  echo>: Hello world!
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?