結局ダメだったので違う方法で実装した話。
使いたかった経緯
- 集計元のRawDataは1日1回別システムからS3経由のDataConnectorでTreasureDataにimport。
- 日次で1回動くworkflowで前日分のデータを集計。
- 相手方のシステムが前日分データは次の日には入らない(何日か遅れる【不定】と言ってきた。)
- 仕方がないのでworkflowで前7日分のデータを毎日再集計することに。
- tdコマンドの
workflow backfill
使えば行けるんじゃね?という思いから調査してみたがどうにもworkflow上で動かない。 - サポート問い合わせで「それはできません。」との回答をもらう。
- 仕方がないので別な方法で対応した話。
最終的に実装した方法
backfillが使えればそのまま行けたのだけれどもdigファイルとsqlファイルを修正して対応。
対応方針:workflowでループ組んで1日分のtimestamp毎にfor_rangeで繰り返す。
digファイル
+task1:
for_range>:
from: ${moment(session_time).add('days', -7).unix()}
to: ${session_unixtime}
step: 86400
_do:
+sub1:
td>: analysis.sql
insert_into: tmp_reporting
analysis.sql
SELECT
TD_TIME_FORMAT(${range.from}, 'yyyy-MM-dd', 'JST') AS aggregate_day,
IF(
SUM(sku_ex_vat) IS NULL,
0,
SUM(sku_ex_vat) AS bigint)
) AS sales_amount
FROM
order_table
WHERE
TD_TIME_RANGE(order_timestamp,
TD_TIME_FORMAT(${range.from}, 'yyyy-MM-dd', 'JST'),
TD_TIME_FORMAT(${range.to}, 'yyyy-MM-dd', 'JST'),
'JST')
workflowでもっといろいろ出来るといいなー