2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksAdvent Calendar 2024

Day 16

Delta Live Tablesのイベントフックによるカスタムモニタリング

Last updated at Posted at 2024-12-19

このような新機能ができているとは。

イベント フックを使用して、 イベント が Delta Live Tables パイプラインの イベント ログに永続化されるときに実行されるカスタム Python コールバック関数を追加できます。 イベントフックを使用して、カスタムのモニタリングとアラートのソリューションを実装できます。 たとえば、イベント フックを使用して、特定のイベントが発生したときに電子メールを送信したり、ログに書き込んだり、サードパーティのソリューションと統合してパイプライン イベントを監視したりできます。

プレビュー
本機能はパブリックプレビューです。

マニュアルのサンプルを動かしてみます。

その前に、SlackのワークスペースとSlackアプリ(適切なスコープchannels:joinchat:writeを設定)して、アプリをチャンネルに追加しておきます。で、Bot User OAuth Tokenをコピーします。ここで結構手間取りました...。

DLT動かす前に、APIを叩いて動作確認します。

curl -X POST 'https://slack.com/api/chat.postMessage' \
-d 'token=<Slack AppのBot User OAuth Token>' \
-d 'channel=#slack-integration-test' \
-d 'text=*テキスト*'

動きました。

{"ok":true,"channel":"C085QCA8UKF","ts":"1734597464.623469","message":{"user":"U085CRB91AB","type":"message","ts":"1734597464.623469","bot_id":"B085VSKG4R2","app_id":"A086G3WSRG8","text":"*\u30c6\u30ad\u30b9\u30c8*","team":"T085T4VJSP5","bot_profile":{"id":"B085VSKG4R2","app_id":"A086G3WSRG8","name":"Demo App","icons":{"image_36":"https:\/\/a.slack-edge.com\/80588\/img\/plugins\/app\/bot_36.png","image_48":"https:\/\/a.slack-edge.com\/80588\/img\/plugins\/app\/bot_48.png","image_72":"https:\/\/a.slack-edge.com\/80588\/img\/plugins\/app\/service_72.png"},"deleted":false,"updated":1734596342,"team_id":"T085T4VJSP5"},"blocks":[{"type":"rich_text","block_id":"D2s","elements":[{"type":"rich_text_section","elements":[{"type":"text","text":"\u30c6\u30ad\u30b9\u30c8","style":{"bold":true}}]}]}]}}

DLTのノートブックを実装します。

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = "<Slack AppのBot User OAuth Token>"
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = '#slack-integration-test'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}


# 単一のデータセットを作成します。
@table
def test_dataset():
 return spark.range(5)


# イベントフックの定義。
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })

このノートブックを指定してDLTパイプラインを作成して実行します。

パイプラインを実行すると画面下にイベントログが表示されますが、上の実装ではイベントをSlackチャンネルに投稿しています。パイプライン動かしている時点でSlackに通知が来ていました。

Slackチャンネルを見てみると、DLTイベントをフックしてメッセージが書き込まれています!面白い。

Screenshot 2024-12-19 at 17.43.43.jpeg

他のシステムとも連携できるので、データパイプラインの監視の幅が広がります。ご活用ください!

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?