TreasureData Workflow(digdag)とは?
TreasureDataが開発したワークフローエンジンです。
詳細は以下のドキュメントを。
https://www.digdag.io/
http://docs.digdag.io/
https://docs.treasuredata.com/articles/workflows
はじめる前に
概要はもちろん読んでおくとして、projectやsession, attemptなるものが出てきて、最初はやや混乱するので、以下を読んでおくとよさそう
http://docs.digdag.io/concepts.html
ここからコピペです
Projects and revisions
In Digdag, workflows are packaged together with other files used in the workflows. The files can be anything such as SQL scripts, Python/Ruby/Shell scripts, configuration files, etc. This set of the workflow definitions is called project.
なるほどなるほど。
Sessions and attempts
A session is a plan to run a workflow which should complete successfully. An attempt is an actual execution of a session. A session has multiple attempts if you retry a failed workflow.
sessionは1つのワークフローの実行単位で、sessionとattemptsは1:nの関係であると、なぜなら
The reason why sessions and attempts are separated is that an execution may fail.
失敗する可能性があるので分けていると。
なるほどなるほど。
実践
基本的にはここに書いてあるチュートリアルをもとに(というかほぼおなじこと)をやりました
workflowを定義するdigファイルを作成
こんな感じ。内容としては日次(7:00)にsum1.sqlを実行して結果をtest_wf1のテーブルに入れて、次にsum2.sqlを実行してtest_wf2に入れます。
timezone: Asia/Tokyo
schedule:
daily>: 07:00:00
_export:
td:
database: test
+task1:
td>: queries/sum1.sql
create_table: test_wf1
+task2:
td>: queries/sum2.sql
create_table: test_wf2
SQL
SQLはこんな感じです。test1とtest2というテーブルに同じSQLを投げているだけです。
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
ディレクトリ構成
SQLはqueries配下に置きます
wf_sample/
├── job1.dig
└── queries
├── sum1.sql
└── sum2.sql
ローカルで実行
TDのworkflowへ上げる前にローカルで実行してみましょう
$ td wf run job1
2016-12-08 20:15:49 +0900: Digdag v0.8.22
2016-12-08 20:16:02 +0900 [WARN] (main): Using a new session time 2016-12-08T00:00:00+09:00 based on schedule.
2016-12-08 20:16:02 +0900 [INFO] (main): Using session /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900.
2016-12-08 20:16:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job1 session_time=2016-12-08T00:00:00+09:00
2016-12-08 20:16:04 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): td-client version: 0.7.26
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): Logging initialized @16349ms
2016-12-08 20:16:06 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:07 +0900 [INFO] (0017@+job1+task1): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf1";
CREATE TABLE "test_wf1" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:09 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:13 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:17 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:26 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:28 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:29 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:31 +0900 [INFO] (0017@+job1+task2): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf2";
CREATE TABLE "test_wf2" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:33 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:36 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
Success. Task state is saved at /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900 directory.
ちゃんと集計されたデータが入っています
TD workflowへpush
正常に動作することを確認できたので、定義したjobをpushします
$ td wf push wf_sample
2016-12-08 20:09:23 +0900: Digdag v0.8.22
Creating .digdag/tmp/archive-5532475972498044837.tar.gz...
Archiving job1.dig
Archiving queries/sum1.sql
Archiving queries/sum2.sql
Workflows:
job1
Uploaded:
id: xxxxxxx
name: wf_sample
revision: 36b26035-872d-419e-bed8-ab665d5996d9
archive type: db
project created at: 2016-12-08T11:09:43Z
revision updated at: 2016-12-08T11:09:43Z
アップされたようです。コマンドラインからも確認できますが、UIもあるのでそっちで確認してみましょう。
いい感じですね。次回の実行時間なんかもでてます。
TD workflowでの実行
pushに成功したのでTD workflow(サーバサイド)で実行してみます。
$ td wf start wf_sample job1 --session now
2016-12-08 20:22:03 +0900: Digdag v0.8.22
Started a session attempt:
session id: xxxxxx
attempt id: xxxxxx
uuid: 4608a620-b8c2-48f9-beb3-209f0b203d44
project: wf_sample
workflow: job1
session time: 2016-12-08 20:22:15 +0900
retry attempt name:
params: {"last_session_time":"2016-12-08T00:00:00+09:00","next_session_time":"2016-12-09T00:00:00+09:00"}
created at: 2016-12-08 20:22:26 +0900
* Use `td workflow session xxxxxx` to show session status.
* Use `td workflow task xxxxxx` and `td workflow log 613218` to show task status and logs.
こちらも管理画面で確認してみる。Sessionsに1行追加されていて、StatusがSuccessになっています。
詳細画面ではタスク1つ1つの状況も見えます
UIがなかなかいい感じです。
もうちょっと使ってみたですが書き疲れたので一旦ここで終了。
digdagとは違って、TDのworkflowの場合シェルなどのTDのクエリ以外はいまのところできないが、それでもTD内で完結するものもあるので、ちょっとしたものならこれで十分な気がします。