目標
Digdagの公式サイトのドキュメントのWorkflow definitionの翻訳+α
DigdagのRubyを使ってRailsにバッチを作るまでが最後の目標
http://docs.digdag.io/workflow_definition.html
##目次
Getting started
Architecture
Concepts
Workflow definition
Scheduling workflow
Operators
Command reference
Language API -Ruby
Digdagで環境毎に設定値を変える(RubyOnRails)
Digdagを用いてRubyOnRails環境でバッチ実装
Workflow definition
*Workflow definition: .dig files
digdagワークフローは.dig拡張子
が付いた名前のファイルで定義されます。
ファイルの名前はワークフローの名前です。
たとえば、hello_world
ワークフローを作成すると、hello_world.dig
ファイルが作成されます。 ファイルの内容は次のようになります。
timezone: Asia/Tokyo
+step1:
sh>: tasks/shell_sample.sh
+step2:
py>: tasks.MyWorkflow.step2
param1: this is param1
+step3:
rb>: MyWorkflow.step3
require: tasks/ruby_sample.rb
timezone
パラメータは、ワークフローのタイムゾーンを構成するために使用され、セッションのタイムスタンプ変数とスケジューリングに影響を与えます。 デフォルトのタイムゾーンはUTC
です。 他の有効なタイムゾーンの例には、America/Los_Angeles、Europe/Berlin、Asia/Tokyoなどがあります。
+記号はタスクを意味
+記号で始まるキー名はタスクです。 タスクは上から順に実行されます。 タスクは、別のタスクの子としてネストできます。 上記の例では、+step2
は+main
タスクの子として+step1
の後に実行されます。
operators>
type>:commandまたは**_type:NAME**パラメーターを持つタスクがアクションを実行します。 シェルスクリプトの実行、Pythonメソッド、電子メールの送信など、さまざまな種類のオペレーターを選択できます。組み込みオペレーターのリストについては、オペレーターのページを参照してください。
foo>:barは以下の二つのパラメーターと同じです。
_type: foo
_command: bar
Using ${variables}
ワークフローは$ {...}構文を使用して変数を埋め込むことができます。
組み込み変数を使用するか、独自の変数を定義できます。
組み込み変数については以下のドキュメント参照
http://docs.digdag.io/workflow_definition.html
Calculating variables
$ {...}構文で基本的なJavaScriptスクリプトを使用して変数を計算できます。
一般的な使用例は、タイムスタンプを別の形式にフォーマットすることです。 Digdagには、時間計算のためのMoment.jsがバンドルされています。
timezone: Asia/Tokyo
+format_session_time:
echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}
+format_in_utc:
echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")}
+format_tomorrow:
echo>: ${moment(session_time).add(1, 'days').format("LLL")}
+get_execution_time:
echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")}
2020-07-10 22:05:34 +0900 [INFO] (0017@[0:default]+my_workflow+format_session_time): echo>: 2020-07-10 09:00:00 +09:00
2020-07-10 09:00:00 +09:00
2020-07-10 22:05:35 +0900 [INFO] (0017@[0:default]+my_workflow+format_in_utc): echo>: 2020-07-10 00:00:00
2020-07-10 00:00:00
2020-07-10 22:05:35 +0900 [INFO] (0017@[0:default]+my_workflow+format_tomorrow): echo>: July 11, 2020 9:00 AM
July 11, 2020 9:00 AM
2020-07-10 22:05:36 +0900 [INFO] (0017@[0:default]+my_workflow+get_execution_time): echo>: 2020-07-10 22:05:36 +09:00
2020-07-10 22:05:36 +09:00
Defining variables
変数定義方法
1. YAMLでの_exportパラメーターの使用
2. APIを使用してプログラムで変数を設定
3. 変数を使用したセッションの開始
Using _export: parameter
YAMLファイルでは、_export:
指示者が変数を定義します。
これはデータベースのホスト名などの静的構成をロードするのに役立ちます。
タスクに_export指示者がある場合、スコープ内で変数を定義するためタスクとその子は変数を使用できます。
_export:
foo: 1
+prepare:
sh>: echo foo:${foo} bar:${bar}
+analyze:
_export:
bar: 2
+step1:
sh>: echo foo:${foo} bar:${bar}
+dump:
sh>: echo foo:${foo} bar:${bar}
2020-07-10 22:18:11 +0900 [INFO] (0017@[0:default]+my_workflow+prepare): sh>: echo foo:1
foo:1
2020-07-10 22:18:11 +0900 [INFO] (0017@[0:default]+my_workflow+analyze+step1): sh>: echo foo:1 bar:2
foo:1 bar:2
2020-07-10 22:18:12 +0900 [INFO] (0017@[0:default]+my_workflow+dump): sh>: echo foo:1
foo:1
すべてのタスクでfoo=1を使用できますが、bar=2を使用できるのは+step1(および+analyze)だけです。
Using API
言語APIを使用してプログラムで変数を設定できます。 たとえば、Python APIはdigdag.env.exportとdigdag.env.storeを提供します。
今回の内容にはRubyAPIを中心に説明したいと思います。APIについては別章で扱うのでこちらでも詳細説明はしません。
import digdag
class MyWorkflow(object):
def prepare(self):
digdag.env.store({"my_param": 2})
def analyze(self, my_var):
print("my_var should be 2: %d" % my_var)
Starting a session with variables
新しいワークフローセッションを開始するときに変数を設定できます。変数を設定するには、-p KEY = VALUE
を複数回使用します。
ドキュメントにはサンプルがないので。単純に外部からもらうパラメーターをコンソルに出力するワークフローを作成します。
+print_my_var1:
sh>: echo my_var1:${my_var1}
+print_my_var2:
sh>: echo my_var2:${my_var2}
my_var1とmy_bar2は実行時パラメーターから取得できます。以下のように実行してみましょう〜
digdag run my_workflow.dig --rerun -p my_var1=foo -p my_var2=bar
2020-07-11 10:29:33 +0900 [INFO] (0017@[0:default]+my_workflow+print_my_var1): sh>: echo my_var1:foo
my_var1:foo
2020-07-11 10:29:33 +0900 [INFO] (0017@[0:default]+my_workflow+print_my_var2): sh>: echo my_var2:bar
my_var2:bar
!include another file
YAMLファイルを小さなファイルに分割して、複雑なワークフローを整理できます。
!include
を使用して分割されたワークフローを自分のワークフローに含めることができます。
+task1:
!include : 'tasks/task1.dig'
+task2:
!include : 'tasks/task2.dig'
+task1:
sh>: echo タスク1です
+task2:
sh>: echo タスク2です
my_workflow.dig
を実行するとtask1.dig task2.dig
のタスクが実行されるのがわかります。
$digdag run my_workflow.dig --rerun
2020-07-11 10:40:39 +0900 [INFO] (0017@[0:default]+my_workflow+task1+task1): sh>: echo タスク1です
タスク1です
2020-07-11 10:40:40 +0900 [INFO] (0017@[0:default]+my_workflow+task2+task2): sh>: echo タスク2です
タスク2です
Parallel execution
_parallel:true
がグループに設定されている場合、グループ内のタスクは並行して実行されます。
+prepare:
# +data1, +data2, and +data3 run in parallel.
_parallel: true
+data1:
sh>: echo data1
+data2:
sh>: echo data2
+data3:
sh>: echo data3
+analyze:
sh>: echo analyze
data1, data2, data3が並列で出力されています。
$ digdag run my_workflow.dig --rerun
2020-07-11 10:50:45 +0900 [INFO] (0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 10:50:45 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
2020-07-11 10:50:45 +0900 [INFO] (0019@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data1
data2
data3
2020-07-11 10:50:45 +0900 [INFO] (0019@[0:default]+my_workflow+analyze): sh>: echo analyze
analyze
_background:true
がタスクまたはグループに設定されている場合、タスクまたはグループは前のタスクと並行して実行されます。次のタスクは、バックグラウンドタスクまたはグループの完了を待ちます。
+prepare:
+data1:
sh>: echo data1
# +data1 and +data2 run in parallel.
+data2:
_background: true
sh>: echo data2
# +data3 runs after +data1 and +data2.
+data3:
sh>: echo data3
+analyze:
sh>: echo analyze
data1,data2が並列で実行されます。
$ digdag run my_workflow.dig --rerun
2020-07-11 11:00:06 +0900 [INFO] (0018@[0:default]+my_workflow+prepare+data2): sh>: echo data2
2020-07-11 11:00:06 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+data1): sh>: echo data1
data2
data1
2020-07-11 11:00:06 +0900 [INFO] (0018@[0:default]+my_workflow+prepare+data3): sh>: echo data3
data3
2020-07-11 11:00:06 +0900 [INFO] (0018@[0:default]+my_workflow+analyze): sh>: echo analyze
Retrying failed tasks automatically
_retry:N(Nは整数:1、2、3、…)パラメーターがグループに設定されている場合、1つ以上の子タスクが失敗したときに、グループを最初から再試行します。
+prepare:
# If +erase_table, +load_data, or +check_loaded_data fail, it retries from
# +erase_table again.
_retry: 3
+erase_table:
sh>: echo erase_table
+load_data:
sh>: echo load_data
+check_loaded_data:
sh>: tasks/error.sh
+analyze:
sh>: echo analyze
#!/bin/bash
exit 0
最初の実行とRetry3件の4件のエラーが発生しました。Retry途中で正常に実行されたらエラー数は減ると思います。
$ digdag run my_workflow.dig --rerun
2020-07-11 11:21:21 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:21 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:22 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:22 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:22 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:22 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:23 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:23 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:23 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:23 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+erase_table): sh>: echo erase_table
erase_table
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+load_data): sh>: echo load_data
load_data
2020-07-11 11:21:24 +0900 [INFO] (0017@[0:default]+my_workflow+prepare+check_loaded_data): sh>: tasks/error.sh
2020-07-11 11:21:24 +0900 [ERROR] (0017@[0:default]+my_workflow+prepare+check_loaded_data): Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:315)
at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:705)
at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:257)
at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:687)
at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-11 11:21:25 +0900 [INFO] (0017@[0:default]+my_workflow^failure-alert): type: notify
error:
* +my_workflow+prepare+check_loaded_data:
Command failed with code 1 (runtime)
* +my_workflow+prepare+check_loaded_data:
Command failed with code 1 (runtime)
* +my_workflow+prepare+check_loaded_data:
Command failed with code 1 (runtime)
* +my_workflow+prepare+check_loaded_data:
Command failed with code 1 (runtime)
次のように、間隔を_retryに設定もできます
+prepare:
_retry:
limit: 3
interval: 10
interval_type: exponential
■limit: 再試行回数
■interval:再実行間隔時間(秒)
■interval_type: constantかexponential
constant
の場合は再実行間隔は一定になります。
exponential
の場合は再実行間隔は2 x(retry_count-1)
として再試行ごとに増加します。上記の例では最初の再試行間隔は10秒、2番目は20秒、3番目は40秒です。
Sending error notification
# ワークフローが失敗する時に実行される
_error:
sh>: tasks/runs_when_workflow_failed.sh
+analyze:
sh>: tasks/analyze_prepared_data_sets.sh
エラーのタイミングでmail>operator
を利用してメール送信可能
詳しい説明はオペレーターの説明でやります〜
http://docs.digdag.io/operators/mail.html