STEP-1.Azure IoT Hub の設定とデータ受信のモニタリング
概要
データは鮮度が命! ということで、データストリーミングを手軽にAzureで実装をおこなってみました。構成は、Azureの 「IoT Hub + Stream Analytics + PowerBI」となります。 IoT疑似データ生成プログラムをPythonで実装し IoT Hub にデータを投げ、Stream Analytics でストリーミング+クエリ処理を行い、そのクエリ結果=分析情報をリアルタイムにPowerBIに表示してみました。
データストリーミングのデファクトは「Apache kafka(商用版:Confluent)」で、「Azure HDInsight」で Apache Kafak の実装を行えますが、それより簡易にストリーミング処理を行うためのものとなります。
- 以下の2つのステップで順次説明します。今回は STEP-1 について説明します。
- STEP-1.Azure IoT Hub の設定とデータ受信のモニタリング
- STEP-2.Azure Stream Analytics と PowerBI の 設定 とリアルタイム分析の表示
ローカル環境
macOS Big Sur 11.3
python 3.8.3
IoT Hub の設定
IoT Hub の作成
Azure Portalにログインし、IoT Hubを作成します。 IoT Hub 名に「iothub-ituru-stream-test」を指定します。
スケーリングレベルとユニット等は以下の画面通りの設定を行います。
画面下部の「確認および作成」ボタンを押し、その後の検証に成功しているのであれば、画面下部の「作成」ボタンを押します。
IoT デバイス の登録
作成された IoT Hub「iothub-ituru-stream-test」画面の左側から「IoTデバイス」を選択後、画面上部の「+新規」を選択します。
新たに表示される「デバイスの作成」画面において、「デバイスID」に「dummy-device-1」を入力し、画面下部の「保存」ボタンを押します。
次に、画面の左側から「組み込みのエンドポイント」を選択後、「コンシューマグループ」に「consumerdummy」を入力します。
最後に、画面の左側から「IoTデバイス」を選択後、作成した「dummy-device-1」を選択後、表示される「プライマリ接続文字列」を控えておき、ローカルの環境変数「IOTHUB_CONNECTION_STRING」として定義しておきます(IoT疑似データ生成プログラムで使用します)。
IoTデバイスとしてのプログラム
今回は、IoTデバイスとして「IoT疑似データ生成プログラム」をPythonで作成し、手動実行することによりIoTデータを生成させます。
プログラムを実行させるために以下のライブラリをインストールしておきます。
pip install azure-iot-device
実行させるプログラムは以下となります。
import random
import json
import time
from datetime import date, datetime
import argparse
import string
from faker.factory import Factory
import os
from azure.iot.device import IoTHubDeviceClient, Message
# IoT Hub 接続文字列の取得
IOTHUB_CONNECTION_STRING = os.environ['IOTHUB_CONNECTION_STRING']
# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")
# IoT機器のダミーセクション(大文字アルファベットを定義)
section = string.ascii_uppercase
# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
iot_items = json.dumps({
'items': [{
'id': i, # id
'time': generate_time(), # データ生成時間
'proc': proc, # データ生成プロセス名
'section': random.choice(section), # IoT機器セクション
'iot_num': fake.zipcode(), # IoT機器番号
'iot_state': fake.prefecture(), # IoT設置場所
'vol_1': random.uniform(100, 200), # IoT値−1
'vol_2': random.uniform(50, 90) # IoT値−2
}
for i in range(count)
]
}, ensure_ascii=False).encode('utf-8')
return iot_items
# IoT機器で計測されたダミーデータの生成時間
def generate_time():
dt_time = datetime.now()
gtime = json_trans_date(dt_time)
return gtime
# date, datetimeの変換関数
def json_trans_date(obj):
# 日付型を文字列に変換
if isinstance(obj, (datetime, date)):
return obj.isoformat()
# 上記以外は対象外.
raise TypeError ("Type %s not serializable" % type(obj))
# メイン : ターミナル出力用
def tm_main(count, proc, wait):
print('ターミナル 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
# pprint.pprint(json_dict)
for item in json_dict['items']:
# print(json.dumps(item).encode('utf-8'))
print(item)
time.sleep(wait)
# メイン : IoTHub 出力用
def iothub_main(count, proc, wait):
print('IoTHub 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
client = iothub_client_init()
for item in json_dict['items']:
message = Message(json.dumps(item).encode('utf-8'))
try:
print("Sending message: {}".format(message))
client.send_message(message)
print ("Message sent successfully" )
time.sleep(wait)
except KeyError:
print ("Message sent Error!!!" )
break
except KeyboardInterrupt:
print ( "IoTHubClient Interrupt Stopped" )
break
# Create an IoT Hub client(IoT Hub への接続)
def iothub_client_init():
client = IoTHubDeviceClient.create_from_connection_string(IOTHUB_CONNECTION_STRING)
return client
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
parser.add_argument('--count', type=int, default=10, help='データ作成件数')
parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ ih(IoTHub出力)')
parser.add_argument('--wait', type=float, default=1, help='データWait時間(x.x秒)')
args = parser.parse_args()
start = time.time()
if (args.mode == 'ih'):
iothub_main(args.count, args.proc, args.wait)
else :
tm_main(args.count, args.proc, args.wait)
making_time = time.time() - start
print("")
print(f"データ作成件数:{args.count}")
print("データ作成時間:{0}".format(making_time) + " [sec]")
print("")
IoT Hub でのデータ受信モニタリング
まず最初に IoT Hub の状態を確認します。
$ az iot hub device-identity show --hub-name iothub-ituru-stream-test --device-id dummy-device-1 --output table
CloudToDeviceMessageCount ConnectionState ConnectionStateUpdatedTime DeviceId GenerationId Hub LastActivityTime Status StatusUpdatedTime
--------------------------- ----------------- ---------------------------- -------------- ------------------ ------------------------------------------ ---------------------------- -------- --------------------
0 Disconnected 2021-06-14T14:50:20.9681534Z dummy-device-1 637585869793538871 iothub-ituru-stream-test.azure-devices.net 2021-06-14T14:50:21.5394413Z enabled 0001-01-01T00:00:00Z
次に、「iot hub monitor-events」コマンドを用いて、IoT Hubデバイスを通じて取得されているデータをリアルタイムで確認(モニターイベント)できるようにします。
$ az iot hub monitor-events -n iothub-ituru-stream-test -d dummy-device-1
Starting event monitor, filtering on device: dummy-device-1, use ctrl-c to stop...
デバイス(IoTデータ生成プログラム)から IoT Hub へのデータ送信
該当のPythonプログラムのヘルプを表示してみます。
$ python IoTSampleData-v3.py -h
usage: IoTSampleData-v3.py [-h] [--count COUNT] [--proc PROC] [--mode MODE] [--wait WAIT]
IoT機器のなんちゃってダミーデータの生成
optional arguments:
-h, --help show this help message and exit
--count COUNT データ作成件数
--proc PROC データ作成プロセス名
--mode MODE tm(ターミナル出力)/ ih(IoTHub出力)
--wait WAIT データWait時間(x.x秒)
このPythonプログラムを実行してみます。1秒間隔で3件のデータを IoT Hub へ送信してみます。
$ python IoTSampleData-v3.py --count 3 --mode ih --wait 1.0
IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-06-15T14:20:17.997329", "proc": "111", "section": "S", "iot_num": "514-4962", "iot_state": "\\u9752\\u68ee\\u770c", "vol_1": 178.55781562020292, "vol_2": 81.28006310245719}'
Message sent successfully
Sending message: b'{"id": 1, "time": "2021-06-15T14:20:17.997442", "proc": "111", "section": "O", "iot_num": "572-1109", "iot_state": "\\u795e\\u5948\\u5ddd\\u770c", "vol_1": 178.2040351086569, "vol_2": 69.07364125041418}'
Message sent successfully
Sending message: b'{"id": 2, "time": "2021-06-15T14:20:17.997486", "proc": "111", "section": "S", "iot_num": "682-3917", "iot_state": "\\u7fa4\\u99ac\\u770c", "vol_1": 158.80337403531485, "vol_2": 71.39085509733772}'
Message sent successfully
データ作成件数:3
データ作成時間:3.758289098739624 [sec]
送信されたデータは、同時に IoT Hub のモニターイベントで確認できます。
Starting event monitor, filtering on device: dummy-device-1, use ctrl-c to stop...
{
"event": {
"origin": "dummy-device-1",
"module": "",
"interface": "",
"component": "",
"payload": "{\"id\": 0, \"time\": \"2021-06-15T14:20:17.997329\", \"proc\": \"111\", \"section\": \"S\", \"iot_num\": \"514-4962\", \"iot_state\": \"\\u9752\\u68ee\\u770c\", \"vol_1\": 178.55781562020292, \"vol_2\": 81.28006310245719}"
}
}
{
"event": {
"origin": "dummy-device-1",
"module": "",
"interface": "",
"component": "",
"payload": "{\"id\": 1, \"time\": \"2021-06-15T14:20:17.997442\", \"proc\": \"111\", \"section\": \"O\", \"iot_num\": \"572-1109\", \"iot_state\": \"\\u795e\\u5948\\u5ddd\\u770c\", \"vol_1\": 178.2040351086569, \"vol_2\": 69.07364125041418}"
}
}
{
"event": {
"origin": "dummy-device-1",
"module": "",
"interface": "",
"component": "",
"payload": "{\"id\": 2, \"time\": \"2021-06-15T14:20:17.997486\", \"proc\": \"111\", \"section\": \"S\", \"iot_num\": \"682-3917\", \"iot_state\": \"\\u7fa4\\u99ac\\u770c\", \"vol_1\": 158.80337403531485, \"vol_2\": 71.39085509733772}"
}
}
ここまでで、IoTデバイス(Pythonプログラム)から送信されたデータが IoT Hub で正常に受信できるところまでを確認しました。
次回について
次のステップでは、Azure Stream Analytics でストリーミング処理をおこない、PowerBI でその分析結果をリアルタイムに表示してみます。
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
IoT HubとCosmos DBをStream Analyticsでつないでみた
Azure IoT HubにRaspberry Piで収集した温度データを送ってみた
CO2濃度をAzureで可視化してみた
本題のステップ
STEP-1.Azure IoT Hub の設定とデータ受信のモニタリング
STEP-2.Azure Stream Analytics と PowerBI の 設定 とリアルタイム分析の表示