1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

shadowとwillとretainを活用してAWS IoT CoreとPubSubする③pahoでpubsub

Last updated at Posted at 2024-06-26

この記事について

shadowとwillとretainを活用してAWS IoT CoreとPubSubする②AWSリソースの構築で構築した環境を使ってラズパイGW broker <-> AWS IoT間でのMQTT通信を始めます。

ここから3回に分けてpaho-mqttライブラリーでmessageのpubsub、shadowのupdate/get、retainとwillの設定をします。

今回はラズパイに接続された仮想のdevice_0のシグナルをAWS IoTとpubsubします。


ライブラリー

GitHub

GitHub: eclipse/paho.mqtt.python

PyPi

pypi: paho-mqtt 2.1.0

開発者ガイド

Eclipse Paho™ MQTT Python クライアント

callback

コールバック(callback)とは、コンピュータプログラミングにおいて、あるサブルーチン(関数など)を呼び出す際に別のサブルーチンを途中で実行するよう指定する手法

コールバックの嬉しさの一つは、connect、disconnectなどのメソッド実行をトリガーにresponse、payload、errorなどを自分の好みに出力できることです。

paho-mqttで準備されているcallback関数のうち今回のpubsubでは3種類を使います。

client.on_connect

ブローカーが接続要求に応答したときに呼び出されるコールバック

client.on_connectインスタンスで呼び出す中身としてon_connectメソッドを準備します。

呼び出されたらrespons_codeをチェックして異常の場合に出力させます。

def on_connect(client, userdata, flags, respons_code):
    if respons_code != 0:
        print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
    print('Connected')

client.on_message

クライアントがサブスクライブしているトピックでメッセージが受信されたときに呼び出されるコールバック

client.on_messageインスタンスで呼び出す中身としてon_messageメソッドを準備します。
呼び出されたらpayloadをdecodeして出力させます。

def on_message(client, userdata, msg):
    print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")

on_disconnect

クライアントがブローカーから切断したときに呼び出されるコールバック

client.on_disconnectインスタンスで呼び出す中身としてon_disconnectメソッドを準備します。
切断の正常・異常を出力させます。

def on_disconnect(client, userdata, respons_code):
    if respons_code != 0:
        print(f"Unexpected disconnection.")
    else:
        print(f"Disconnected successfully.")

customメソッド

自分使い用にまとめ直したメソッドに再構築して使います。

wifi接続判定

初期のころから常用しているメソッドです。
そもそも本来備わった手段がありますが、連続運用する際に起動時wifi接続出来なかったら自律的にリトライするために今でも使っています。

def get_ssid():
    cmd = 'iwconfig wlan0|grep ESSID'
    r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
    idx = r.find('ESSID:')
    if r[idx + 7:-1] == "ff/an":
        print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
        time.sleep(120)
        subprocess.call(["sudo", "reboot"])

MQTTクライアント作成

一意のclient idでAWS IoTに接続するクライアントobjectを作成します。

証明書と秘密鍵で認証されます。MQTTはversion 3.1.1、TLSは1.2を使用します。

def mqtt_init(clientId):
    try:
        client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
        client.tls_set(
            ca,
            certfile=cert,
            keyfile=key,
            tls_version=ssl.PROTOCOL_TLSv1_2)
        client.tls_insecure_set(True)
    except Exception as e:
        print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")  
    return client

MQTT接続

mqtt_init()で作成したクライアントでAWS IoTに接続します。

1つのコード中でsubscriberを立ち上げておく為には、ループを作る前に、接続時subscriberとon_messageを呼び出しておく必要があります。

過去に他の方の記事でも散見した記憶がありますがはまりポイントだと思います。

【訂正:2024.6.30】試行錯誤の結果の思い込みでした。必要なのはclientの作成->接続の確立->subscriber起動の順番です。最初の組み合わせでも可能ですが、本稿での説明ではmain()の中でsub()メソッドを呼び出す構成に変更します。

通常はこのメソッドの中でループ開始が呼び出されることが多いですが、本稿では一つのフローでpubsubしたいのでこのメソッドから外します。

def mqtt_connect(client):
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(endpoint, port, keepalive=60)

切断

client idによる接続をdisconnectメソッドで切断します。

disconnectメソッドでの送信ナシでの接続は異常終了と認識されます。この切断時の正常・異常の違いを使ってwillの送信が行われます。

def mqtt_disconnect(client):
    client.on_disconnect = on_disconnect
    client.disconnect()
    client.loop_stop()

topic発行(publish)

いわゆるpubです。「topicを発行」「topicを購読」という言い方に慣れるまで相当時間がかかりました苦笑

今回は仮想のホスト名:MQTTServer1のラズパイGW/brokerから、device_0のシグナル:msgを以下のメソッドでAWS IoTにpubします。

説明を端折りましたがqos=1で。

