公式ページ
Digdagとは
- Treasure Dataで作られたOSSの分散Workflowエンジン
- yaml形式で記述する。(拡張子は.ymlではなく.dig)
- 起動にはjava8以上が必要(※以下、Getting Startedより)
please download and install the latest Java SE Development Kit 8 (must be newer than 8u72).
インストール方法
とても簡単だった。手順は3つ
- curlコマンドでダウンロード
curl -o ~/.digdag/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
- パーミッションの変更
chmod +x ~/.digdag/bin/digdag
- 環境変数にPATH追加して再読込
echo 'export PATH="$HOME/.digdag/bin:$PATH"' >> ~/.bashrc
source ~/.bash_profile
※ .bash_profile再読込時に.bashrcを読み込む設定をしてない場合は、.bash_profileに以下を追加してからsourceコマンド再読込を行う
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
動かしてみる
- Local-mode
まず、以下でプロジェクトの作成を行う。
digdag init digdag-sample
作成したプロジェクトへ移動し、sample(.dig拡張子のファイル)の中身を見てみる
timezone: UTC
+setup:
echo>: start ${session_time}
+disp_current_date:
echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')}
+repeat:
for_each>:
order: [first, second, third]
animal: [dog, cat]
_do:
echo>: ${order} ${animal}
_parallel: true
+teardown:
echo>: finish ${session_time}
という感じ
これをdigdag runコマンドで実行してみると
$ digdag run digdag-sample.dig
2018-08-17 10:59:06 +0900 [INFO] (main): Starting a new session project id=1 workflow name=digdag-sample session_time=2018-08-15T00:00:00+00:00
2018-08-17 10:59:06 +0900 [INFO] (0026@[0:default]+digdag-sample+setup): echo>: start 2018-08-15T00:00:00+00:00
start 2018-08-15T00:00:00+00:00
2018-08-17 10:59:07 +0900 [INFO] (0026@[0:default]+digdag-sample+disp_current_date): echo>: 2018-08-15 00:00:00 +00:00
2018-08-15 00:00:00 +00:00
2018-08-17 10:59:07 +0900 [INFO] (0026@[0:default]+digdag-sample+repeat): for_each>: {order=[first, second, third], animal=[dog, cat]}
2018-08-17 10:59:07 +0900 [INFO] (0026@[0:default]+digdag-sample+repeat^sub+for-0=order=0=first&1=animal=0=dog): echo>: first dog
first dog
2018-08-17 10:59:07 +0900 [INFO] (0029@[0:default]+digdag-sample+repeat^sub+for-0=order=1=second&1=animal=1=cat): echo>: second cat
2018-08-17 10:59:07 +0900 [INFO] (0030@[0:default]+digdag-sample+repeat^sub+for-0=order=2=third&1=animal=0=dog): echo>: third dog
third dog
second cat
2018-08-17 10:59:07 +0900 [INFO] (0027@[0:default]+digdag-sample+repeat^sub+for-0=order=0=first&1=animal=1=cat): echo>: first cat
2018-08-17 10:59:07 +0900 [INFO] (0028@[0:default]+digdag-sample+repeat^sub+for-0=order=1=second&1=animal=0=dog): echo>: second dog
first cat
second dog
2018-08-17 10:59:07 +0900 [INFO] (0031@[0:default]+digdag-sample+repeat^sub+for-0=order=2=third&1=animal=1=cat): echo>: third cat
third cat
2018-08-17 10:59:07 +0900 [INFO] (0031@[0:default]+digdag-sample+teardown): echo>: finish 2018-08-15T00:00:00+00:00
finish 2018-08-15T00:00:00+00:00
という感じにLinuxコマンドが実行された。
ちなみに、もう一度実行してみると
2018-08-17 11:13:21 +0900 [INFO] (main): Starting a new session project id=1 workflow name=digdag-sample session_time=2018-08-15T00:00:00+00:00
2018-08-17 11:13:21 +0900 [WARN] (0026@[0:default]+digdag-sample+setup): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0026@[0:default]+digdag-sample+disp_current_date): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0026@[0:default]+digdag-sample+repeat): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0026@[0:default]+digdag-sample+repeat^sub+for-0=order=0=first&1=animal=0=dog): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0026@[0:default]+digdag-sample+repeat^sub+for-0=order=0=first&1=animal=1=cat): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0027@[0:default]+digdag-sample+repeat^sub+for-0=order=1=second&1=animal=0=dog): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0028@[0:default]+digdag-sample+repeat^sub+for-0=order=1=second&1=animal=1=cat): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0026@[0:default]+digdag-sample+repeat^sub+for-0=order=2=third&1=animal=1=cat): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0028@[0:default]+digdag-sample+repeat^sub+for-0=order=2=third&1=animal=0=dog): Skipped
2018-08-17 11:13:21 +0900 [WARN] (0028@[0:default]+digdag-sample+teardown): Skipped
となり。タスクがすべてスキップされてしまう。
これは.digdag配下にセッション情報が保存されていて、その中に「setup〜teardownまでのタスクが成功した」という状態が保存されているため、2度目以降の実行がスキップされてしまうらしい。
中身は以下みたいな感じに出力されている
$ ls -l .digdag/status/20180815T000000+0000
+digdag-sample+disp_current_date.yml
+digdag-sample+repeat.yml
+digdag-sample+repeat^sub+for-0=order=0=first&1=animal=0=dog.yml
+digdag-sample+repeat^sub+for-0=order=0=first&1=animal=1=cat.yml
+digdag-sample+repeat^sub+for-0=order=1=second&1=animal=0=dog.yml
+digdag-sample+repeat^sub+for-0=order=1=second&1=animal=1=cat.yml
+digdag-sample+repeat^sub+for-0=order=2=third&1=animal=0=dog.yml
+digdag-sample+repeat^sub+for-0=order=2=third&1=animal=1=cat.yml
+digdag-sample+repeat^sub.yml
+digdag-sample+setup.yml
+digdag-sample+teardown.yml
+digdag-sample.yml
セッションの保存期間--sessionオプションで設定することができる。引数を
- hourly
- daily
- last
- "yyyy-MM-dd HHSS"
などにすれば、定められた期間のみ、情報を保存する。
またsessionに関係なくすべて実行する場合は -aオプションを付ける
エラー時に通知を行う
slackへの通知
slackへの通知はプラグインを使って行う方法もあるが、今回は公式で用意しているicoming-webhooksを使って、httpリクエストで通知を送る。
導入は上記サイトかこちらなどがわかりやすいので、割愛
上記手順に従うと、エンドポイント(URL)が取得できるのでそれを保存しておく。
そして、digdagの中身を以下のように書いて、実行すると通知が飛ばせる。
timezone: UTC
# エンドポイント読み込み
_export:
!include : cmn/env.dig
# 意図的にエラーを発生
+fail_test:
fail>: "error test"
# エラー通知の中身
_error:
+notify_to_slack:
http>: ${SLACK_INCOMING_WEBHOOK_URL}
method: POST
content:
attachments:
- title: "workflow faild"
text: ${error.message}
color: danger
content_format: json
SLACK_INCOMING_WEBHOOK_URL: エンドポイントのURL
ちなみに、以下の
_export:
!include : cmn/env.dig
で、URLをincludeしているが、例えば下記のようにslack_noti_test.digと同じ階層にenv.digを置き
_export:
!include : env.dig
として実行しようとすると
Workflow 'env' includes unknown keys: [SLACK_INCOMING_WEBHOOK_URL] (model validation)
みたいな感じのエラーが出るので、注意。
(実行ファイルにcmnのようにサブディレクトリをつくり、そこにいれる必要がある)
スケジューリング
とても簡単で、上のエラー通知の処理を1分ごとに実行するなら
timezone: UTC
schedule:
minutes_interval>: 1
_export:
!include : cmn/env.dig
+fail_test:
fail>: "error test"
_error:
+notify_to_slack:
http>: ${SLACK_INCOMING_WEBHOOK_URL}
method: POST
content:
attachments:
- title: "workflow faild"
text: ${error.message}
color: danger
content_format: json
のように、処理の上に下記の部分を追加するだけ
schedule:
minutes_interval>: 1
他にも、デイリー、ウィークリーなど様々な期間を簡単に設定できる
細かい設定などはここに記載されている
その他TDコマンド