Digdagシリーズその3。
第1回は「Digdag Serverのインストール手順」
第2回は「DigdagのSecret機能を使う」
Digdagでワークフローを書いた際にやりたくなるのが、成功・失敗の通知です。そのやり方を試行錯誤した結果をまとめます。
Digdagでのワークフロー定義
まずはおさらい。
Digdagではタスクと呼ばれる処理の単位をつなげてワークフローを定義することができます。
例えば、シェル実行、pythonスクリプト実行、Rubyスクリプト実行のタスクを順番に実行するワークフロー定義は、こんな感じで書けます。
timezone: UTC
+step1:
sh>: tasks/shell_sample.sh
+step2:
py>: tasks.MyWorkflow.step2
param1: this is param1
+step3:
rb>: MyWorkflow.step3
require: tasks/ruby_sample.rb
ワークフローの成功・失敗を捕捉する
ワークフローのタスクが全て成功した場合、もしくはいずれかのタスクが失敗した場合にだけ実行するタスクもワークフロー定義の中に書けます。
# this task runs when a workflow succeeds.
_check:
sh>: tasks/runs_when_workflow_succeeded.sh
# this task runs when a workflow fails.
_error:
sh>: tasks/runs_when_workflow_failed.sh
_check:
オプションの中に書いたタスクは、ワークフローが成功した場合にだけ生成されて実行され、_error:
オプションの中に書いたタスクはワークフローが失敗した場合にだけ生成されて実行されます。
失敗・成功の通知
ということで、_check:
オプションもしくは_error:
オプションを使って結果をSlackに通知させたいところですが、Digdag側ではメール送信を行うコマンドmail>:
くらいしか用意されていないようです。
HTTPリクエストを送るhttp>:
オペレータや、なんでもありのsh>:
とかを使えばできなくはないですが、できればslack>:
オペレータがあればいいのになぁ〜、と。
そこで、ネットの海を調べたところ、Slackへのメッセージ送信を行うコマンドslack>:
を実装したプラグインが公開されているのを発見!圧倒的感謝!!
- ワークフローエンジンDigDagのSlackプラグイン公開しました
http://blog.techium.jp/entry/2016/07/11/090000_1
しかし、試したところ新しいDigdagのバージョン(0.8.21)では動かなかったり、いくつか追加したい点があったのでカスタマイズしました。
使い方は簡単で、
_export:
plugin:
repositories:
- file://${repository_path}
dependencies:
- jp.techium.blog:digdag-slack:0.1.1
+step1:
slack>: message.txt
workflow_name: test_digdag
webhook: https://xxxxxxx
channel: general
username: digdag
icon_emoji: ghost
というように_export
でプラグインのレポジトリを指定しておけば、slack>*
オペレータをタスク内で使えるようになります。
webhookといったオプションは元のプラグインと同じです。
変えたところとして、メッセージテンプレートに埋め込む変数も定義できるようにしました。上の例ではworkflow_name
としてワークフロー名をメッセージに入れるようにしています。
あとは、slack>:
オペレータに指定したファイルにて、
ワークフロー名:${workflow_name}からのメッセージ
というようにすればOK。
実行結果の例
エラー内容も取得する
成功の時は上でいいのですが、失敗した時はエラー内容も合わせて通知したいところです。
色々調べたところ、Digdagさんは_error
オプションで定義したタスクが呼び出されす際に、パラメータとしてerror(message, stacktrace)
を定義してくれているようです。
公式ドキュメントを探したのですが記載されている箇所が見つからなかったので、該当するソースコードをポイントしておきます。
635 private void collectErrorParams(Config params, StoredTask task)
636 {
637 List<Long> childrenFromThis;
638 {
639 TaskTree tree = new TaskTree(sm.getTaskRelations(task.getAttemptId()));
640 childrenFromThis = tree.getRecursiveChildrenIdList(task.getId());
641 }
642
643 // merge store params to export params
644 List<ParameterUpdate> childrenStoreParams = sm.getStoreParams(childrenFromThis);
645 for (ParameterUpdate childStoreParams : childrenStoreParams) {
646 childStoreParams.applyTo(params);
647 }
648
649 // merge all error params
650 Config error = cf.create();
651 {
652 List<Config> childrenErrors = sm.getErrors(childrenFromThis);
653 for (Config childError : childrenErrors) {
654 error.merge(childError);
655 }
656 }
657 params.set("error", error);
658 }
12 public class TaskExecutionException
13 extends RuntimeException
14 {
15 public static ConfigElement buildExceptionErrorConfig(Throwable ex)
16 {
17 Map<String, String> map = ImmutableMap.of(
18 "message", ex.toString(),
19 "stacktrace",
20 Arrays.asList(ex.getStackTrace())
21 .stream()
22 .map(it -> it.toString())
23 .collect(Collectors.joining(", ")));
24 return ConfigElement.ofMap(map);
25 }
なので、
_error:
sh>: echo ${error.stacktrace}
こんな感じでエラー情報にアクセスできます。
これを踏まえると、ワークフローにて
_error:
slack>: error_message.txt
workflow_name: test_digdag
error_message: ${error.message}
error_stacktrace: ${error.stacktrace}
webhook: https://xxxxxxx
channel: my_channel
username: digdag
icon_emoji: ghost
として、メッセージテンプレートを
Digdagのワークフロー ${workflow_name} が失敗したよ
Error: `${error_message}`
StackTrace: ```${error_stacktrace}```
としておけばOKです。
実行結果の例
以上。