LoginSignup
2
2

More than 1 year has passed since last update.

#1 IoT Hub + Stream Analytics + PowerBI でストリーミングデータ分析してみました

Last updated at Posted at 2021-06-16

STEP-1.Azure IoT Hub の設定とデータ受信のモニタリング

概要

データは鮮度が命! ということで、データストリーミングを手軽にAzureで実装をおこなってみました。構成は、Azureの 「IoT Hub + Stream Analytics + PowerBI」となります。 IoT疑似データ生成プログラムをPythonで実装し IoT Hub にデータを投げ、Stream Analytics でストリーミング+クエリ処理を行い、そのクエリ結果=分析情報をリアルタイムにPowerBIに表示してみました。

データストリーミングのデファクトは「Apache kafka(商用版:Confluent)」で、「Azure HDInsight」で Apache Kafak の実装を行えますが、それより簡易にストリーミング処理を行うためのものとなります。

本題の構成は以下となります。
image.png

  • 以下の2つのステップで順次説明します。今回は STEP-1 について説明します。
    • STEP-1.Azure IoT Hub の設定とデータ受信のモニタリング
    • STEP-2.Azure Stream Analytics と PowerBI の 設定 とリアルタイム分析の表示

ローカル環境

macOS Big Sur 11.3
python 3.8.3


IoT Hub の設定

IoT Hub の作成

Azure Portalにログインし、IoT Hubを作成します。 IoT Hub 名に「iothub-ituru-stream-test」を指定します。
image.png

スケーリングレベルとユニット等は以下の画面通りの設定を行います。
image.png

画面下部の「確認および作成」ボタンを押し、その後の検証に成功しているのであれば、画面下部の「作成」ボタンを押します。

IoT デバイス の登録

作成された IoT Hub「iothub-ituru-stream-test」画面の左側から「IoTデバイス」を選択後、画面上部の「+新規」を選択します。
image.png

新たに表示される「デバイスの作成」画面において、「デバイスID」に「dummy-device-1」を入力し、画面下部の「保存」ボタンを押します。
image.png

次に、画面の左側から「組み込みのエンドポイント」を選択後、「コンシューマグループ」に「consumerdummy」を入力します。
image.png

最後に、画面の左側から「IoTデバイス」を選択後、作成した「dummy-device-1」を選択後、表示される「プライマリ接続文字列」を控えておき、ローカルの環境変数「IOTHUB_CONNECTION_STRING」として定義しておきます(IoT疑似データ生成プログラムで使用します)。
image.png


IoTデバイスとしてのプログラム

今回は、IoTデバイスとして「IoT疑似データ生成プログラム」をPythonで作成し、手動実行することによりIoTデータを生成させます。
プログラムを実行させるために以下のライブラリをインストールしておきます。

pip install azure-iot-device

実行させるプログラムは以下となります。

IoTSampleData-v3.py
import random
import json
import time
from datetime import date, datetime
import argparse
import string
from faker.factory import Factory
import os
from azure.iot.device import IoTHubDeviceClient, Message


# IoT Hub 接続文字列の取得
IOTHUB_CONNECTION_STRING = os.environ['IOTHUB_CONNECTION_STRING']

# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# IoT機器のダミーセクション(大文字アルファベットを定義)
section = string.ascii_uppercase


# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': i,                            # id
            'time': generate_time(),            # データ生成時間
            'proc': proc,                       # データ生成プロセス名
            'section': random.choice(section),  # IoT機器セクション
            'iot_num': fake.zipcode(),          # IoT機器番号
            'iot_state': fake.prefecture(),     # IoT設置場所
            'vol_1': random.uniform(100, 200),  # IoT値−1
            'vol_2': random.uniform(50, 90)     # IoT値−2
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime

# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# メイン : ターミナル出力用
def tm_main(count, proc, wait):
    print('ターミナル 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    # pprint.pprint(json_dict)
    for item in json_dict['items']:
        # print(json.dumps(item).encode('utf-8'))
        print(item)
        time.sleep(wait)


# メイン : IoTHub 出力用
def iothub_main(count, proc, wait):
    print('IoTHub 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    client = iothub_client_init()
    for item in json_dict['items']:
        message = Message(json.dumps(item).encode('utf-8'))

        try:
            print("Sending message: {}".format(message))
            client.send_message(message)
            print ("Message sent successfully" )
            time.sleep(wait)
        except KeyError:
            print ("Message sent Error!!!" )
            break
        except KeyboardInterrupt:
            print ( "IoTHubClient Interrupt Stopped" )
            break

# Create an IoT Hub client(IoT Hub への接続)
def iothub_client_init():
    client = IoTHubDeviceClient.create_from_connection_string(IOTHUB_CONNECTION_STRING)
    return client


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
    parser.add_argument('--count', type=int, default=10, help='データ作成件数')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ ih(IoTHub出力)')
    parser.add_argument('--wait', type=float, default=1, help='データWait時間(x.x秒)')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'ih'): 
        iothub_main(args.count, args.proc, args.wait)
    else :
        tm_main(args.count, args.proc, args.wait)

    making_time = time.time() - start

    print("")
    print(f"データ作成件数:{args.count}")
    print("データ作成時間:{0}".format(making_time) + " [sec]")
    print("")

IoT Hub でのデータ受信モニタリング

まず最初に IoT Hub の状態を確認します。

$ az iot hub device-identity show --hub-name iothub-ituru-stream-test --device-id dummy-device-1 --output table

CloudToDeviceMessageCount    ConnectionState    ConnectionStateUpdatedTime    DeviceId        GenerationId        Hub                                         LastActivityTime              Status    StatusUpdatedTime
---------------------------  -----------------  ----------------------------  --------------  ------------------  ------------------------------------------  ----------------------------  --------  --------------------
0                            Disconnected       2021-06-14T14:50:20.9681534Z  dummy-device-1  637585869793538871  iothub-ituru-stream-test.azure-devices.net  2021-06-14T14:50:21.5394413Z  enabled   0001-01-01T00:00:00Z

次に、「iot hub monitor-events」コマンドを用いて、IoT Hubデバイスを通じて取得されているデータをリアルタイムで確認(モニターイベント)できるようにします。

$ az iot hub monitor-events -n iothub-ituru-stream-test -d dummy-device-1                                      

Starting event monitor, filtering on device: dummy-device-1, use ctrl-c to stop...

デバイス(IoTデータ生成プログラム)から IoT Hub へのデータ送信

該当のPythonプログラムのヘルプを表示してみます。

$ python IoTSampleData-v3.py -h

usage: IoTSampleData-v3.py [-h] [--count COUNT] [--proc PROC] [--mode MODE] [--wait WAIT]

IoT機器のなんちゃってダミーデータの生成

optional arguments:
  -h, --help     show this help message and exit
  --count COUNT  データ作成件数
  --proc PROC    データ作成プロセス名
  --mode MODE    tm(ターミナル出力)/ ih(IoTHub出力)
  --wait WAIT    データWait時間(x.x秒)

このPythonプログラムを実行してみます。1秒間隔で3件のデータを IoT Hub へ送信してみます。

$ python IoTSampleData-v3.py --count 3 --mode ih --wait 1.0

IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-06-15T14:20:17.997329", "proc": "111", "section": "S", "iot_num": "514-4962", "iot_state": "\\u9752\\u68ee\\u770c", "vol_1": 178.55781562020292, "vol_2": 81.28006310245719}'
Message sent successfully
Sending message: b'{"id": 1, "time": "2021-06-15T14:20:17.997442", "proc": "111", "section": "O", "iot_num": "572-1109", "iot_state": "\\u795e\\u5948\\u5ddd\\u770c", "vol_1": 178.2040351086569, "vol_2": 69.07364125041418}'
Message sent successfully
Sending message: b'{"id": 2, "time": "2021-06-15T14:20:17.997486", "proc": "111", "section": "S", "iot_num": "682-3917", "iot_state": "\\u7fa4\\u99ac\\u770c", "vol_1": 158.80337403531485, "vol_2": 71.39085509733772}'
Message sent successfully

データ作成件数:3
データ作成時間:3.758289098739624 [sec]

送信されたデータは、同時に IoT Hub のモニターイベントで確認できます。

Starting event monitor, filtering on device: dummy-device-1, use ctrl-c to stop...
{
    "event": {
        "origin": "dummy-device-1",
        "module": "",
        "interface": "",
        "component": "",
        "payload": "{\"id\": 0, \"time\": \"2021-06-15T14:20:17.997329\", \"proc\": \"111\", \"section\": \"S\", \"iot_num\": \"514-4962\", \"iot_state\": \"\\u9752\\u68ee\\u770c\", \"vol_1\": 178.55781562020292, \"vol_2\": 81.28006310245719}"
    }
}
{
    "event": {
        "origin": "dummy-device-1",
        "module": "",
        "interface": "",
        "component": "",
        "payload": "{\"id\": 1, \"time\": \"2021-06-15T14:20:17.997442\", \"proc\": \"111\", \"section\": \"O\", \"iot_num\": \"572-1109\", \"iot_state\": \"\\u795e\\u5948\\u5ddd\\u770c\", \"vol_1\": 178.2040351086569, \"vol_2\": 69.07364125041418}"
    }
}
{
    "event": {
        "origin": "dummy-device-1",
        "module": "",
        "interface": "",
        "component": "",
        "payload": "{\"id\": 2, \"time\": \"2021-06-15T14:20:17.997486\", \"proc\": \"111\", \"section\": \"S\", \"iot_num\": \"682-3917\", \"iot_state\": \"\\u7fa4\\u99ac\\u770c\", \"vol_1\": 158.80337403531485, \"vol_2\": 71.39085509733772}"
    }
}

ここまでで、IoTデバイス(Pythonプログラム)から送信されたデータが IoT Hub で正常に受信できるところまでを確認しました。


次回について

次のステップでは、Azure Stream Analytics でストリーミング処理をおこない、PowerBI でその分析結果をリアルタイムに表示してみます。

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。
IoT HubとCosmos DBをStream Analyticsでつないでみた
Azure IoT HubにRaspberry Piで収集した温度データを送ってみた
CO2濃度をAzureで可視化してみた

本題のステップ

STEP-1.Azure IoT Hub の設定とデータ受信のモニタリング
STEP-2.Azure Stream Analytics と PowerBI の 設定 とリアルタイム分析の表示

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2