0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft Fabric Real-time Intelligence を使用して Amazon Kinesis Data Streams のデータを分析する

Last updated at Posted at 2024-09-18

はじめに

Microsoft Fabic の Real-Time Intelligence を使用して、 Amazon Kinesis Data Streams のデータを分析してみます。

Real-Time Intelligence とは

Apache Kafka , Azure Event Hub などのストリーム処理に不可欠なブローカーや、データベース変更イベントからストリームデータを取得してリアルタイム処理/ストア・分析/可視化/監視・アクションを行うためのエクスペリエンスです。

image.png

主なユースケースとして、IoT や、ログ解析があります。

image.png

保存先となるイベントハウスには Azure Data Explorer , Microsoft Sentinel, Azure Monitor
など、さまざまな Microsoft 製品で実証済みのログ分析エンジンである Kusto が使用されています。
これにより、非構造なログ系のデータ から 構造化データ まで、大規模なデータ低遅延 かつ 高速に 分析することが可能です。

Amazon Kinesis Data Streams のリアルタイム分析

以下の図のようなアーキテクチャでシステムを構成します。

image.png

⓪. サンプルデータ送信:

  • Fabric Spark ノートブックを使用して、サンプルデータの生成および送信を行います。

①. ストリームデータの取込み:

②. 保管とデータ探索:

  • イベントハウス にデータをストリーム処理で書き込み、クエリを通じてデータの探索と分析を行います。

③. データの可視化:

④. データの監視:

  • Reflex によってストリームデータを監視し、設定した閾値に基づく通知を構成します。

準備

Kinesis Data Streams と IAM ユーザーのセットアップ

AWS コンソール上で データストリームを作成します。

image.png

データストリームへの書き込みと読み取りを行う IAM ユーザーを作成し、ポリシーを割り当てます。
今回は以下のようにポリシーを構成しました。

実運用では、条件指定や、読取と書き込みのポリシーを分離するなど検討ください。


{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"kinesis:PutRecord",
				"kinesis:PutRecords",
				"kinesis:ListStreams",
				"kinesis:GetRecords",
				"kinesis:GetShardIterator",
				"kinesis:DescribeStream",
				"kinesis:ListShards"
			],
			"Resource": "*"
		}
	]
}


参考

  • 書き込みで必要な権限:"kinesis:PutRecord","kinesis:PutRecords","kinesis:ListStreams"
  • 読み取りで必要な権限:"kinesis:GetRecords", "kinesis:GetShardIterator","kinesis:DescribeStream","kinesis:ListShards"

※一部余分があるかもしれません

Fabric から Kinesis Data Streams への認証方法は現在のところ、IAM ユーザーによるアクセスキー(シークレットID)とシークレットキーによる基本認証のみなので、これを記録しておきます。

image.png

⓪サンプルデータ送信

ノートブックを実行するための環境を作成します。(ノートブック上で pip コマンドを実行する場合には必須ではありません。)

image.png

aws にアクセスするための boto3 と、データ生成用の faker をインストール対象にし、公開します。

image.png

環境が公開されたら、ノートブックを作成して、環境を指定します。

image.png

image.png

ノートブックによりデータを送信します。

最初に IAM ユーザー用の情報を記載します。

今回のような簡単なデモ用途以外では、 Key Vault にシークレット情報を保存して、 Fabric 用 Microsoft Spark Utilities の資格情報ユーティリティ を使用して取得するようにしてください。

python

import os

# Set environment variables
os.environ['AWS_ACCESS_KEY_ID'] = 'アクセスキーID'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'シークレットキー'


image.png

環境を作成しなかった場合には、以下のようにノートブック上でインラインインストールしておきましょう

python

%pip install faker
%pip install boto3

次のセルで、boto3 による Kinesis クライアントの生成と、データ送信用の関数を定義します。
毎秒データが送信され、100 秒に一回、異常な温度が報告されるようになっています。

stream_name には作成したストリーム名を指定してください。

python

import boto3
import json
import random
import time
from datetime import datetime
from faker import Faker
import logging

