この記事でやること
この記事ではPurviewで特定のクラスに分類されたイベントを検知してSlackに通知します。
準備
通知先
通知先はTeamsでもメールでも何でも良いのですが今回はSlackにしています。事前に通知用のチャネルを作ったりアクセス トークンを用意したりします。ここなどを参考にしています。
サンプルデータ
テストデータはStarter Kitにあるファイルを利用しています。今回はクレジットカード番号とメールアドレスに分類された際に通知をするのでそれらを含んだデータを使いました。
"Password","email","id","creditcard"
"b4c906d48f60cdf6a70801d9c9182793a478e1a3a2cb03542676a759db5f843f","ldorset7c@ucoz.com","5647518a-2a93-4611-9746-f050fd0a19d6","201907394460151"
"c8ee38f64609cdd6c8b80dcf888cc80d49fb25817e78dde5bc1094c42b4fa2d8","nweond2f@yale.edu","575d3815-3747-4aa8-94a6-3e32d9885d9c","5038685806838408"
"2d67e5d4eab9004f745c2eaff169be12b6be6a6816eac70b8497bcfefa092f08","fhowson1n@hhs.gov","ef678e73-0b90-4612-9ad0-00401c24b3c8","67616352246247000"
実装
Slackクライアントの設定
事前に用意したアクセス トークンと通知先のチャネルIDを指定します。
from slack_sdk import WebClient
SLACK_ACCESS_TOKEN = 'アクセス トークン'
CHANNEL_ID = 'チャネルID'
# Slackクライアントを作成
slack_client = WebClient(SLACK_ACCESS_TOKEN)
EventHubの利用設定
EventHubの接続文字列はPurviewアカウントのプロパティの"アトラス Kafka エンドポイントのプライマリ接続文字列"から入手できます。(Endpoint=sb://atlas-xxx
・・・的なやつ)
EVENTHUB_NAMEは"atlas_entities"固定です。
import sys
import asyncio
import nest_asyncio # Jupyter Notebookでのエラー対応
from pprint import *
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
# Connection strings
EVENTHUB_CONNECTION_STR = 'EventHubの接続文字列'
EVENTHUB_NAME = 'atlas_entities'
STORAGE_CONNECTION_STR = 'ストレージアカウントの接続文字列'
BLOB_CONTAINER_NAME = 'コンテナ名'
EventHubから情報を取得した結果クレジットカード番号(MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER)とメールアドレス(MICROSOFT.PERSONAL.EMAIL)に該当したか判断するためのリストを設定。
サポートされる分類の一覧はここ。正式名称はPurview Studioから。
# 通知対象クラスの指定
class_list = ['MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER', 'MICROSOFT.PERSONAL.EMAIL']
メッセージの受信とSlackへの通知
async def on_event(partition_context, event):
event_data = event.body_as_json()
eventType = event_data['message']['operationType']
# クラスが追加もしくは更新された場合
if eventType == 'CLASSIFICATION_ADD' or eventType == 'CLASSIFICATION_UPDATE':
classificationName = event_data['message']['entity']['classificationNames'][0]
# 通知対象のクラスにマッチした場合
if classificationName in class_list:
message = f"""\
通知対象の情報({classificationName})が検出されました。
name={event_data['message']['entity']['attributes']['name']}
qualifiedName={event_data['message']['entity']['attributes']['qualifiedName']}
"""
# Slackに通知
slack_client.chat_postMessage(channel=CHANNEL_ID, text=message, icon_emoji=':whale:', username='Purview Notice')
await partition_context.update_checkpoint(event)
async def receive(client):
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def main():
print("Starting stream...", file=sys.stderr)
checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
client = EventHubConsumerClient.from_connection_string(
EVENTHUB_CONNECTION_STR,
consumer_group="$Default",
eventhub_name=EVENTHUB_NAME,
checkpoint_store=checkpoint_store, # For load-balancing and checkpoint. Leave None for no load-balancing.
)
async with client:
await receive(client)
非同期処理がNotebookでエラーとなったので、nest_asyncio.apply()を呼んでいます。
if __name__ == '__main__':
print("Python process started.", file=sys.stderr)
nest_asyncio.apply()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
参考
EventHubからの非同期のデータ受信なども含めてここが参考になります。