このような新機能ができているとは。
イベント フックを使用して、 イベント が Delta Live Tables パイプラインの イベント ログに永続化されるときに実行されるカスタム Python コールバック関数を追加できます。 イベントフックを使用して、カスタムのモニタリングとアラートのソリューションを実装できます。 たとえば、イベント フックを使用して、特定のイベントが発生したときに電子メールを送信したり、ログに書き込んだり、サードパーティのソリューションと統合してパイプライン イベントを監視したりできます。
プレビュー
本機能はパブリックプレビューです。
マニュアルのサンプルを動かしてみます。
その前に、SlackのワークスペースとSlackアプリ(適切なスコープchannels:join
とchat:write
を設定)して、アプリをチャンネルに追加しておきます。で、Bot User OAuth Tokenをコピーします。ここで結構手間取りました...。
DLT動かす前に、APIを叩いて動作確認します。
curl -X POST 'https://slack.com/api/chat.postMessage' \
-d 'token=<Slack AppのBot User OAuth Token>' \
-d 'channel=#slack-integration-test' \
-d 'text=*テキスト*'
動きました。
{"ok":true,"channel":"C085QCA8UKF","ts":"1734597464.623469","message":{"user":"U085CRB91AB","type":"message","ts":"1734597464.623469","bot_id":"B085VSKG4R2","app_id":"A086G3WSRG8","text":"*\u30c6\u30ad\u30b9\u30c8*","team":"T085T4VJSP5","bot_profile":{"id":"B085VSKG4R2","app_id":"A086G3WSRG8","name":"Demo App","icons":{"image_36":"https:\/\/a.slack-edge.com\/80588\/img\/plugins\/app\/bot_36.png","image_48":"https:\/\/a.slack-edge.com\/80588\/img\/plugins\/app\/bot_48.png","image_72":"https:\/\/a.slack-edge.com\/80588\/img\/plugins\/app\/service_72.png"},"deleted":false,"updated":1734596342,"team_id":"T085T4VJSP5"},"blocks":[{"type":"rich_text","block_id":"D2s","elements":[{"type":"rich_text_section","elements":[{"type":"text","text":"\u30c6\u30ad\u30b9\u30c8","style":{"bold":true}}]}]}]}}
DLTのノートブックを実装します。
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = "<Slack AppのBot User OAuth Token>"
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = '#slack-integration-test'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# 単一のデータセットを作成します。
@table
def test_dataset():
return spark.range(5)
# イベントフックの定義。
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})
このノートブックを指定してDLTパイプラインを作成して実行します。
パイプラインを実行すると画面下にイベントログが表示されますが、上の実装ではイベントをSlackチャンネルに投稿しています。パイプライン動かしている時点でSlackに通知が来ていました。
Slackチャンネルを見てみると、DLTイベントをフックしてメッセージが書き込まれています!面白い。
他のシステムとも連携できるので、データパイプラインの監視の幅が広がります。ご活用ください!