# Fakerインスタンスを作成
fake = Faker()

# Kinesisクライアントを作成
client = boto3.client('kinesis', region_name='ap-northeast-3')
stream_name = 'fabric-stream-source'

cities = [
    {"city": "Tokyo", "country": "JP", "latitude": "35.682839", "longitude": "139.759455"},
    {"city": "New York", "country": "US", "latitude": "40.712776", "longitude": "-74.005974"},
    {"city": "Paris", "country": "FR", "latitude": "48.856613", "longitude": "2.352222"}
]

def generate_sensor_event_data(event_count):
    """
    センサーイベントデータを生成します。100回に1回、異常な温度データを生成します。
    
    Parameters:
    - event_count: 現在のイベントの数(異常データを挿入するためのカウンター)
    
    Returns:
    - event_data: 生成されたセンサーイベントデータ
    """
    
    # 100回に1回異常な温度データを生成
    if event_count % 100 == 0:
        # 異常な温度範囲(例えば、-10°C や 50°C)
        temperature = round(random.choice([-10.0, 50.0]), 2)
    else:
        # 通常の温度範囲(15.0°C ~ 30.0°C)
        temperature = round(random.uniform(15.0, 30.0), 2)

    # 3つの都市からランダムに選択
    location_data = random.choice(cities)

    return {
        "event_id": fake.uuid4(),
        "timestamp": datetime.now().isoformat(),
        "temperature": temperature,
        "humidity": round(random.uniform(30.0, 80.0), 2),
        "pressure": round(random.uniform(950.0, 1050.0), 2),
        "latitude": location_data["latitude"],  # 緯度
        "longitude": location_data["longitude"],  # 経度
        "city": location_data["city"],  # 都市名
        "country": location_data["country"]  # 国名
    }

def sendToKinesis(client, stream_name, data):
    """
    Kinesisへデータを送信する関数
    
    Parameters:
    - client: Kinesisクライアント
    - stream_name: Kinesisのストリーム名
    - data: 送信するデータ
    """
    try:
        serialized_data = json.dumps(data)  # データをJSONにシリアライズ
        client.put_record(
            StreamName=stream_name,
            PartitionKey=str(random.randrange(0, 100)),
            Data=serialized_data
        )
    except Exception as e:
        logging.error(f"Failed to send data to Kinesis: {e}")

def generateEvents(client, stream_name, total_events=1800):
    """
    センサーイベントを生成し、Kinesisに送信する関数
    
    Parameters:
    - client: Kinesisクライアント
    - stream_name: Kinesisのストリーム名
    - total_events: 生成するイベントの総数
    """
    try:
        for event_count in range(1, total_events + 1):
            event = generate_sensor_event_data(event_count)  # 異常値を含むセンサーデータを生成
            sendToKinesis(client, stream_name, event)  # Kinesisに送信
            print(event)
            time.sleep(1)  # データ送信間隔(1秒ごと)
    except KeyboardInterrupt:
        logging.info("Process interrupted by user.")






イベント生成を開始します。

python

print(datetime.now())

# イベント生成を開始 (例: 1800件=30分間イベントを生成)
generateEvents(client, stream_name)

print(datetime.now())

image.png

実行したら、AWS コンソールからも確認しておきましょう

image.png

image.png

①ストリームデータの取込み

リアルタイムハブからストリームデータに接続します。

image.png

image.png

はじめて接続する場合には、接続の作成が必要です。

image.png

ストリームの存在する AWS リージョンと、この作成するイベントストリームの名前を指定します。

image.png

内容を確認して作成を開始します。

image.png

image.png

作成されたイベントストリームに移動して、取得したストリームの内容を確認します。

image.png

②保管、データ探索

データを分析するためのイベントハウスを作成し、イベントストリームに接続します。

image.png

image.png

データベースのOneLake availability を有効にすると イベントハウス上のデータベースから OneLake にデータが同期され、レイクハウスショートカットで利用できるようになります。

image.png

イベントストリームに戻り、あて先としてこのイベントハウスを指定します。

image.png

image.png

image.png

構成が完了したら発行します。

