始めに
Treasure Workflow(以下workflow)の使い方として、よく日次で定形処理を実行するということがあるかと思います。
さらにworkflow内でクエリを実行し、その際に実行日から任意の日数分前までを処理範囲としたいという要望もユースケースとして想定されますよね。その場合workflowの実行日を取得しクエリに組み込む必要があるのですが、実装方法がわからない、実装してみたけど期待した結果にならないなどあるかと思いますので、サンプルを作成し動かしつつ説明していこうかと思います。
本記事の最終目標
本記事の最終目標は、下記クエリを定期的にworkflowから実行することとします。
1つずつ確認しながら実装していきましょう。
SELECT *
FROM <table_name>
WHERE time_col BETWEEN <n_days_ago> AND <today>;
workflowの変数について
workflowは事前に定義した変数を ${<変数名>}
という形で参照し利用することができます。
var_test
という変数を定義してecho>:
オペレータで表示してみましょう。
_export:
var_test: this is test for variable
+echo_task:
echo>: ${var_test}
ローカルモード(td wf run
コマンド)で実行します。
$ td wf run test.dig
2019-12-23 17:36:50 +0900: Digdag v0.9.39
・
・
2019-12-23 17:36:53 +0900 [INFO] (0017@[0:default]+test_echo_var+echo_task): echo>: this is test for variable
this is test for variable
workflowの変数を利用することができましたね。
TDのクエリで変数を利用するには
Oracle Databaseならバインド変数、MySQLならユーザー定義変数と呼びますが、SQLに変数を設定し値を代入することで動的に実行することができます。
Treasure Data(TD)で同様に変数利用するにはworkflowを利用します。
まず、workflowでクエリを実行するにはtd>:
オペレータを利用します。下記イメージですね。
_export:
database: test_db
+run_query:
td>: queries/sample.sql
td>:
オペレータの引数で設定したファイル(queries配下のsample.sql)に記述されたクエリが実行されます。ちなみにデフォルトのクエリエンジンはHiveではなくPrestoです。
SELECT 'test1' AS col1;
これに前述のworkflowにおける変数の利用方法と合わせることで、クエリで変数利用することができます。実際に試してみましょう。
まずは下記digファイルを作成します。var_testという変数を定義しています。
_export:
database: test_db
var_test: this is test for variable
+run_query:
td>: queries/sample.sql
そして下記のようにSQLファイルに記載したクエリで${<変数名>}
を記述します。
SELECT '${var_test}' AS col1;
このworkflowを実際に実行してみると下記ログのようにクエリSELECT 'this is test for variable' AS col1;
が実行されたことがわかります。
2019-12-29 12:52:41.976 +0000 [INFO] (0261@[X:test_var_prj]+test+run_query) io.digdag.core.agent.OperatorManager: td>: queries/sample.sql
2019-12-29 12:52:43.531 +0000 [INFO] (0173@[X:test_var_prj]+test+run_query) io.digdag.core.agent.OperatorManager: td>: queries/sample.sql
2019-12-29 12:52:44.335 +0000 [INFO] (0173@[X:test_var_prj]+test+run_query) io.digdag.standards.operator.td.TdOperatorFactory$TdOperator: Started presto job id=XXXXXXXX:
SELECT 'this is test for variable' AS col1;
2019-12-29 12:52:46.255 +0000 [INFO] (0203@[X:test_var_prj]+test+run_query) io.digdag.core.agent.OperatorManager: td>: queries/sample.sql
workflowにおける実行日の取得方法
ここまでの説明で、クエリにてworkflowで定義した変数を利用することができるようになりました。後は変数にworkflow実行日を渡すことでなんとかなりそうですよね。
workflow内でどのように実行日を取得するかと言いますと、ビルトイン変数というものを利用します。ビルトイン変数とは、その名の通りworkflowにもともと備わっている変数のことです。
色々と種類があるので詳細はドキュメントを参照してほしいのですが、例えばsession_date
という名前のビルトイン変数はworkflowのsession_time
の年・月・日が格納されている変数になります。(session_timeとはworkflowのセッション時間、すなわちworkflowの実行予定時刻のことを指します)
実際にsession_time
とsession_date
をecho>:
オペレータを使ってログに出力してみましょう。
+echo_session_time:
echo>: ${session_time}
+echo_session_date:
echo>: ${session_date}
上記を実行すると下記のようなログが出力されます。workflowを実行したのが2019/12/29の13時過ぎ(タイムゾーンはUTC)なのですが下記のようにsession_date
には2019-12-29
という形で値が格納されていたことがわかります。
2019-12-29 13:04:50.758 +0000 [INFO] (0253@[X:test_var_prj]+echo_session_var+echo_session_time) io.digdag.core.agent.OperatorManager: echo>: 2019-12-29T13:04:46+00:00
2019-12-29 13:04:52.394 +0000 [INFO] (0320@[X:test_var_prj]+echo_session_var+echo_session_date) io.digdag.core.agent.OperatorManager: echo>: 2019-12-29
workflowの実行日を取得することができましたね。後は実際のユースケースに落とし込んで行きましょう。
workflowで日付の計算をする
後は実装するだけなのですが、今までの説明はworkflow実行日を文字列として取得しているだけなので、n日前を求めるにはどうすればいいのか迷いませんか?
色々と方法があるかと思いますが、ここでは2つ方法を紹介します。
クエリで日付計算する
1つめの方法としてクエリ内で計算する、というものが挙げられます。
ドキュメントにも下記サンプルがある通り、PrestoではDATE型を利用することで日付の加算・減算が可能になります。下の例ではDATE型の値date '2012-08-08'
から2日引いています。
Operator | Example | Result |
---|---|---|
- | date '2012-08-08' - interval '2' day | 2012-08-06 |
workflowの変数は参照しても文字列として値を渡すだけなので、まずは文字列型の値をDATE型に変換する必要があります。ここではcast関数を使いましょう。cast(<変換対象> as <変換先のデータ型>)
というフォーマットでデータ型を変換する関数です。利用イメージは下記になります。
ちなみに変換対象が日付として正しくない値(例えば13月のような2019-13-08
)はエラーになります。
SELECT cast('2019-08-08' AS DATE) AS cast_date,
...
これに先述したクエリにおける変数利用を組み合わせると下記になります。
SELECT cast_date('${session_date}' AS DATE) AS cast_sess_date,
...
さらにドキュメントの例のように日数計算しようとすると下記クエリの2カラム目のようになります。workflowからこれを実行してみましょう。
SELECT cast('${session_date}' AS DATE) AS sess_date,
cast('${session_date}' AS DATE) - interval '2' day AS sess_date_2days_ago;
上記を実行すると下記結果が返ります。正しく2日前が計算され返されていますね。
Moment.jsで日付計算する
2つめの方法として、Moment.jsを利用して日付の計算を行う、ということが挙げられます。
実はworkflowの変数参照部分では簡単なJavaScriptを実行させることができます。Momemt.jsとはドキュメントに下記記載があります通り、日付や時刻の操作をJavaScriptで行うことができるライブラリです。workflowではこのライブラリがバンドルされているため、特に設定することなくMoment.jsを利用することができます。
Parse, validate, manipulate, and display dates and times in JavaScript.
順を追って説明していきます。
###JavaScriptをworkflowで利用する方法
${<変数名>}
という利用ばかりしていると特に間違えやすいのですが、例えばJavaScriptの記法を使って文字列比較する場合に${変数名}==='test'
という書き方で動作すると期待してしまうかもしれません。ですが、正しくは${変数名==='test'}
というように${...}
の中で完結するようにする必要があります。
具体的には下記のようにして利用することになります。
_export:
test_var: test
+echo_check:
echo>: ${test_var==='test'}
workflow内でMoment.jsを使う方法
workflow内でJavaScriptを利用することができることがわかったかと思います。Moment.jsは先述の通りデフォルトでバンドルされているため、後はMoment.jsの記法を${...}
内で記載するだけですね。
利用したことがある方には不要な説明になりますが、例えば2日前という計算をする場合はsubtract関数を利用します。実際に下記を実行してみます。
+echo_session_date:
echo>: ${session_date}
+echo_session_date_2days_ago:
echo>: ${moment(session_date).subtract(2, 'days').format("YYYY-MM-DD")}
すると下記のように正しく2日前を算出できていることがわかります。
2019-12-29 14:14:12.761 +0000 [INFO] (0319@[0:test_moment_prj]+test_moment+echo_session_date) io.digdag.core.agent.OperatorManager: echo>: 2019-12-29
2019-12-29 14:14:18.388 +0000 [INFO] (0201@[0:test_moment_prj]+test_moment+echo_session_date_2days_ago) io.digdag.core.agent.OperatorManager: echo>: 2019-12-27
これをクエリと組み合わせると下記のようになります。
_export:
database: test_db
sess_date_2days_ago: ${moment(session_date).subtract(2, 'days').format("YYYY-MM-DD")}
+run_query:
td>: queries/calc_date.sql
SELECT '${sess_date_2days_ago}' AS sess_date_2days_ago;
実行すると下記のように2日前が正しく計算されて返されることがわかります。
実装方法
下記をworkflowで実行することが目標だったので、これまでの説明内容を使って実装していきます。
SELECT *
FROM <table_name>
WHERE time_col BETWEEN <n_days_ago> AND <today>;
workflow利用者にとってあまりメジャーではない(?)Moment.jsを利用する方法を試してみます。
2019/12/30に実行されるworkflowにて、5日前(2019/12/25)から実行日までのデータを下記テーブルから抽出するケースを考えます。
col1 | col2 | time |
---|---|---|
1 | 2019-12-22 | 1577631248 |
2 | 2019-12-23 | 1577631248 |
3 | 2019-12-24 | 1577631248 |
4 | 2019-12-25 | 1577631248 |
5 | 2019-12-26 | 1577631248 |
6 | 2019-12-27 | 1577631248 |
7 | 2019-12-28 | 1577631248 |
8 | 2019-12-29 | 1577631248 |
9 | 2019-12-30 | 1577631248 |
10 | 2019-12-31 | 1577631248 |
2019/12/25から2019/12/30までの6レコードが対象になります。
digファイルは下記のようになるでしょう。ここまでのサンプルでは記載していませんでしたが、workflow内の日付計算を日本時間(JST)で行うにはtimezone:
設定が必要です。
_export:
database: test_tab
sess_date_5days_ago: ${moment(session_date).subtract(5, 'days').format("YYYY-MM-DD")}
timezone: Asia/Tokyo
+run_query:
td>: queries/extract_5_days.sql
SQLファイルは下記のようになります。今まではworkflowで定義した変数はSELECT内で参照しているだけでしたが、今回はWHERE句で利用しています。
SELECT col1,
col2
FROM date_test_tab
WHERE col2 BETWEEN '${sess_date_5days_ago}' AND '${session_date}';
このworkflowを実行すると変数の値が渡されることになり下記クエリが実際には実行されます。
SELECT col1,
col2
FROM date_test_tab
WHERE col2 BETWEEN '2019-12-25' AND '2019-12-30';
その結果下記結果が返ります。ORDER BY
を付与していないので分かりづらいのですが、期待した結果を抽出できていますね。
最後に
説明が長くなってしまいましたがいかがでしたでしょうか?
本記事で説明した内容を理解し実装することができれば、色々な要素が詰まった方法になるため実現できることの幅が広がるのではないかと思います。
よろしければ参考にしていただけますと幸いです。