1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflakeのタスクグラフでメール通知を試してみた

Last updated at Posted at 2025-12-17

概要

Airflow以外でも、Snowflake上でDAG(タスクグラフ)が作成できるので、お遊びでメール通知を含めたタスクグラフを試してみた。

手順

ほぼ公式ドキュメントに則る。

メールアドレスの登録

こちらを参考。今回のメール通知では登録されて、検証済みのメールが通知として届く。
メール通知のFinalizerはRetryごとに呼び出される。呼び出される際は、Retry後の結果を考慮して、ATTEMPT_NUMBERが最後の結果を見て通知が届くようにしている(ハードコーディングしているがもっといい方法があるかもしれない)。

Notification integrationの作成

use role accountadmin;
create or replace notification integration dummy_email_notification
  TYPE=EMAIL
  ENABLED=TRUE
  ALLOWED_RECIPIENTS = ('<登録してあるメールアドレス>')
;

use role securityadmin;
grant usage on integration dummy_email_notification to role stg_developer;

use role stg_developer;

SHOW NOTIFICATION INTEGRATIONS;

-- mailが届くか確認
CALL SYSTEM$SEND_EMAIL(
    'dummy_email_notification',     -- INTEGRATIONを指定
    '<登録してあるメールアドレス>',       -- 宛先のアドレスを指定
    'TEST MAIL',                  -- タイトルを指定
    'メールのテストです'            -- 本文を指定。\nで改行も可能
);

Taskの作成

以下のような構成図のtaskを作成。

image.png

-- root taskの作成
CREATE OR REPLACE TASK stg_db.work.root_task
  WAREHOUSE = STG_ETL_WH
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 600
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('root_task successful');
    END;
;

-- 実験用テーブルの作成
create or replace table stg_db.work.demo (
date_exec DATE,
timestamp_exec TIMESTAMP,
source_name VARCHAR
) as 
select current_date(), current_timestamp(), 'new entry';

-- 各タスクの作成
CREATE OR REPLACE TASK stg_db.work.task_a
  WAREHOUSE = '<warehouseの指定>'
  USER_TASK_TIMEOUT_MS = 180
  AFTER root_task
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO stg_db.work.demo VALUES(current_date(), current_timestamp(), :VALUE);
    END;
;
-- エラー通知実験用の失敗タスク
CREATE OR REPLACE TASK stg_db.work.task_b
  WAREHOUSE = '<warehouseの指定>'
  USER_TASK_TIMEOUT_MS = 180
  AFTER root_task
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO stg_db.work.demo VALUES(1/0, current_timestamp(), :VALUE);
    END;
;

CREATE OR REPLACE TASK stg_db.work.task_c
  WAREHOUSE = '<warehouseの指定>'
  USER_TASK_TIMEOUT_MS = 180
  AFTER root_task
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO stg_db.work.demo VALUES(current_date(), current_timestamp(), :VALUE);
    END;
;

