はじめに
DatabricksでIoTデータのストリーミング処理の検証しようとしたところ、サンプルデータを用意するのはなかなか難しく...
ということで、家に余っていたRaspberry Piにセンサーをつけて、簡単なCO2気温湿度の計測モニターを作りました。
作成物
Raspberry Pi 4とAzure Databricks/その他サービスを利用し、部屋のCO2・気温・湿度を計測するダッシュボードを作りました。
加えて、Databricksの生成AI機能を使用し、各データを自然言語で分析できるAI Botを作成しました。画像の例では、「6月で最も暑かった日はいつですか?」と聞くと、その日付と最高気温を返答します。
これらの機能はDatabricksの機能であるAI/BI DashboardやAI/BI Genieで実装しています。後続の解説で、データ取り込みから実装までを解説します。
設計
システムの構成図は下記の通りとなります。
①Raspberry Pi + SCD30で気温・湿度・CO2データを提起取得
②Azure IoT Hubでキャプチャし、Azure Delta Lake Storage Gen2に格納
③Databricks Delta Live Tablesで加工
④Databricks AI/BI DashboardやAI/BI Genieで可視化(上記「作成物」で紹介したもの)
使用するリソース・ハードウェアの詳細は下記の通りです。
# | リソース | 説明 |
---|---|---|
1 | Raspberry Pi 4 | マイコンボード。Minecraftのマルチサーバー環境を潰して再利用 |
2 | Grove Base Hat for Raspberry Pi | Groveモジュールの接続基盤。後続のSCD30をI2Cポートで接続する用に使用 |
3 | Grove SCD30 | CO2・温湿度センサー |
4 | Azure Databricks | 弊社サービス。今回はストリーミングされるセンサーデータのパイプライン加工・BI可視化に使用 |
5 | Azure IoT Hub | Azureのメッセージハブ。Raspberry Piを接続しデータをキャプチャする際に使用。 |
6 | Azure Data Lake Storage Gen2 | 通称ADLS。IoT Hubでキャプチャしたデータの一次格納に使用 |
実装
1. Azureリソースの準備
センサーデータの取り込みに必要なAzureリソース(ストレージアカウント、IoT Hub)の作成手順を解説します。サブスクリプション、リソースグループの作成手順は省略しています。
1-1. ストレージアカウント(Azure Data Lake Storage)の作成
最初にセンサーデータを保管するストレージを準備します。今回は秒単位で連携される半構造化データを扱うため、Azure Data Lake Storageを作成します。
こちらの手順に従ってストレージアカウントを作成します。
ストレージアカウントを作成したらコンテナーも追加しておきます。
1-2. Azure IoT Hubの作成
Raspberry Piのデータ送信をキャプチャするIoT Hubを作成します。
Hub自体の作成と、新デバイスの登録の両方を行います。
今回はHub名をskato-iot
、デバイスIDをraspi-scd30
としました。Raspberry PiとIoT Hubの接続にはデバイスのプライマリ接続文字列を使います。
1-3. エンドポイント・ルーティング設定
IoT Hubによってキャプチャされたデータが自動でADLSに流入するように設定します。
こちらの手順にそってエンドポイント、ルーティングを設定します。エンドポイントを作成する際、エンドポイントの種類を「ストレージ」とし、前の手順で作成したストレージアカウントとコンテナーを選択します。エンコードはJSONを選択しました。
(「チュートリアル: IoT Hub メッセージ ルーティングを使用してデバイス データを Azure Storage に送信する」より)
これでAzure側の設定が整いました!
2. Raspberry Piでの実装
Raspberry PiとCO2センサーの組み立て・実装を行います。
2-1. 組み立て・初期設定
Raspberry PiにGrove Base HATをセットし、I2C端子にSCD30を繋げます。端子の口が多いですが、、直感的に組めると思います。
組み上がったものがこちらです。このままだと埃被ってしまいそうだが、いい感じの箱はないだろうか・・・
2-2. 環境構築
Raspberry Piにログインし以下の準備を進めます。
2-2-1. 本体設定(SSH/I2C等)
SSHログインやセンサー受信周りの設定を行います。特にSSH周りはセキュリティ設定を重点的に行います。
- I2C通信の有効化
- 公開鍵認証の設定: ポート変更、鍵の配置、sshconfigファイルの作成など
- ファイアウォールの設定: ローカルネットワーク以外からのアクセスをブロック
- 環境変数の登録:
.bashrc
を編集してAzure IoT Hubのデバイス接続文字列を環境変数IOTHUB_CONNSTR
として登録する
2-2-2. Python仮想環境構築・パッケージインストール
Raspberry Piの環境を汚染しないように仮想環境を構築します。今回はPoetryとpyenvを使用しました。
特別にインストールする必要があるパッケージは下記の通りです。
ライブラリ | 用途 |
---|---|
azure-iot-hub |
azure周りの操作 |
azure-iot-device |
同上 |
grove.py |
IoTセンサーとの連携 |
smbus2 |
同上 |
grove.py
のインストールには少し苦戦しました。通常のpoetry add
ではなぜか使用したいモジュール(grove_co2_scd30.py
)が同梱されなかったので、Github公式レポジトリのソースをクローンし、自作パッケージをインストールする要領で導入しました。具体、下記の手順となります。
①grove.pyリポジトリをクローンする
②grove
ディレクトリ配下をsrcフォルダに入れる
下記の構造になるようにgroveディレクトリ配下を配置します。
├── ./README.md
├── ./poetry.lock
├── ./pyproject.toml
├── ./run.py
├── ./src
│ └── ./src/grove
│ ├── ./src/grove/__init__.py
│ ├── ./src/grove/grove_co2_scd30.py # 使用するモジュール
│ ├── ...
③pyproject.tomlを編集
[tool.poetry]
name = "co2-sensor"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"
packages = [
{ include = "grove", from = "src"}
]
[tool.poetry.dependencies]
python = "^3.11"
smbus2 = "^0.4.3"
azure-iot-hub = "^2.6.1"
azure-iot-device = "^2.13.0"
2-3. Co2センサー検証・キャリブレーション
センサーが正しく動作するかを検証します。モジュールgrove_co2_scd30.py
をシェル上で実行すると1秒おきにCO2、気温、湿度を出力します。
(pj-co2-device-py3.11) skato@raspberrypi:~/pj-co2-device $ python src/grove/grove_co2_scd30.py
CO2 concentration is 544.2 ppm
Temperature in Celsius is 30.21 C
Relative Humidity is 59.02 %
CO2 concentration is 544.6 ppm
Temperature in Celsius is 30.21 C
Relative Humidity is 58.98 %
CO2 concentration is 544.6 ppm
...
うまくデータが取得できているようです!が、気温(temperature)が手元の温度計より約3℃ほど高く判定されているようだったので、取得する気温の値を強制的に補正(キャリブレーション)するスクリプトを書き、一度だけ実行します。
from grove.grove_co2_scd30 import GroveCo2Scd30
T_offset = 3 # 気温計の補正値
sensor = GroveCo2Scd30()
sensor.set_temperature_offset(T_offset)
print(f"SCD30センサーのオフセットを実行しました。value: {T_offset}")
CO2濃度のキャリブレーションに関しては、ASC(AutoSelfCalibration)が導入時点で有効化されています。Seeed Studio 公式サイトによると、キャリブレーションには7日間以上の通電と、毎日1時間以上外気にさらす必要があるため、使用開始する際には留意する必要があります。
When activated for the first time a period of minimum 7 days is needed so that the algorithm can find its initial parameter set for ASC. The sensor has to be exposed to fresh air for at least 1 hour every day. Also during that period, the sensor may not be disconnected from the power supply, otherwise the procedure to find calibration parameters is aborted and has to be restarted from the beginning.
2-4. スクリプト作成
5秒ごとにIoT Hubにセンサーデータを送信するスクリプトrun.py
を作成します。
from grove.grove_co2_scd30 import GroveCo2Scd30
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
import asyncio
import os
import time
import json
CONNECTION_STRING = os.environ.get('IOTHUB_CONNSTR')
sensor = GroveCo2Scd30()
async def send_telemetry():
device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
await device_client.connect()
print("Connection Success")
while True:
if sensor.get_data_ready_status():
co2, temperature, humidity = sensor.read()
msg = Message(json.dumps(f'{{"co2":{co2}, "temperature":{temperature},"humidity":{humidity}}}'))
msg.content_encoding = "utf-8"
msg.content_type = "application/json;charset=utf-8"
await device_client.send_message(msg)
time.sleep(5)
if __name__ == "__main__":
asyncio.run(send_telemetry())
Azure PortalのIoT Hubに移動し、メトリックにおけるメッセージ使用数に変動があれば処理成功です!
念の為ストレージアカウントのストレージブラウザーを参照し、連携されたjsonファイルも確認しておきます。
{"EnqueuedTimeUtc":"2024-06-02T01:39:32.7880000Z","Properties":{},"SystemProperties":{"connectionDeviceId":"raspi-scd30","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"638528887605426197","contentType":"application/json;charset=utf-8","contentEncoding":"utf-8","enqueuedTime":"2024-06-02T01:39:32.7880000Z"},"Body":"{\"co2\":1109.05322265625, \"temperature\":24.136266708374023,\"humidity\":61.00395965576172}"}
...
2-5. 永続化
以上でIoT Hubへのデータ連携は完了ですが、このままではRaspberry PiへのSSH接続を閉じてしまうとpythonプロセスも終了してしまうため、処理の永続化が必要です。
今回はお手軽なscreenコマンドを使って仮想端末を立ち上げることにしました。
Raspberry Piで下記コマンドを実行しscreenをインストールします。
$ sudo apt-get install screen
後はscreen
を起動後、run.py
を実行した後に仮想端末からデタッチ(Ctrl + A、Ctrl + D)することで、SSH接続からログアウトした後でもプロセスを動かし続けることができます。
3. Databricksでの実装
いよいよDatabricks側の実装に移ります。機能紹介(宣伝)を多く含みますが、Databricksをご利用したことない方も是非機能について知っていただければと思います!
Databricksはデータの収集・分析から機械学習モデルの開発・運用までを一元的に行える統合データ分析プラットフォームです。Apache Sparkを基盤とし、大量データの高速処理やリアルタイム分析が可能です。クラウド上でのスケーラブルな環境を提供し、他サービスとのコラボレーション機能も備えています。
3-1. データカタログ作成(Unity Catalog)
IoT Hubのデータを取り込む前に、Databricksのデータカタログ機能Unity Catalog
上でカタログを作成します。
参考: Unity Catalog説明
Unity Catalogは、Databricksの環境内でデータ資産を一元的に管理するためのツールです。これにより、データの検索、アクセス制御、監査、データの流れ(データリネージ)を一つの場所で行うことができます。
https://docs.databricks.com/ja/data-governance/unity-catalog/index.html
Unity Catalogではデータ/AI資産をカタログ・スキーマ・テーブル等の3階層の名前空間で管理します。今回はカタログを便宜上skato
とおきました。
Unity Catalogの詳細については下記の記事をご覧ください!
3-2. パイプライン構築(Delta Live Tables)
ADLSからデータを読み込み、データを加工するパイプラインをDelta Live Tables
を使って実装します。
参考: Delta Live Tables説明
Delta Live Tablesは、データを取り込み、変換し、保存する一連の処理(パイプライン)を簡単に作成できるフレームワークです。Pyspark/SQLの文法で取得したいデータを仕様を宣言的に定義することで、簡単にパイプラインを作成することができます。
https://docs.databricks.com/ja/delta-live-tables/index.html#what-is-delta-live-tables
今回はBronze→Silver→Goldの順で、ローデータから10分毎の移動平均データを計算するパイプラインを構築しました。ソースコードやパイプラインのイメージは下記をご参照ください。
Raspberry Piデータの接続・加工と、後続でもご紹介するダッシュボード作成に関する詳細は、社内有志が執筆した書籍はじめてのデータウェアハウス ーDatabricksではじめるデータ基盤ガイドーの第5章(Databricks SQL と Raspberry Pi を使って IoT データダッシュボードを作る)が詳しいです。
詳細の説明が気になる方は是非ご購入いただければと思います!
3-3. ダッシュボード作成(AI/BI Dashboard)
冒頭で紹介したダッシュボードを作成します。前に作ったsilver_data
とgold_data_ma10min
(10分毎の移動平均データ)を読み込み、分別・日別の計測値をモニターします。
詳細な手順はDatabricksドキュメント - ダッシュボードをご参照ください。
なお、本機能は生成AIサポートと統合されており、プロンプトによって図表を作成することも可能です!
3-4. データ分析用生成AIボット作成(AI/BI Genie)
Chatbotライクなデータ可視化ツールAI/BI Genieを使用し、気温・湿度・CO2データを自然言語で分析するボットを作成します。
RAGやFine Tuningといった実装は不要で、使用するテーブルを登録するだけでRoom(ボット)を作成することができます。詳細な手順はDatabricksドキュメント - AI/BI Genieスペースでの作業をご参照ください。
今回は下記の画像の通り、iot_silver
テーブルを分析するRoomを作成しました。
Roomを作成後、テキストベースで分析したいことを入力してデータを探索することが可能です。「6月で最も暑かった日はいつですか?」と入力すると、その回答と回答の算出に作成したSQL文を答えてくれます。
AI/BI DashboardやAI/BI Genieの詳しいウォークスルーについてはこちらの記事もご参照ください!
3-5. (おまけ)加湿器の操作
おまけで、SwitchBot製の加湿器を特定湿度でオンオフするスクリプトを書きました。サンプルコードはこちらをご参照ください。
Switch Bot用のアクセストークンはSwitchBot トークンの取得方法を参照して取得しました。
取得したトークンはDatabricksのシークレットに登録しておきます。
Databricks Workflow
のようなオーケストレーターと組み合わせると、実装したDLTパイプラインや加湿器操作のスクリプトを定期的に稼働させることが可能です。
おわりに
作ったものの説明は以上となります!今回実装しなかったアイデアについてもメモ書きします。
①機械学習との組み合わせ
例えばProphet等のモデルで1h内の値を予測して表示することが可能です。一方でデータを見るとエアコンや部屋の人数、換気等に対して敏感に反応していたため、外的イベント込みの予測は難しいと判断し実装保留しました。
もっとデータがたまれば、時系列クラスタリングや因果推論等を試すと面白い分析ができるかと思います。
②CO2が一定濃度になったらSlackで換気の通知を出す
Databricksのアラート機能で、SlackのWebhookを設定すれば通知を送る事が可能です。
Databricksの無料トライアルを試してみたい方はぜひこちらのリンクからご登録お願いします!