image.png

初回の発行では、イベントハウスとのデータ接続を構成していきます。

image.png

新しく作成されるテーブル名を指定

image.png

プレビューを確認して終了すると、接続が構成されます。

image.png

image.png

少し経つとデータがテーブルに投入されていることが確認できます。

image.png

サンプルクエリでデータを探索し、クエリを保存しておきます。

image.png

image.png

kql クエリの開発画面に移動するので、ここで1分前の温度の変動に関する列を追加したクエリを実行します。

kql

faker_events
| where ingestion_time() between (now(-10m) .. now())
| partition by city
(
   order by timestamp desc
    | extend prev_1min_temperature = next(temperature, 60)
)
| extend percent_difference_1min_temperature_ = round((round(temperature - prev_1min_temperature, 2) / prev_1min_temperature )*100 , 2) 
| extend abs_temperature_prev_1min = abs(prev_1min_temperature),abs_temperature_percent_difference_1min = abs(percent_difference_1min_temperature_)

image.png

③データの可視化

リアルタイムダッシュボードを使用して、データを可視化します。

クエリを選択している状態から、ダッシュボードにピン止めボタンからダッシュボードの作成が可能です。

image.png

同様に このクエリから Power BI レポートも作成可能です。Office 製品との統合や、Power BI ビジュアルの要求がある場合にはこちらを使いましょう。

image.png

image.png

ダッシュボードが表示されたら、編集画面に移動します。

image.png

ダッシュボードの時間範囲スライサーを機能させるために、ベースクエリを作成します。

image.png

ベースクエリは複数のビジュアル間で共用できるデータセットの役割を果たします。赤枠のように、ダッシュボードの時間範囲スライサーと連動したパラメータがあらかじめ用意してあり、これを使ったベースクエリにします。

image.png

ビジュアルのクエリがベースクエリを参照するように変更します。

image.png

image.png

時間範囲スライサを変更すると、テーブルがフィルタされていることがわかります。(見やすくするためにサイズを調整しました。)

Image from Gyazo

タイルを複製し、他の可視化も作成します。

image.png

ビジュアルの種類などを調整して、時系列グラフを作成します。

image.png

他にも様々なグラフを選択できます。

image.png

image.png

同様に、最新の変動率を示すタイルとして設定します。

image.png

kql

_base_kinsis_events
| summarize arg_max(timestamp,percent_difference_1min_temperature_) by city
| order by city

さらに、条件付き書式を指定します。

image.png

30% を超える増減があった場合に条件付き書式が作用するようにしました。

image.png

image.png

位置を調整してダッシュボードの外観を整えます。

image.png

最後に更新間隔を調整して、完成です。

image.png

継続的更新を有効にして動くダッシュボードにしてみました。

Image from Gyazo

④データの監視

最後にデータ監視を構成します。

イベントストリームに戻り、Reflec へのあて先を追加します。

image.png

image.png

あて先でレイクハウスを追加することで、ラムダアーキテクチャ を構成可能です。

image.png

image.png

refelx の編集画面に移動して、データビューを確認します。

image.png

image.png

監視対象のプロパティとして、温度、湿度、気圧を選択して、デザインモードに移動します。

image.png

city ごとの情報が可視化されます。

イベントの発生

image.png

湿度

image.png

気圧

image.png

温度

image.png

次に、トリガーを作成して、変動率による通知を構成します。

監視対象のプロパティの選択

image.png

20 度の変動があった場合に検出

image.png

様々な条件を指定できます。

image.png

アクションによりTeams の通知を構成

image.png

Teams,E-mail,Fabric パイプラインの起動,Power Automate(要プレミアムコネクタ)などが選択できます。

image.png

テストアラートを確認

image.png

image.png

保存および開始を選択すると、データの監視がスタートし、アクションが実際に実行されるようになります。

image.png

以上で Anazon Kinesis Data Streams をソースに、Real-time Intelligence でのリアルタイム処理/ストア・分析/可視化/監視・アクションの流れを構成することができました。

参考

以下を参考にさせていただきました。ありがとうございました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?