Help us understand the problem. What is going on with this article?

Digdag公式ドキュメントからDigdagを学ぶ-アーWorkflow definition

目標

Digdagの公式サイトのドキュメントのWorkflow definitionの翻訳+α
DigdagのRubyを使ってRailsにバッチを作るまでが最後の目標
http://docs.digdag.io/workflow_definition.html

#目次

Getting started
Architecture
Concepts
Workflow definition
Scheduling workflow
Operators
Command reference
Language API -Ruby
Digdagで環境毎に設定値を変える(RubyOnRails)
Digdagを用いてRubyOnRails環境でバッチ実装

Workflow definition

Workflow definition: *.dig files

digdagワークフローは.dig拡張子が付いた名前のファイルで定義されます。
ファイルの名前はワークフローの名前です。

たとえば、hello_worldワークフローを作成すると、hello_world.digファイルが作成されます。 ファイルの内容は次のようになります。

hello_world.dig
timezone: Asia/Tokyo

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  py>: tasks.MyWorkflow.step2
  param1: this is param1

+step3:
  rb>: MyWorkflow.step3
  require: tasks/ruby_sample.rb

timezoneパラメータは、ワークフローのタイムゾーンを構成するために使用され、セッションのタイムスタンプ変数とスケジューリングに影響を与えます。 デフォルトのタイムゾーンはUTCです。 他の有効なタイムゾーンの例には、America/Los_Angeles、Europe/Berlin、Asia/Tokyoなどがあります。

+記号はタスクを意味

+記号で始まるキー名はタスクです。 タスクは上から順に実行されます。 タスクは、別のタスクの子としてネストできます。 上記の例では、+step2+mainタスクの子として+step1の後に実行されます。

operators>

type>:commandまたは_type:NAMEパラメーターを持つタスクがアクションを実行します。 シェルスクリプトの実行、Pythonメソッド、電子メールの送信など、さまざまな種類のオペレーターを選択できます。組み込みオペレーターのリストについては、オペレーターのページを参照してください。

foo>:barは以下の二つのパラメーターと同じです。

_type: foo
_command: bar

Using ${variables}

ワークフローは$ {...}構文を使用して変数を埋め込むことができます。
組み込み変数を使用するか、独自の変数を定義できます。
組み込み変数については以下のドキュメント参照
http://docs.digdag.io/workflow_definition.html

Calculating variables

$ {...}構文で基本的なJavaScriptスクリプトを使用して変数を計算できます。
一般的な使用例は、タイムスタンプを別の形式にフォーマットすることです。 Digdagには、時間計算のためのMoment.jsがバンドルされています。

timezone: Asia/Tokyo

+format_session_time:
  echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}

+format_in_utc:
  echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")}

+format_tomorrow:
  echo>: ${moment(session_time).add(1, 'days').format("LLL")}

+get_execution_time:
  echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")}
2020-07-10 22:05:34 +0900 [INFO] (0017@[0:default]+my_workflow+format_session_time): echo>: 2020-07-10 09:00:00 +09:00
2020-07-10 09:00:00 +09:00
2020-07-10 22:05:35 +0900 [INFO] (0017@[0:default]+my_workflow+format_in_utc): echo>: 2020-07-10 00:00:00
2020-07-10 00:00:00
2020-07-10 22:05:35 +0900 [INFO] (0017@[0:default]+my_workflow+format_tomorrow): echo>: July 11, 2020 9:00 AM
July 11, 2020 9:00 AM
2020-07-10 22:05:36 +0900 [INFO] (0017@[0:default]+my_workflow+get_execution_time): echo>: 2020-07-10 22:05:36 +09:00
2020-07-10 22:05:36 +09:00

Defining variables

変数定義方法

1. YAMLでの_exportパラメーターの使用
2. APIを使用してプログラムで変数を設定
3. 変数を使用したセッションの開始

Using _export: parameter

YAMLファイルでは、_export:指示者が変数を定義します。
これはデータベースのホスト名などの静的構成をロードするのに役立ちます。

タスクに_export指示者がある場合、スコープ内で変数を定義するためタスクとその子は変数を使用できます。

my_workflow.dig
_export:
  foo: 1

+prepare:
  sh>: echo foo:${foo} bar:${bar}

+analyze:
  _export:
    bar: 2

  +step1:
    sh>: echo foo:${foo} bar:${bar}

+dump:
  sh>: echo foo:${foo} bar:${bar}
結果
2020-07-10 22:18:11 +0900 [INFO] (0017@[0:default]+my_workflow+prepare): sh>: echo foo:1
foo:1
2020-07-10 22:18:11 +0900 [INFO] (0017@[0:default]+my_workflow+analyze+step1): sh>: echo foo:1 bar:2
foo:1 bar:2
2020-07-10 22:18:12 +0900 [INFO] (0017@[0:default]+my_workflow+dump): sh>: echo foo:1
foo:1

すべてのタスクでfoo=1を使用できますが、bar=2を使用できるのは+step1(および+analyze)だけです。

Using API

言語APIを使用してプログラムで変数を設定できます。 たとえば、Python APIはdigdag.env.exportとdigdag.env.storeを提供します。
今回の内容にはRubyAPIを中心に説明したいと思います。APIについては別章で扱うのでこちらでも詳細説明はしません。

import digdag

class MyWorkflow(object):
  def prepare(self):
    digdag.env.store({"my_param": 2})

  def analyze(self, my_var):
    print("my_var should be 2: %d" % my_var)

