概要
現職で digdag を集計バッチをコントロールするジョブサーバとして利用していますが、いくつか機能的に足りないところがあったので Python で機能拡張してみました。それについて簡単に紹介したいと思います。
不足してる機能
ジョブ待ち合わせ
ワークフロー
require オペレータが存在し、これによって複数のワークフロー間の依存関係を記述することは既存機能で可能です。しかしながら、待ち合わせができるワークフローは同一プロジェクト内のワークフローに限られています。「依存関係があるならば同一プロジェクトで管理しろ」と言われそうですが、よんどころない理由で別プロジェクト管理にしたい場合には依存関係をもたせることができません。
タスク
_parallel: true を記述して並行実行しているところで一部だけタスクの待ち合わせをしたい場合、タスクの記述順序とかを駆使して書く必要があり、若干 dig ファイルの可読性が落ちるのではと思ってます。
MySQL の操作
digdag には Treasure Data や PostgreSQL に対する操作が簡単にできるオペレータが用意されていますが、MySQL に対するものはありません。唯一できるのは Treasure Data 用のオペレータで出力先として MySQL のエンドポイントURLを指定して結果をインポートすることくらいです。たとえば MySQL にクエリを投げて、結果を MySQL に書き出すといったことを実現するオペレータがありません。
_error ディレクトリでの失敗タスクの取得
digdag はタスクの実行が失敗した場合の操作として error ディレクトリが用意されています。これはちょうど Java などのプログラム言語で実装されている try~catch の catch ブロック内に記載する処理に相当します。例えばここでエラーがあったという通知を Chatwork や Slack にしようとしても、肝心のエラーが発生したタスク名を取得することができません。digdag の v0.9.9 移行では digdag.env.params 辞書にタスク名が格納されるようになりましたが、_error ディレクトリ内では既にタスク名が更新され _error+タスク名 となっているため利用価値がありません。
機能拡張について
上記機能を digdag に補うために Python3 で pydigdag というのを作りました。
インストール
Python3 が使用できる環境で以下を実行します。CentOS 7 など Python3 が入ってない環境の場合には、拙筆 マルチユーザ環境のための pyenv を参考にして構築するといいでしょう。
sudo -E pip install git+https://github.com/takkeybook/pydigdag.git
提供メソッド
MySQL
以下のメソッドを利用するに当たり、_export ディレクトリ内に mysql キーが存在し、その配下に user, password, host, database が設定されている必要があります。
partial_delete_hourly
指定テーブルの列 region_day、region_hour がそれぞれ指定年月日、時のレコードを削除します。テーブル名として OUTPUT、指定日時として START_TIME(書式は YYYY-mm-dd HH:00:00)のパラメタが _export ディレクトリ内で設定されている必要があります。
partial_delete_daily
指定テーブルの列 region_day が指定年月日のレコードを削除します。テーブル名として OUTPUT、指定年月日として START_DATE(書式は YYYY-mm-dd)のパラメタが _export ディレクトリ内で設定されている必要があります。
partial_delete_weekly
指定テーブルの列 region_start_week が指定年月日のレコードを削除します。テーブル名として OUTPUT、指定年月日として START_WEEK_DATE(書式は YYYY-mm-dd)のパラメタが _export ディレクトリ内で設定されている必要があります。
partial_delete_monthly
指定テーブルの列 region_month が指定年月のレコードを削除します。テーブル名として OUTPUT、指定年月日として START_MONTH(書式は YYYY-mm-01)のパラメタが _export ディレクトリ内で設定されている必要があります。
delete_all
指定テーブル全てを削除します。テーブル名として OUTPUT パラメタが _export ディレクトリ内で指定されている必要があります。
run_sql
指定ファイルに記載されたクエリを実行します。クエリ内に {変数} という記述がある場合、_export ディレクトリ内で設定された値で置き換えられます。_export ディレクトリ内で QUERY_FILE パラメタが設定されていること。またクエリファイル内で使用している {変数} の「変数」もパラメタとして設定されている必要があります。
wait
指定ファイルに記載されたクエリを実行してレコードの存在確認をし、存在するまで待ち受けます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。_export ディレクトリ内で QUERY_FILE、MAX_WAIT_TIME パラメタが設定されていること、またクエリファイル内で使用している {変数} の「変数」もパラメタとして設定されている必要があります。
補足
partial_delete_XXXX は TreasureData で集計した結果を MySQL に格納する際に既存レコードとの置き換えができないので、予め削除しておくために用意されたものです。該当レコードの指定が特定の列名になっているのは現職のテーブル構成がそうなっているからで、汎用性がないのは勘弁してください。このあたりは修正して列名も指定できるようにすれば汎用性が高まるかと思ってます。
Chatwork
以下のメソッドを利用するに当たり、_export ディレクトリ内に chatwork キーが存在し、apikey, endpoint, roomid が設定されている必要があります。
notify
_error ディレクトリ内でエラー発生の通知を指定された roomid の部屋に投稿します。なお _export ディレクトリ内の digdag キーの配下に endpoint(digdag webUI の URL)が設定されていると、通知メッセージにエラーが発生した attempt へのリンクが併記されます。
その他
get_failed_task
現在実行中のワークフロー内で実行に失敗したタスク名を返します。並列実行する複数のタスクが2個以上失敗した場合には attempt id の大きい方を返します。これは digdag 内部で利用するというより、Chatwork の notify メソッドで利用されている関数です。
wait_local_file
指定ファイルがワークフローが実行されるサーバのローカルファイルシステムに存在するまで待ち受けます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。_export ディレクトリ内で LOCAL_FILE、MAX_WAIT_TIME パラメタが設定されている必要があります。
wait_task
指定されたタスクの実行完了を待ち受けます。当該タスクが失敗した場合には、待ち合わせしているタスクを実行せずに終了させます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。_export ディレクトリ内で TASK_NAME、MAX_WAIT_TIME パラメタが設定されている必要があります。
wait_workflow
指定されたプロジェクト内のワークフローの実行完了を待ち受けます。当該ワークフローが失敗した場合には、待ち合わせしているタスクを実行せずに終了させます。最大待ち受け時間は MAX_WAIT_TIME の値で設定された秒数です。待ち受けるワークフローと待ち合わせるワークフローのスケジュール指定が同じであること(たとえばいずれも毎時実行とか日次実行とかという意味です)、_export ディレクトリ内で待ち受けるワークフロー情報として PROJECT_NAME, WORKFLOW_NAME パラメタが設定されていること、MAX_WAIT_TIME パラメタが設定されていることが必要となります。
使用例
MySQL
あるプロジェクト内において、以下のファイルを用意します。
host: DBのホスト名あるいはIPアドレス
user: DBにアクセスするためのユーザ名
password: 上記ユーザのパスワード
database: アクセスするデータベース名
from pydigdag.MySQLJob import MySQLJob
実行したいクエリを以下のように定義しておきます。
REPLACE INTO {OUTPUT}
SELECT
region_day,
service_key,
SUM(payment) AS payment,
NOW() AS created_at,
NOW() AS updated_at
FROM
{INTPUT}
WHERE
region_day = '{START_DATE}'
GROUP BY
region_day,
service_key
これを digdag で定期的に実行する場合、dig ファイルを以下のように記述することになります。
timezone: Asia/Tokyo
_export:
START_TIME: ${moment(session_time).add(-1, "hours").format("YYYY-MM-DD HH:00:00")}
START_DATE: ${moment(START_TIME).format("YYYY-MM-DD")}
INPUT: hourly_payments
OUTPUT: daily_payments
QUERY_FILE: queries/daily_payments.sql
mysql:
!include : config/mysql.dig
+run:
py>: lib.MySQLJob.run_sql
Chatwork
あるプロジェクト内において、以下のファイルを用意します。
from pydigdag.chatwork import ChatworkApi
apikey: Chatwork API を利用するためのAPIキー
roomid: 投稿先のルームID
endpoint: https://api.chatwork.com/v2
endpoint: http://digdag.example.com/
#!/bin/bash
exit 1
timezone: Asia/Tokyo
_export:
chatwork:
!include : config/chatwork.dig
_error:
py>: lib.ChatworkApi.notify
+task:
sh>: scripts/failed.sh
sample_notify.dig を実行させると以下のようなメッセージが Chatwork に通知されることになります。
以下のタスクでエラーが発生しました
タスク名: +pydigdagTest+task
セッションUUID:7ceed71f-050b-400e-bcd8-d0ecd509335f
セッション時間:2017-08-15T11:47:20+09:00
管理画面URL:http://digdag.example.com/sessions/12345
その他(待受)
タスク
#!/bin/bash
sleep 120
echo "task1"
exit 0
#!/bin/bash
echo "task2"
exit 0
timezone: Asia/Tokyo
+wait_success:
_parallel: true
+task1:
sh>: scripts/task1.sh
+task2:
_export:
TASK_NAME: task1
MAX_WAIT_TIME: 20
+wait:
py>: lib.control.wait_task
+run:
sh>: scripts/task2.sh
上記は task1.sh の実行を待って task2.sh を実行させる例となります。これと同じ結果は単に逐次実行するように記述すればいいのでよい例ではありませんが、あくまで使用例ということで勘弁してください。
ワークフロー
timezone: Asia/Tokyo
schedule:
hourly>: 39:00
+task:
_export:
PROJECT_NAME: required_workflow
WORKFLOW_NAME: required_workflow
MAX_WAIT_TIME: 180
+wait:
py>: lib.control.wait_workflow
+run:
sh>: scripts/task.sh
これを例えば wait_workflow として登録しておきます。
timezone: Asia/Tokyo
schedule:
hourly>: 40:00
+setup:
echo>: start ${session_time}
+run:
sh>: scripts/task.sh
これを required_workflow として登録しておきます。wait_workflow は毎時39分に、required_workflow は毎時40分に実行されますが、wait_workflow は required_workflow の終了を待って実行されるようになります。