-- 各DAGについて、INFORMATION_SCHEMA.TASK_HISTORYから失敗したTASK情報取得の関数
create or replace function STG_DB.WORK.GET_TASK_GRAPH_RUN_SUMMARY(MY_ROOT_TASK_ID string, MY_START_TIME timestamp_ltz)
returns string
as
$$
  (
    select
      ARRAY_AGG(OBJECT_CONSTRUCT(
        'TASK_NAME', NAME,
        'DATABASE_NAME', DATABASE_NAME,
        'SCHEMA_NAME', SCHEMA_NAME,
        'STARTED', QUERY_START_TIME,
        'DURATION', DURATION,
        'QUERY_ID', QUERY_ID,
        'ERROR_MESSAGE', ERROR_MESSAGE
      )) as GRAPH_RUN_SUMMARY
    from
    (
        select
            NAME,
            DATABASE_NAME,
            SCHEMA_NAME,
            to_varchar(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') as QUERY_START_TIME,
            concat(timestampdiff('seconds', QUERY_START_TIME, COMPLETED_TIME), ' s') as DURATION,
            QUERY_ID,
            ERROR_MESSAGE
        from
            table(INFORMATION_SCHEMA.TASK_HISTORY(
                  ROOT_TASK_ID => MY_ROOT_TASK_ID::string,
                  SCHEDULED_TIME_RANGE_START => MY_START_TIME::timestamp_ltz,
                  SCHEDULED_TIME_RANGE_END => current_timestamp()
          ))
        where
            STATE = 'FAILED'
            -- 一番最後のトライで、最も上流のタスク
            and ATTEMPT_NUMBER = <リトライを考慮した最大の数>
        order by
            SCHEDULED_TIME asc nulls last
        limit 1
    )
  )::string
$$
;

-- 失敗したTASK情報をHTMLでメールで表示する関数
CREATE OR REPLACE FUNCTION STG_DB.WORK.HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
import json

def GENERATE_HTML_TABLE(JSON_DATA):
    column_widths = ["250px", "120px", "120px", "300px", "80px", "400px", "480px"]

    DATA = json.loads(JSON_DATA)
    HTML = f"""
    <p><strong>Task Graph Error🔴</strong>
      <br>Sign in to Snowsight to see more details.</p>
    <table border="1" style="border-color:#DEE3EA"
      cellpadding="5" cellspacing="0">
      <thead>
        <tr>
    """
    headers = ["Task name", "Database name", "Schema name", "Started", "Duration","Query ID", "Error message"]
    for i, header in enumerate(headers):
        HTML += f'<th scope="col" style="text-align:left; width: {column_widths[i]}">{header.capitalize()}</th>'

    HTML += """
        </tr>
      </thead>
      <tbody>
    """
    for ROW_DATA in DATA:
        HTML += "<tr>"
        for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left; width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
        HTML += "</tr>"
    HTML += """
      </tbody>
    </table>
    """
    return HTML
$$
;

-- エラー通知のタスクを作成
create or replace task stg_db.work.notify_finalizer
warehouse = '<warehouseの指定>'
finalize = root_task
as
  declare
    MY_ROOT_TASK_ID string;
    MY_START_TIME timestamp_ltz;
    SUMMARY_JSON string;
    SUMMARY_HTML string;
  begin
    -- get root task ID
    MY_ROOT_TASK_ID := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
   
    -- get root task scheduled time
    MY_START_TIME := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
    
    -- combine all task run infos into one json string
    SUMMARY_JSON := (select GET_TASK_GRAPH_RUN_SUMMARY(:MY_ROOT_TASK_ID, :MY_START_TIME));
   
    -- convert json into html table
    SUMMARY_HTML := (select HTML_FROM_JSON_TASK_RUNS(:SUMMARY_JSON));
   
    -- send html to email
    if (length(:SUMMARY_HTML) > 0) then
        call SYSTEM$SEND_EMAIL(
          'dummy_email_notification',
          '<登録してあるメールアドレス>',
          'DAG error notigication for root_task',
          :SUMMARY_HTML,
          'text/html'
        );
    end if;
  
  end;
;

show tasks;

DAGの実行

以下の手順で実行。DAGの止め忘れには要注意。各タスクの実行では、事前にアカウント単位でEXECUTE TASKの権限を各roleに付与する必要あり。

-- DAG実行
alter task task_a resume;
alter task task_b resume;
alter task task_c resume;
alter task notify_finalizer resume;
alter task root_task resume;

-- DAG停止
alter task root_task suspend;

結果

sample.png

おまけ(AWS SNSを用いてSlackへメール通知)

メール通知でのCALL SYSTEM$SEND_EMAILで届くメールはSnowflake上で検証済みのメール、つまりSnowflakeでユーザー登録されたメールとなる。SnowflakeでSlack用のユーザー登録しない方法として、AWS SNSを用いる場合をおまけで紹介する。

手順

公式ドキュメントのクラウドプロバイダーの通知送信手順と↑のメール通知手順をベースとする。

Amazon SNS トピックの作成とサブスクリプションの追加

細かくなるので省略。こちらの記事(AWSのSNSでslackにアラーム通知する方法)とかわかりやすいかも。

Notification integrationの作成

以下のように作成する。

create notification integration  if not exists dummy_sns_notification
  enabled = true
  type = queue
  notification_provider = aws_sns
  direction = outbound
  aws_sns_role_arn='<作成したSNSトピックのARN>'
  aws_sns_topic_arn='<作成したIAMロールのARN>'
;

IAMユーザーARNとSNSトピック外部IDを取得

desc integration dummy_sns_notification;を実行して、SF_AWS_IAM_USER_ARNとSF_AWS_EXTERNAL_IDの値を記録する。

IAMロールの信頼関係を変更

作成したIAMロールの信頼ポリシーを以下を付け加え、更新する

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<SF_AWS_IAM_USER_ARNの値>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<SF_AWS_EXTERNAL_IDの値>"
        }
      }
    }
  ]
}

