はじめに
前回に、センサーデータをAWS IoT Coreに送信してみました。はじめは苦戦しましたが、結構簡単に設定できて感動しました。
今回は送ったデータをサブスクライブするプログラムを作成します。
AWS IoT Core
AWSにログインして、IoT Coreを開きます。
IoT Coreでサブスクライブするための設定をします。
ポリシーの設定
まずは、サブスクライブ用のクライアントIDを決めてください。
クライアントIDが決まったら、前回同様にサイドバーの【セキュリティ】 > 【ポリシー】を選択します。
遷移した画面に、ポリシー一覧が表示されるので、【[前回作成したモノの名前]-Policy】を選択してください。
ポリシーの内容が表示されます。画面下部に全てのバージョンという項目がありますので、【アクティブ】になっているものの横にチェックをつけて、【バージョンを編集】を押下します。
ここでは接続設定とtopicへのサブスクライブ許可を設定します。
"iot:Subscribe"に読み込み可能のtopic名を設定します。
(基本的にpublishと同じtopic名が入るはずです)
その次に、iot:Connectに接続するクライアント名を設定します。
下記にサブスクライブ用のクライアントIDが【subscribe_id】、topic名が【test/aaa】とした場合の例を記載します。
(前回との差分は、iot:Connectの部分にサブスクライブ用のクライアントIDを追加している部分です)
例の中で使用している情報
- topic名 : test/aaa
- AWSのアカウントID : XXXXXXXXXXXX
- サブスクライブ用クライアントID : subscribe_id
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Publish",
"iot:Receive",
"iot:PublishRetain"
],
"Resource": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:topic/test/aaa"
},
{
"Effect": "Allow",
"Action": "iot:Subscribe",
"Resource": "arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:topicfilter/test/aaa"
},
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": [
"arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:client/client_test_id",
"arn:aws:iot:ap-northeast-1:XXXXXXXXXXXX:client/subscribe_id"
]
}
]
}
クライアントIDは任意の名前で構いませんが、publish用のものと被らないようにしてください。
例えば、クライアントIDがpublishとsubscribeで同一のものを使用すると、片方が認証されて片方が強制ログアウトされます。(体験談)
その他
前回と同じエンドポイントと証明書を使用しますので準備してください。
これらはサブスクライブプログラムで使用します。
以上でAWS側の設定は終了です。
今回のサブスクライブプログラム
環境構築
サブスクライブ用の環境を構築します。
検証環境
- python 3.10.13
- paho-mqtt 1.6.1
- psycopg2-binary 2.9.9
- PostgreSQL 14.10
※.PostgreSQLとPythonはインストールされているものとします。
※.paho-mqttは最近更新されたようですが、メソッドが若干変わっていたので今回は前のバージョンを使用します。
pythonパッケージ
任意の仮想環境を有効化して下記コマンドを入力します。
(venv)$ pip install paho-mqtt==1.6.1
(venv)$ pip install psycopg2-binary==2.9.9
PostgreSQL
DB作成
$ createdb -U postgres --encoding=utf8 --locale=ja_JP.UTF-8 --template=template0 iot_db
テーブル作成
SQL開発ソフト等で下記のSQLを実行してテーブルを作成します。
CREATE TABLE IF NOT EXISTS
light_value
(
id BIGSERIAL NOT NULL PRIMARY KEY,
topic_name VARCHAR(10),
light_val INTEGER,
pub_timestamp TIMESTAMP
)
;
テーブル作成まで終わったら、実際にDBとテーブルが作成されていることを確認してみましょう。(重要)
ディレクトリ構成
cert/ (directory)
∟ iot20240214.cert.pem (デバイス証明書)
∟ iot20240214.private.key (暗号化キー)
∟ root-CA.crt (ルート証明書)
subscribe.py (サブスクライブ用プログラム)
※.AWSで作成したモノの名前が「iot20240214」の場合でファイル名を付けています。
サブスクライブプログラム
下記のPythonプログラムを使用します。
なお、mqttで送られてくるjsonは前回作成したものと同じものとします。
ちなみにsubscribe.pyという名前にしました。(先程のディレクトリ構成と同じ)
import os
import json
from paho.mqtt.client import Client
import psycopg2
# AWS IoT Core connection information
CLIENT_ID = 'subscribe_id'
TOPIC = 'test/aaa'
MQTT_HOST = YYYYYYYYYY # AWS IoT Coreエンドポイント名を入れてください
MQTT_PORT = 8883
ROOT_CERT_FILE_PATH = "./cert/root-CA.crt"
DEVICE_CERT_FILE_PATH = "./cert/test20240214.cert.pem"
PRIVATE_KEY_FILE_PATH = "./cert/test20240214.private.key"
# クライアントインスタンスの作成
client = Client(client_id=CLIENT_ID)
# コールバック関数
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe(TOPIC)
def on_message(client, userdata, msg):
"""
メッセージ受信時の処理
"""
insert_record_to_database(msg)
def insert_record_to_database(message):
"""
jsonデータをパースし、適切にデータベースへの書き込む関数
"""
try:
res = json.loads(message.payload)
payload = res['payload']
except Exception as e:
print(f"jsonデコード失敗. error message: {e}")
return
# クエリを作成
try:
query: str = f"""
INSERT INTO light_value
(
topic_name,
light_val,
pub_timestamp
)
VALUES
(
'{message.topic}',
'{payload['lightVal']}',
'{payload['timestamp']}'
);
"""
except Exception as e:
print(f"jsonからの値取り出し失敗. error message: {e}")
return
# DBとのconnectionを作成
try:
connection = connect_database()
except Exception as e:
print(f"DB接続失敗. error message: {e}")
return
# クエリを実行
try:
execute_query(connection, query)
except Exception as e:
# closeするためにreturnしない。
print(f"DB書き込み失敗. error message: {e}")
# connectionクローズ
try:
connection.close()
except Exception as e:
print(f'DB切断失敗. error message: {e}')
def connect_database():
connection = psycopg2.connect(
host = os.environ['iot_db_host'],
dbname = os.environ['iot_db_name'],
port = os.environ['iot_db_port'],
user = os.environ['iot_db_user_name'],
password = os.environ['iot_db_user_pass']
)
return connection
def execute_query(connection, query) -> None:
with connection.cursor() as cur:
cur.execute(query)
connection.commit()
# TLS接続の設定
client.tls_set(
ca_certs=ROOT_CERT_FILE_PATH,
certfile=DEVICE_CERT_FILE_PATH,
keyfile=PRIVATE_KEY_FILE_PATH
)
client.on_connect = on_connect
client.on_message = on_message
# 接続
client.connect(MQTT_HOST, MQTT_PORT)
# 永久ループで待機
client.loop_forever()
確認
これで全ての準備が整いました。
早速サブスクライブプログラムを動かして、DBへのデータ登録ができていることを確認しましょう。
起動は下記のコマンドです。
$ source /path/to/venv/bin/activate ← 任意の仮想環境パス
(venv)$ cd /path/to/sub_program/path ← subscribe.pyが存在するディレクトリ
(venv)$ python subscribe.py
無事に環境構築が済んでいればデータベースへの登録が行えるはずです。
以上です。
次回以降の記事で、フリーのBIツールを紹介してもらったのでこれを触ってみようと思います。