はじめに
Microsoft Fabic の Real-Time Intelligence を使用して、 Amazon Kinesis Data Streams のデータを分析してみます。
Real-Time Intelligence とは
Apache Kafka , Azure Event Hub などのストリーム処理に不可欠なブローカーや、データベース変更イベントからストリームデータを取得してリアルタイム処理/ストア・分析/可視化/監視・アクションを行うためのエクスペリエンスです。
主なユースケースとして、IoT や、ログ解析があります。
保存先となるイベントハウスには Azure Data Explorer , Microsoft Sentinel, Azure Monitor
など、さまざまな Microsoft 製品で実証済みのログ分析エンジンである Kusto が使用されています。
これにより、非構造なログ系のデータ から 構造化データ まで、大規模なデータ を 低遅延 かつ 高速に 分析することが可能です。
Amazon Kinesis Data Streams のリアルタイム分析
以下の図のようなアーキテクチャでシステムを構成します。
⓪. サンプルデータ送信:
- Fabric Spark ノートブックを使用して、サンプルデータの生成および送信を行います。
①. ストリームデータの取込み:
- イベントストリーム を用いて、ストリームデータを取り込みます。イベントストリームは、リアルタイムハブ から Amazon Kinesis Data Streams への接続を構成することでセットアップします。
②. 保管とデータ探索:
- イベントハウス にデータをストリーム処理で書き込み、クエリを通じてデータの探索と分析を行います。
③. データの可視化:
- リアルタイムダッシュボード を使用して、リアルタイムでデータを可視化します。
④. データの監視:
- Reflex によってストリームデータを監視し、設定した閾値に基づく通知を構成します。
準備
Kinesis Data Streams と IAM ユーザーのセットアップ
AWS コンソール上で データストリームを作成します。
データストリームへの書き込みと読み取りを行う IAM ユーザーを作成し、ポリシーを割り当てます。
今回は以下のようにポリシーを構成しました。
実運用では、条件指定や、読取と書き込みのポリシーを分離するなど検討ください。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords",
"kinesis:ListStreams",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListShards"
],
"Resource": "*"
}
]
}
参考
- 書き込みで必要な権限:"kinesis:PutRecord","kinesis:PutRecords","kinesis:ListStreams"
- 読み取りで必要な権限:"kinesis:GetRecords", "kinesis:GetShardIterator","kinesis:DescribeStream","kinesis:ListShards"
※一部余分があるかもしれません
Fabric から Kinesis Data Streams への認証方法は現在のところ、IAM ユーザーによるアクセスキー(シークレットID)とシークレットキーによる基本認証のみなので、これを記録しておきます。
⓪サンプルデータ送信
ノートブックを実行するための環境を作成します。(ノートブック上で pip コマンドを実行する場合には必須ではありません。)
aws にアクセスするための boto3 と、データ生成用の faker をインストール対象にし、公開します。
環境が公開されたら、ノートブックを作成して、環境を指定します。
ノートブックによりデータを送信します。
最初に IAM ユーザー用の情報を記載します。
今回のような簡単なデモ用途以外では、 Key Vault にシークレット情報を保存して、 Fabric 用 Microsoft Spark Utilities の資格情報ユーティリティ を使用して取得するようにしてください。
import os
# Set environment variables
os.environ['AWS_ACCESS_KEY_ID'] = 'アクセスキーID'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'シークレットキー'
環境を作成しなかった場合には、以下のようにノートブック上でインラインインストールしておきましょう
%pip install faker
%pip install boto3
次のセルで、boto3 による Kinesis クライアントの生成と、データ送信用の関数を定義します。
毎秒データが送信され、100 秒に一回、異常な温度が報告されるようになっています。
stream_name には作成したストリーム名を指定してください。
import boto3
import json
import random
import time
from datetime import datetime
from faker import Faker
import logging
# Fakerインスタンスを作成
fake = Faker()
# Kinesisクライアントを作成
client = boto3.client('kinesis', region_name='ap-northeast-3')
stream_name = 'fabric-stream-source'
cities = [
{"city": "Tokyo", "country": "JP", "latitude": "35.682839", "longitude": "139.759455"},
{"city": "New York", "country": "US", "latitude": "40.712776", "longitude": "-74.005974"},
{"city": "Paris", "country": "FR", "latitude": "48.856613", "longitude": "2.352222"}
]
def generate_sensor_event_data(event_count):
"""
センサーイベントデータを生成します。100回に1回、異常な温度データを生成します。
Parameters:
- event_count: 現在のイベントの数(異常データを挿入するためのカウンター)
Returns:
- event_data: 生成されたセンサーイベントデータ
"""
# 100回に1回異常な温度データを生成
if event_count % 100 == 0:
# 異常な温度範囲(例えば、-10°C や 50°C)
temperature = round(random.choice([-10.0, 50.0]), 2)
else:
# 通常の温度範囲(15.0°C ~ 30.0°C)
temperature = round(random.uniform(15.0, 30.0), 2)
# 3つの都市からランダムに選択
location_data = random.choice(cities)
return {
"event_id": fake.uuid4(),
"timestamp": datetime.now().isoformat(),
"temperature": temperature,
"humidity": round(random.uniform(30.0, 80.0), 2),
"pressure": round(random.uniform(950.0, 1050.0), 2),
"latitude": location_data["latitude"], # 緯度
"longitude": location_data["longitude"], # 経度
"city": location_data["city"], # 都市名
"country": location_data["country"] # 国名
}
def sendToKinesis(client, stream_name, data):
"""
Kinesisへデータを送信する関数
Parameters:
- client: Kinesisクライアント
- stream_name: Kinesisのストリーム名
- data: 送信するデータ
"""
try:
serialized_data = json.dumps(data) # データをJSONにシリアライズ
client.put_record(
StreamName=stream_name,
PartitionKey=str(random.randrange(0, 100)),
Data=serialized_data
)
except Exception as e:
logging.error(f"Failed to send data to Kinesis: {e}")
def generateEvents(client, stream_name, total_events=1800):
"""
センサーイベントを生成し、Kinesisに送信する関数
Parameters:
- client: Kinesisクライアント
- stream_name: Kinesisのストリーム名
- total_events: 生成するイベントの総数
"""
try:
for event_count in range(1, total_events + 1):
event = generate_sensor_event_data(event_count) # 異常値を含むセンサーデータを生成
sendToKinesis(client, stream_name, event) # Kinesisに送信
print(event)
time.sleep(1) # データ送信間隔(1秒ごと)
except KeyboardInterrupt:
logging.info("Process interrupted by user.")
イベント生成を開始します。
print(datetime.now())
# イベント生成を開始 (例: 1800件=30分間イベントを生成)
generateEvents(client, stream_name)
print(datetime.now())
実行したら、AWS コンソールからも確認しておきましょう
①ストリームデータの取込み
リアルタイムハブからストリームデータに接続します。
はじめて接続する場合には、接続の作成が必要です。
ストリームの存在する AWS リージョンと、この作成するイベントストリームの名前を指定します。
内容を確認して作成を開始します。
作成されたイベントストリームに移動して、取得したストリームの内容を確認します。
②保管、データ探索
データを分析するためのイベントハウスを作成し、イベントストリームに接続します。
イベントストリームに戻り、あて先としてこのイベントハウスを指定します。
構成が完了したら発行します。
初回の発行では、イベントハウスとのデータ接続を構成していきます。
新しく作成されるテーブル名を指定
プレビューを確認して終了すると、接続が構成されます。
少し経つとデータがテーブルに投入されていることが確認できます。
サンプルクエリでデータを探索し、クエリを保存しておきます。
kql クエリの開発画面に移動するので、ここで1分前の温度の変動に関する列を追加したクエリを実行します。
faker_events
| where ingestion_time() between (now(-10m) .. now())
| partition by city
(
order by timestamp desc
| extend prev_1min_temperature = next(temperature, 60)
)
| extend percent_difference_1min_temperature_ = round((round(temperature - prev_1min_temperature, 2) / prev_1min_temperature )*100 , 2)
| extend abs_temperature_prev_1min = abs(prev_1min_temperature),abs_temperature_percent_difference_1min = abs(percent_difference_1min_temperature_)
③データの可視化
リアルタイムダッシュボードを使用して、データを可視化します。
クエリを選択している状態から、ダッシュボードにピン止めボタンからダッシュボードの作成が可能です。
ダッシュボードが表示されたら、編集画面に移動します。
ダッシュボードの時間範囲スライサーを機能させるために、ベースクエリを作成します。
ベースクエリは複数のビジュアル間で共用できるデータセットの役割を果たします。赤枠のように、ダッシュボードの時間範囲スライサーと連動したパラメータがあらかじめ用意してあり、これを使ったベースクエリにします。
ビジュアルのクエリがベースクエリを参照するように変更します。
時間範囲スライサを変更すると、テーブルがフィルタされていることがわかります。(見やすくするためにサイズを調整しました。)
タイルを複製し、他の可視化も作成します。
ビジュアルの種類などを調整して、時系列グラフを作成します。
同様に、最新の変動率を示すタイルとして設定します。
_base_kinsis_events
| summarize arg_max(timestamp,percent_difference_1min_temperature_) by city
| order by city
さらに、条件付き書式を指定します。
30% を超える増減があった場合に条件付き書式が作用するようにしました。
位置を調整してダッシュボードの外観を整えます。
最後に更新間隔を調整して、完成です。
継続的更新を有効にして動くダッシュボードにしてみました。
④データの監視
最後にデータ監視を構成します。
イベントストリームに戻り、Reflec へのあて先を追加します。
refelx の編集画面に移動して、データビューを確認します。
監視対象のプロパティとして、温度、湿度、気圧を選択して、デザインモードに移動します。
city ごとの情報が可視化されます。
イベントの発生
湿度
気圧
温度
次に、トリガーを作成して、変動率による通知を構成します。
監視対象のプロパティの選択
20 度の変動があった場合に検出
アクションによりTeams の通知を構成
テストアラートを確認
保存および開始を選択すると、データの監視がスタートし、アクションが実際に実行されるようになります。
以上で Anazon Kinesis Data Streams をソースに、Real-time Intelligence でのリアルタイム処理/ストア・分析/可視化/監視・アクションの流れを構成することができました。
参考
以下を参考にさせていただきました。ありがとうございました。
-
サンプルデータ生成:MS Fabric - Real time analytics Tutorial
-
Kinesis への書き込み:Python3の対話シェル上でKinesis Streamsにレコードを書き込む