LoginSignup
0
0

More than 1 year has passed since last update.

Azure Purviewで特定クラスへの分類を検出した際に通知する

Posted at

この記事でやること

この記事ではPurviewで特定のクラスに分類されたイベントを検知してSlackに通知します。
image.png

準備

通知先

通知先はTeamsでもメールでも何でも良いのですが今回はSlackにしています。事前に通知用のチャネルを作ったりアクセス トークンを用意したりします。ここなどを参考にしています。

サンプルデータ

テストデータはStarter Kitにあるファイルを利用しています。今回はクレジットカード番号とメールアドレスに分類された際に通知をするのでそれらを含んだデータを使いました。

"Password","email","id","creditcard"
"b4c906d48f60cdf6a70801d9c9182793a478e1a3a2cb03542676a759db5f843f","ldorset7c@ucoz.com","5647518a-2a93-4611-9746-f050fd0a19d6","201907394460151"
"c8ee38f64609cdd6c8b80dcf888cc80d49fb25817e78dde5bc1094c42b4fa2d8","nweond2f@yale.edu","575d3815-3747-4aa8-94a6-3e32d9885d9c","5038685806838408"
"2d67e5d4eab9004f745c2eaff169be12b6be6a6816eac70b8497bcfefa092f08","fhowson1n@hhs.gov","ef678e73-0b90-4612-9ad0-00401c24b3c8","67616352246247000"

Purviewでスキャンするとこんな感じになります。
image.png

実装

Slackクライアントの設定

事前に用意したアクセス トークンと通知先のチャネルIDを指定します。

from slack_sdk import WebClient
SLACK_ACCESS_TOKEN = 'アクセス トークン'
CHANNEL_ID = 'チャネルID'

# Slackクライアントを作成
slack_client = WebClient(SLACK_ACCESS_TOKEN)

EventHubの利用設定

EventHubの接続文字列はPurviewアカウントのプロパティの"アトラス Kafka エンドポイントのプライマリ接続文字列"から入手できます。(Endpoint=sb://atlas-xxx
・・・的なやつ)
EVENTHUB_NAMEは"atlas_entities"固定です。

import sys
import asyncio
import nest_asyncio # Jupyter Notebookでのエラー対応
from pprint import *
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

# Connection strings
EVENTHUB_CONNECTION_STR = 'EventHubの接続文字列'
EVENTHUB_NAME = 'atlas_entities'
STORAGE_CONNECTION_STR = 'ストレージアカウントの接続文字列'
BLOB_CONTAINER_NAME = 'コンテナ名'

EventHubから情報を取得した結果クレジットカード番号(MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER)とメールアドレス(MICROSOFT.PERSONAL.EMAIL)に該当したか判断するためのリストを設定。
サポートされる分類の一覧はここ。正式名称はPurview Studioから。

# 通知対象クラスの指定
class_list = ['MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER', 'MICROSOFT.PERSONAL.EMAIL']

メッセージの受信とSlackへの通知

async def on_event(partition_context, event):
    event_data = event.body_as_json()
    eventType = event_data['message']['operationType']

    # クラスが追加もしくは更新された場合
    if eventType == 'CLASSIFICATION_ADD' or eventType == 'CLASSIFICATION_UPDATE':
        classificationName = event_data['message']['entity']['classificationNames'][0]
        # 通知対象のクラスにマッチした場合
        if classificationName in class_list:
            message = f"""\
通知対象の情報({classificationName})が検出されました。
  name={event_data['message']['entity']['attributes']['name']}
  qualifiedName={event_data['message']['entity']['attributes']['qualifiedName']}
            """

            # Slackに通知
            slack_client.chat_postMessage(channel=CHANNEL_ID, text=message, icon_emoji=':whale:', username='Purview Notice')

    await partition_context.update_checkpoint(event)
async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
async def main():
    print("Starting stream...", file=sys.stderr)
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,  # For load-balancing and checkpoint. Leave None for no load-balancing.
    )
    async with client:
        await receive(client)

非同期処理がNotebookでエラーとなったので、nest_asyncio.apply()を呼んでいます。

if __name__ == '__main__':
    print("Python process started.", file=sys.stderr)
    nest_asyncio.apply()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

参考

EventHubからの非同期のデータ受信なども含めてここが参考になります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0