Help us understand the problem. What is going on with this article?

Digdagのワークフロー結果をSlackに通知する

More than 3 years have passed since last update.

Digdagシリーズその3。

第1回は「Digdag Serverのインストール手順」
第2回は「DigdagのSecret機能を使う」

Digdagでワークフローを書いた際にやりたくなるのが、成功・失敗の通知です。そのやり方を試行錯誤した結果をまとめます。

Digdagでのワークフロー定義

まずはおさらい。
Digdagではタスクと呼ばれる処理の単位をつなげてワークフローを定義することができます。

例えば、シェル実行、pythonスクリプト実行、Rubyスクリプト実行のタスクを順番に実行するワークフロー定義は、こんな感じで書けます。

timezone: UTC

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  py>: tasks.MyWorkflow.step2
  param1: this is param1

+step3:
  rb>: MyWorkflow.step3
  require: tasks/ruby_sample.rb

ワークフローの成功・失敗を捕捉する

ワークフローのタスクが全て成功した場合、もしくはいずれかのタスクが失敗した場合にだけ実行するタスクもワークフロー定義の中に書けます。

# this task runs when a workflow succeeds.
_check:
  sh>: tasks/runs_when_workflow_succeeded.sh

# this task runs when a workflow fails.
_error:
  sh>: tasks/runs_when_workflow_failed.sh

_check:オプションの中に書いたタスクは、ワークフローが成功した場合にだけ生成されて実行され、_error:オプションの中に書いたタスクはワークフローが失敗した場合にだけ生成されて実行されます。

失敗・成功の通知

ということで、_check:オプションもしくは_error:オプションを使って結果をSlackに通知させたいところですが、Digdag側ではメール送信を行うコマンドmail>:くらいしか用意されていないようです。

HTTPリクエストを送るhttp>:オペレータや、なんでもありのsh>:とかを使えばできなくはないですが、できればslack>:オペレータがあればいいのになぁ〜、と。

そこで、ネットの海を調べたところ、Slackへのメッセージ送信を行うコマンドslack>:を実装したプラグインが公開されているのを発見!圧倒的感謝!!

しかし、試したところ新しいDigdagのバージョン(0.8.21)では動かなかったり、いくつか追加したい点があったのでカスタマイズしました。

https://github.com/bwtakacy/digdag-slack

使い方は簡単で、

_export:
  plugin:
    repositories:
      - file://${repository_path}
    dependencies:
      - jp.techium.blog:digdag-slack:0.1.1

+step1:
  slack>: message.txt
  workflow_name: test_digdag
  webhook: https://xxxxxxx
  channel: general
  username: digdag
  icon_emoji: ghost

というように_exportでプラグインのレポジトリを指定しておけば、slack>*オペレータをタスク内で使えるようになります。
webhookといったオプションは元のプラグインと同じです。

変えたところとして、メッセージテンプレートに埋め込む変数も定義できるようにしました。上の例ではworkflow_nameとしてワークフロー名をメッセージに入れるようにしています。
あとは、slack>:オペレータに指定したファイルにて、

message.txt
ワークフロー名:${workflow_name}からのメッセージ

というようにすればOK。

実行結果の例

スクリーンショット 2016-12-14 22.55.40.png

エラー内容も取得する

成功の時は上でいいのですが、失敗した時はエラー内容も合わせて通知したいところです。
色々調べたところ、Digdagさんは_errorオプションで定義したタスクが呼び出されす際に、パラメータとしてerror(message, stacktrace)を定義してくれているようです。

公式ドキュメントを探したのですが記載されている箇所が見つからなかったので、該当するソースコードをポイントしておきます。

digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java
 635     private void collectErrorParams(Config params, StoredTask task)
 636     {
 637         List<Long> childrenFromThis;
 638         {
 639             TaskTree tree = new TaskTree(sm.getTaskRelations(task.getAttemptId()));
 640             childrenFromThis = tree.getRecursiveChildrenIdList(task.getId());
 641         }
 642 
 643         // merge store params to export params
 644         List<ParameterUpdate> childrenStoreParams = sm.getStoreParams(childrenFromThis);
 645         for (ParameterUpdate childStoreParams : childrenStoreParams) {
 646             childStoreParams.applyTo(params);
 647         }
 648 
 649         // merge all error params
 650         Config error = cf.create();
 651         {
 652             List<Config> childrenErrors = sm.getErrors(childrenFromThis);
 653             for (Config childError : childrenErrors) {
 654                 error.merge(childError);
 655             }
 656         }
 657         params.set("error", error);
 658     }
digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java
 12 public class TaskExecutionException
 13         extends RuntimeException
 14 {
 15     public static ConfigElement buildExceptionErrorConfig(Throwable ex)
 16     {
 17         Map<String, String> map = ImmutableMap.of(
 18                 "message", ex.toString(),
 19                 "stacktrace",
 20                 Arrays.asList(ex.getStackTrace())
 21                 .stream()
 22                 .map(it -> it.toString())
 23                 .collect(Collectors.joining(", ")));
 24         return ConfigElement.ofMap(map);
 25     }

なので、

  _error:
    sh>: echo ${error.stacktrace}

こんな感じでエラー情報にアクセスできます。

これを踏まえると、ワークフローにて

  _error:
    slack>: error_message.txt
    workflow_name: test_digdag
    error_message: ${error.message}
    error_stacktrace: ${error.stacktrace}
    webhook: https://xxxxxxx
    channel: my_channel
    username: digdag
    icon_emoji: ghost 

として、メッセージテンプレートを

error_message.txt
Digdagのワークフロー ${workflow_name} が失敗したよ
Error: `${error_message}`
StackTrace: ```${error_stacktrace}```

としておけばOKです。

実行結果の例

スクリーンショット 2016-12-14 22.39.59.png

以上。

bwtakacy
お仕事:データリサーチエンジニア 技術:C/Java/Python, PostgreSQL/Hadoop/Spark/TreasureData, 機械学習, DeepLearning
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away