Taskを作成

権限や構成などは↑のメール通知機能のケースと同様なので省略。Gmailとは違い、HTML機能を表示するには別途手順が必要そうなので、今回はJSON形式で通知する。

-- 各DAGについて、INFORMATION_SCHEMA.TASK_HISTORYから失敗したTASK情報取得の関数
create or replace function DEV_DB.WORK.GET_TASK_GRAPH_RUN_SUMMARY(MY_ROOT_TASK_ID string, MY_START_TIME timestamp_ltz)
returns string
as
$$
  (
    select
      OBJECT_CONSTRUCT(
        'TASK_NAME', NAME,
        'DATABASE_NAME', DATABASE_NAME,
        'SCHEMA_NAME', SCHEMA_NAME,
        'STARTED', QUERY_START_TIME,
        'DURATION', DURATION,
        'QUERY_ID', QUERY_ID,
        'ERROR_MESSAGE', ERROR_MESSAGE
      ) as GRAPH_RUN_SUMMARY
    from
    (
        select
            NAME,
            DATABASE_NAME,
            SCHEMA_NAME,
            to_varchar(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') as QUERY_START_TIME,
            concat(timestampdiff('seconds', QUERY_START_TIME, COMPLETED_TIME), ' s') as DURATION,
            QUERY_ID,
            ERROR_MESSAGE
        from
            table(INFORMATION_SCHEMA.TASK_HISTORY(
                  ROOT_TASK_ID => MY_ROOT_TASK_ID::string,
                  SCHEDULED_TIME_RANGE_START => MY_START_TIME::timestamp_ltz,
                  SCHEDULED_TIME_RANGE_END => current_timestamp()
          ))
        where
            STATE = 'FAILED'
            -- 一番最後のトライで、最も上流のタスク
            and ATTEMPT_NUMBER = <リトライを考慮した最大の数>
        order by
            SCHEDULED_TIME asc nulls last
        limit 1
    )
  )::string
$$
;


-- create finalizer task
create or replace task dev_db.work.notify_finalizer
warehouse = '<warehouseの指定>'
finalize = dev_db.work.root_task
as
  declare
    MY_ROOT_TASK_ID string;
    MY_START_TIME timestamp_ltz;
    SUMMARY_JSON string;
    SUMMARY_HTML string;
  begin
    -- get root task ID
    MY_ROOT_TASK_ID := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
   
    -- get root task scheduled time
    MY_START_TIME := (call SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
    
    -- combine all task run infos into one json string
    SUMMARY_JSON := (select GET_TASK_GRAPH_RUN_SUMMARY(:MY_ROOT_TASK_ID, :MY_START_TIME));
    
    if (length(:SUMMARY_JSON) > 0) then
        CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
          SNOWFLAKE.NOTIFICATION.APPLICATION_JSON(:SUMMARY_JSON),
          SNOWFLAKE.NOTIFICATION.INTEGRATION('dummy_sns_notification')
        );
    end if;
 
  end;
;

DAGを実行

↑のメール通知と同じなので省略

結果

クラウドプロバイダよりメール通知のほうが、勝手にHTMLを良しなに表示してくれるので、見た目はそっちの方がいいかも。Amazon SNSでもHTML表示頑張れるかもしれないが、今回はスキップ。

まとめ・感想

  • Airflowの方が書いていて楽しいが、Snowflakeで完結させたいならタスクグラフの方がいいかもしれない
  • Slackなどに届けたい場合、メール通知機能ではわざわざユーザー作らなくてはいけないが、そんなことしたくないひとはAWS SNSサービスを用いてもいいかもしれない(Snowflakeでまとめて管理できるならメール通知の方がいいかも)
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?