def mqtt_publish(client, msg):
    data = {}
    _topic = topic + "/" + str(device_no)
    print("Publishing to topic:", _topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = os.uname()[1]
    data['device_no'] = device_no
    data['msg'] = msg
    client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
    print(_topic, data)

def json_serial(para):
    return para.isoformat()

topic購読(subcrribe)

いわゆるsubです。

以下のメソッドで自分が接続しているMQTTネットワーク内でtopicにpubされたメッセージをsubします。

def mqtt_subscribe(client):
    _topic = topic + "/" + str(device_no)
    print("Subscribing to topic:", _topic)
    client.subscribe(_topic, qos=1)

ラズパイGW <-> AWS IoT でpubsub

AWS IoTとport 8883で接続するための認証情報とラズパイの識別子(client id=hostname)を準備します。

topicはベースとなる階層を固定します。この下にdevice Noであったりprocess idであったりサブの階層をつなげます。

接続情報
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'

今回、device_0のシグナルは以下とします。
こんなシグナル出すdeviceって一体…ヨシッとして下さいm(__)m

デバイスシグナル
msg = "AWS IoT 接続ヨシッ👉"

ここまでで準備した認証情報とメソッドを組み合わせるとラズパイとAWS IoTが接続されたNWでpubsubが可能になります。

以下を実行すると接続後にsubscriberが待ち受けした状態となり、pubされた後に自分自身でもmessageをsubして回線を切断します。

上手くいかないときは1~3秒くらいのtime.sleepを挟むとpubsubが出来ると思います。

clientid = f"{hostname}"
time.sleep(5)
try:
    #wifi接続チェック
    get_ssid()
    #クライアント作成
    client = mqtt_init(clientid)
    #client idで接続
    mqtt_connect(client)
    #sub開始
    mqtt_subscribe(client)
    #クライアント接続ループ開始
    client.loop_start()
    #message発行
    mqtt_publish(client, msg)
    #切断
    mqtt_disconnect(client)
except KeyboardInterrupt:
    mqtt_disconnect(client)
    time.sleep(3)
    sys.exit()

Demo code 全体

コードを全てつなげると以下のようになります。

pubsub_paho.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import json
import ssl
import time
import subprocess
import paho.mqtt.client as mqtt


endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'

msg = "AWS IoT 接続ヨシッ👉"


def on_connect(client, userdata, flags, respons_code):
    if respons_code != 0:
        print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
    print('Connected')

def on_disconnect(client, userdata, respons_code):
    if respons_code != 0:
        print(f"Unexpected disconnection.")
    else:
        print(f"Disconnected successfully.")

def on_message(client, userdata, msg):
    print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
    return

def get_ssid():
    cmd = 'iwconfig wlan0|grep ESSID'
    r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
    idx = r.find('ESSID:')
    if r[idx + 7:-1] == "ff/an":
        print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
        time.sleep(120)
        subprocess.call(["sudo", "reboot"])

def mqtt_init(clientId):
    try:
        client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
        client.tls_set(
            ca,
            certfile=cert,
            keyfile=key,
            tls_version=ssl.PROTOCOL_TLSv1_2)
        client.tls_insecure_set(True)
    except Exception as e:
        print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")  
    return client

def mqtt_connect(client):
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(endpoint, port, keepalive=60)

def mqtt_disconnect(client):
    client.on_disconnect = on_disconnect
    client.disconnect()
    client.loop_stop()

def mqtt_publish(client, msg):
    data = {}
    _topic = topic + "/" + str(device_no)
    print("Publishing to topic:", _topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = os.uname()[1]
    data['device_no'] = device_no
    data['msg'] = msg
    client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
    print(_topic, data)
    return

def mqtt_subscribe(client):
    _topic = topic + "/" + str(device_no)
    print("Subscribing to topic:", _topic)
    client.subscribe(_topic, qos=1)
    return

def json_serial(para):
    return para.isoformat()


if __name__ == '__main__':
    clientid = f"{hostname}"
    time.sleep(5)
    try:
        get_ssid()
        client = mqtt_init(clientid)
        mqtt_connect(client)
        mqtt_subscribe(client)
        client.loop_start()
        mqtt_publish(client, msg)
        mqtt_disconnect(client)
    except KeyboardInterrupt:
        mqtt_disconnect(client)
        time.sleep(3)
        sys.exit()

実行結果

~ $ python pubsub_paho.py
・
・
Subscribing to topic: MQTTServer/MQTTServer1/0
Publishing to topic: MQTTServer/MQTTServer1/0
Connected
MQTTServer/MQTTServer1/0 {'Timestamp': 1719403174, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Received message: {"Timestamp": 1719403174, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
on topic: MQTTServer/MQTTServer1/0
with QoS: 1
Disconnected successfully.
~ $ 

AWS IoTのテストクライアント上でもsubscribeを確認できました。


次回

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?