概要
Airflow以外でも、Snowflake上でDAG(タスクグラフ)が作成できるので、お遊びでメール通知を含めたタスクグラフを試してみた。
手順
ほぼ公式ドキュメントに則る。
メールアドレスの登録
こちらを参考。今回のメール通知では登録されて、検証済みのメールが通知として届く。
メール通知のFinalizerはRetryごとに呼び出される。呼び出される際は、Retry後の結果を考慮して、ATTEMPT_NUMBERが最後の結果を見て通知が届くようにしている(ハードコーディングしているがもっといい方法があるかもしれない)。
Notification integrationの作成
use role accountadmin;
create or replace notification integration dummy_email_notification
TYPE=EMAIL
ENABLED=TRUE
ALLOWED_RECIPIENTS = ('<登録してあるメールアドレス>')
;
use role securityadmin;
grant usage on integration dummy_email_notification to role stg_developer;
use role stg_developer;
SHOW NOTIFICATION INTEGRATIONS;
-- mailが届くか確認
CALL SYSTEM$SEND_EMAIL(
'dummy_email_notification', -- INTEGRATIONを指定
'<登録してあるメールアドレス>', -- 宛先のアドレスを指定
'TEST MAIL', -- タイトルを指定
'メールのテストです' -- 本文を指定。\nで改行も可能
);
Taskの作成
以下のような構成図のtaskを作成。
-- root taskの作成
CREATE OR REPLACE TASK stg_db.work.root_task
WAREHOUSE = STG_ETL_WH
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
USER_TASK_TIMEOUT_MS = 600
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('root_task successful');
END;
;
-- 実験用テーブルの作成
create or replace table stg_db.work.demo (
date_exec DATE,
timestamp_exec TIMESTAMP,
source_name VARCHAR
) as
select current_date(), current_timestamp(), 'new entry';
-- 各タスクの作成
CREATE OR REPLACE TASK stg_db.work.task_a
WAREHOUSE = '<warehouseの指定>'
USER_TASK_TIMEOUT_MS = 180
AFTER root_task
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
INSERT INTO stg_db.work.demo VALUES(current_date(), current_timestamp(), :VALUE);
END;
;
-- エラー通知実験用の失敗タスク
CREATE OR REPLACE TASK stg_db.work.task_b
WAREHOUSE = '<warehouseの指定>'
USER_TASK_TIMEOUT_MS = 180
AFTER root_task
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
INSERT INTO stg_db.work.demo VALUES(1/0, current_timestamp(), :VALUE);
END;
;
CREATE OR REPLACE TASK stg_db.work.task_c
WAREHOUSE = '<warehouseの指定>'
USER_TASK_TIMEOUT_MS = 180
AFTER root_task
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
INSERT INTO stg_db.work.demo VALUES(current_date(), current_timestamp(), :VALUE);
END;
;
-- 各DAGについて、INFORMATION_SCHEMA.TASK_HISTORYから失敗したTASK情報取得の関数
create or replace function STG_DB.WORK.GET_TASK_GRAPH_RUN_SUMMARY(MY_ROOT_TASK_ID string, MY_START_TIME timestamp_ltz)
returns string
as
$$
(
select
ARRAY_AGG(OBJECT_CONSTRUCT(
'TASK_NAME', NAME,
'DATABASE_NAME', DATABASE_NAME,
'SCHEMA_NAME', SCHEMA_NAME,
'STARTED', QUERY_START_TIME,
'DURATION', DURATION,
'QUERY_ID', QUERY_ID,
'ERROR_MESSAGE', ERROR_MESSAGE
)) as GRAPH_RUN_SUMMARY
from
(
select
NAME,
DATABASE_NAME,
SCHEMA_NAME,
to_varchar(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') as QUERY_START_TIME,
concat(timestampdiff('seconds', QUERY_START_TIME, COMPLETED_TIME), ' s') as DURATION,
QUERY_ID,
ERROR_MESSAGE
from
table(INFORMATION_SCHEMA.TASK_HISTORY(
ROOT_TASK_ID => MY_ROOT_TASK_ID::string,
SCHEDULED_TIME_RANGE_START => MY_START_TIME::timestamp_ltz,
SCHEDULED_TIME_RANGE_END => current_timestamp()
))
where
STATE = 'FAILED'
-- 一番最後のトライで、最も上流のタスク
and ATTEMPT_NUMBER = <リトライを考慮した最大の数>
order by
SCHEDULED_TIME asc nulls last
limit 1
)
)::string
$$
;
-- 失敗したTASK情報をHTMLでメールで表示する関数
CREATE OR REPLACE FUNCTION STG_DB.WORK.HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
import json
def GENERATE_HTML_TABLE(JSON_DATA):
column_widths = ["250px", "120px", "120px", "300px", "80px", "400px", "480px"]
DATA = json.loads(JSON_DATA)
HTML = f"""
<p><strong>Task Graph Error🔴</strong>
<br>Sign in to Snowsight to see more details.</p>
<table border="1" style="border-color:#DEE3EA"
cellpadding="5" cellspacing="0">
<thead>
<tr>
"""
headers = ["Task name", "Database name", "Schema name", "Started", "Duration","Query ID", "Error message"]
for i, header in enumerate(headers):
HTML += f'<th scope="col" style="text-align:left; width: {column_widths[i]}">{header.capitalize()}</th>'
HTML += """
</tr>
</thead>
<tbody>
"""
for ROW_DATA in DATA:
HTML += "<tr>"
for header in headers:
key = header.replace(" ", "_").upper()
CELL_DATA = ROW_DATA.get(key, "")
HTML += f'<td style="text-align:left; width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
HTML += "</tr>"
HTML += """
</tbody>
</table>
"""
return HTML
$$
;
-- エラー通知のタスクを作成
create or replace task stg_db.work.notify_finalizer
warehouse = '<warehouseの指定>'
finalize = root_task
as
declare
MY_ROOT_TASK_ID string;
MY_START_TIME timestamp_ltz;
SUMMARY_JSON string;
SUMMARY_HTML string;
begin
-- get root task ID
MY_ROOT_TASK_ID := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
-- get root task scheduled time
MY_START_TIME := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
-- combine all task run infos into one json string
SUMMARY_JSON := (select GET_TASK_GRAPH_RUN_SUMMARY(:MY_ROOT_TASK_ID, :MY_START_TIME));
-- convert json into html table
SUMMARY_HTML := (select HTML_FROM_JSON_TASK_RUNS(:SUMMARY_JSON));
-- send html to email
if (length(:SUMMARY_HTML) > 0) then
call SYSTEM$SEND_EMAIL(
'dummy_email_notification',
'<登録してあるメールアドレス>',
'DAG error notigication for root_task',
:SUMMARY_HTML,
'text/html'
);
end if;
end;
;
show tasks;
DAGの実行
以下の手順で実行。DAGの止め忘れには要注意。各タスクの実行では、事前にアカウント単位でEXECUTE TASKの権限を各roleに付与する必要あり。
-- DAG実行
alter task task_a resume;
alter task task_b resume;
alter task task_c resume;
alter task notify_finalizer resume;
alter task root_task resume;
-- DAG停止
alter task root_task suspend;
結果
おまけ(AWS SNSを用いてSlackへメール通知)
メール通知でのCALL SYSTEM$SEND_EMAILで届くメールはSnowflake上で検証済みのメール、つまりSnowflakeでユーザー登録されたメールとなる。SnowflakeでSlack用のユーザー登録しない方法として、AWS SNSを用いる場合をおまけで紹介する。
手順
公式ドキュメントのクラウドプロバイダーの通知送信手順と↑のメール通知手順をベースとする。
Amazon SNS トピックの作成とサブスクリプションの追加
細かくなるので省略。こちらの記事(AWSのSNSでslackにアラーム通知する方法)とかわかりやすいかも。
Notification integrationの作成
以下のように作成する。
create notification integration if not exists dummy_sns_notification
enabled = true
type = queue
notification_provider = aws_sns
direction = outbound
aws_sns_role_arn='<作成したSNSトピックのARN>'
aws_sns_topic_arn='<作成したIAMロールのARN>'
;
IAMユーザーARNとSNSトピック外部IDを取得
desc integration dummy_sns_notification;を実行して、SF_AWS_IAM_USER_ARNとSF_AWS_EXTERNAL_IDの値を記録する。
IAMロールの信頼関係を変更
作成したIAMロールの信頼ポリシーを以下を付け加え、更新する
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<SF_AWS_IAM_USER_ARNの値>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<SF_AWS_EXTERNAL_IDの値>"
}
}
}
]
}
Taskを作成
権限や構成などは↑のメール通知機能のケースと同様なので省略。Gmailとは違い、HTML機能を表示するには別途手順が必要そうなので、今回はJSON形式で通知する。
-- 各DAGについて、INFORMATION_SCHEMA.TASK_HISTORYから失敗したTASK情報取得の関数
create or replace function DEV_DB.WORK.GET_TASK_GRAPH_RUN_SUMMARY(MY_ROOT_TASK_ID string, MY_START_TIME timestamp_ltz)
returns string
as
$$
(
select
OBJECT_CONSTRUCT(
'TASK_NAME', NAME,
'DATABASE_NAME', DATABASE_NAME,
'SCHEMA_NAME', SCHEMA_NAME,
'STARTED', QUERY_START_TIME,
'DURATION', DURATION,
'QUERY_ID', QUERY_ID,
'ERROR_MESSAGE', ERROR_MESSAGE
) as GRAPH_RUN_SUMMARY
from
(
select
NAME,
DATABASE_NAME,
SCHEMA_NAME,
to_varchar(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') as QUERY_START_TIME,
concat(timestampdiff('seconds', QUERY_START_TIME, COMPLETED_TIME), ' s') as DURATION,
QUERY_ID,
ERROR_MESSAGE
from
table(INFORMATION_SCHEMA.TASK_HISTORY(
ROOT_TASK_ID => MY_ROOT_TASK_ID::string,
SCHEDULED_TIME_RANGE_START => MY_START_TIME::timestamp_ltz,
SCHEDULED_TIME_RANGE_END => current_timestamp()
))
where
STATE = 'FAILED'
-- 一番最後のトライで、最も上流のタスク
and ATTEMPT_NUMBER = <リトライを考慮した最大の数>
order by
SCHEDULED_TIME asc nulls last
limit 1
)
)::string
$$
;
-- create finalizer task
create or replace task dev_db.work.notify_finalizer
warehouse = '<warehouseの指定>'
finalize = dev_db.work.root_task
as
declare
MY_ROOT_TASK_ID string;
MY_START_TIME timestamp_ltz;
SUMMARY_JSON string;
SUMMARY_HTML string;
begin
-- get root task ID
MY_ROOT_TASK_ID := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
-- get root task scheduled time
MY_START_TIME := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
-- combine all task run infos into one json string
SUMMARY_JSON := (select GET_TASK_GRAPH_RUN_SUMMARY(:MY_ROOT_TASK_ID, :MY_START_TIME));
if (length(:SUMMARY_JSON) > 0) then
CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
SNOWFLAKE.NOTIFICATION.APPLICATION_JSON(:SUMMARY_JSON),
SNOWFLAKE.NOTIFICATION.INTEGRATION('dummy_sns_notification')
);
end if;
end;
;
DAGを実行
↑のメール通知と同じなので省略
結果
クラウドプロバイダよりメール通知のほうが、勝手にHTMLを良しなに表示してくれるので、見た目はそっちの方がいいかも。Amazon SNSでもHTML表示頑張れるかもしれないが、今回はスキップ。
まとめ・感想
- Airflowの方が書いていて楽しいが、Snowflakeで完結させたいならタスクグラフの方がいいかもしれない
- Slackなどに届けたい場合、メール通知機能ではわざわざユーザー作らなくてはいけないが、そんなことしたくないひとはAWS SNSサービスを用いてもいいかもしれない(Snowflakeでまとめて管理できるならメール通知の方がいいかも)