Starting a session with variables

新しいワークフローセッションを開始するときに変数を設定できます。変数を設定するには、-p KEY = VALUEを複数回使用します。
ドキュメントにはサンプルがないので。単純に外部からもらうパラメーターをコンソルに出力するワークフローを作成します。

my_workflow.dig
+print_my_var1:
  sh>: echo my_var1:${my_var1}

+print_my_var2:
  sh>: echo my_var2:${my_var2}

my_var1とmy_bar2は実行時パラメーターから取得できます。以下のように実行してみましょう〜

digdag run my_workflow.dig --rerun -p my_var1=foo -p my_var2=bar
結果
2020-07-11 10:29:33 +0900 [INFO] (0017@[0:default]+my_workflow+print_my_var1): sh>: echo my_var1:foo
my_var1:foo
2020-07-11 10:29:33 +0900 [INFO] (0017@[0:default]+my_workflow+print_my_var2): sh>: echo my_var2:bar
my_var2:bar

!include another file

YAMLファイルを小さなファイルに分割して、複雑なワークフローを整理できます。
!includeを使用して分割されたワークフローを自分のワークフローに含めることができます。

my_workflow.dig
+task1:
    !include : 'tasks/task1.dig'
+task2:
    !include : 'tasks/task2.dig'
task1.dig
+task1:
    sh>: echo タスク1です
task2.dig
+task2:
    sh>: echo タスク2です

my_workflow.digを実行するとtask1.dig task2.digのタスクが実行されるのがわかります。

run
$digdag run my_workflow.dig --rerun
2020-07-11 10:40:39 +0900 [INFO] (0017@[0:default]+my_workflow+task1+task1): sh>: echo タスク1です
タスク1です
2020-07-11 10:40:40 +0900 [INFO] (0017@[0:default]+my_workflow+task2+task2): sh>: echo タスク2です
タスク2です

Parallel execution

_parallel:true がグループに設定されている場合、グループ内のタスクは並行して実行されます。

my_workflow.dig
+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true

  +data1:
    sh>: echo data1
  +data2:
    sh>: echo data2
  +data3:
    sh>: echo data3
+analyze:
  sh>: echo analyze

data1, data2, data3が並列で出力されています。

実行結果
$ digdag run my_workflow.dig --rerun
2020-07-11 10:50:45 +0900 [INFO] (0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 10:50:45 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
2020-07-11 10:50:45 +0900 [INFO] (0019@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data1
data2
data3
2020-07-11 10:50:45 +0900 [INFO] (0019@[0:default]+my_workflow+analyze): sh>: echo analyze
analyze

_background:true がタスクまたはグループに設定されている場合、タスクまたはグループは前のタスクと並行して実行されます。次のタスクは、バックグラウンドタスクまたはグループの完了を待ちます。

my_workflow.dig
+prepare:
  +data1:
    sh>: echo data1

  # +data1 and +data2 run in parallel.
  +data2:
    _background: true
    sh>: echo data2

  # +data3 runs after +data1 and +data2.
  +data3:
    sh>: echo data3

+analyze:
  sh>: echo analyze

data1,data2が並列で実行されます。

結果
$ digdag run my_workflow.dig --rerun

2020-07-11 11:00:06 +0900 [INFO] (0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 11:00:06 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
data2
data1
2020-07-11 11:00:06 +0900 [INFO] (0018@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data3
2020-07-11 11:00:06 +0900 [INFO] (0018@[0:default]+my_workflow+analyze): sh>: echo analyze

Retrying failed tasks automatically

_retry:N(Nは整数:1、2、3、…)パラメーターがグループに設定されている場合、1つ以上の子タスクが失敗したときに、グループを最初から再試行します。

my_workflow.dig
+prepare:
  # If +erase_table, +load_data, or +check_loaded_data fail, it retries from
  # +erase_table again.
  _retry: 3

  +erase_table:
    sh>: echo erase_table

  +load_data:
    sh>: echo load_data
  +check_loaded_data:
    sh>: tasks/error.sh

+analyze:
  sh>: echo analyze
error.sh
#!/bin/bash
exit 0

最初の実行とRetry3件の4件のエラーが発生しました。Retry途中で正常に実行されたらエラー数は減ると思います。

結果
$ digdag run my_workflow.dig --rerun
2020-07-11 11:21:21 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:21 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:22 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:22 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:22 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:22 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:23 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:23 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:23 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:23 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
        at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:25 +0900 [INFO] (0017@[0:default]+my_workflow^failure-alert): type: notify
error: 
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)
  * +my_workflow+prepare+check_loaded_data:
    Command failed with code 1 (runtime)

次のように、間隔を_retryに設定もできます

+prepare:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential

■limit: 再試行回数
■interval:再実行間隔時間(秒)
■interval_type: constantかexponential constant の場合は再実行間隔は一定になります。
exponentialの場合は再実行間隔は2 x(retry_count-1) として再試行ごとに増加します。上記の例では最初の再試行間隔は10秒、2番目は20秒、3番目は40秒です。

Sending error notification

# ワークフローが失敗する時に実行される
_error:
  sh>: tasks/runs_when_workflow_failed.sh

+analyze:
  sh>: tasks/analyze_prepared_data_sets.sh

エラーのタイミングでmail>operatorを利用してメール送信可能
詳しい説明はオペレーターの説明でやります〜
http://docs.digdag.io/operators/mail.html

zozotech
70億人のファッションを技術の力で変えていく
https://tech.zozo.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away