前提
- この記事では、RaspberryPiとAWS IoTが既に接続可能であること。
- 接続方法はこちらのQiita記事を参照してください。
- MQTTプロトコルによるPub/SubのPythonプログラムを記述します。
環境
- RaspberryPi 3B:
- Raspbian GNU/Linux 10 (buster)
- Python 2.7.16 (default, Oct 10 2019, 22:02:15)
- OpenSSL 1.1.1d 10 Sep 2019
- インターネットに接続できる環境。(AWSのIoTサービスに接続できる)
- AWSのアカウントがあること。
- AWS IoTサービスへ接続できること。
- 下記は2020年4月時点の情報です。
プログラム構成
- メッセージをPublishするプログラム: MQTTpub_test1_qiita.py
- タイムスタンプをメッセージに格納し送信する。
- タイムスタンプはtime()関数で取得できるUNIX時間(エポック秒)を使う。
- メッセージをSubscribeするプログラム: MQTTsub_test1_qiita.py
-
メッセージに格納されたタイムスタンプと受信した時のタイムスタンプとの差を見る。タイムラグを計測する。
![PubSub図1.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225748/8fd33674-f56b-920b-a090-a3dff8b2b2d0.png)
-
プログラム
- 以下、RaspberryPiで実行するプログラムです。
- Publishプログラム
MQTTpub_test1_qiita.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import os
import json
import time
CLIENT_ID = "mqtt_pub1"
IOT_ENDPOINT_URL = "XXX-XXX.XXX.ap-northeast-1.amazonaws.com"
IOT_ENDPOINT_PORT = 8883
PATH = os.getcwd()
ROOT_CA_PATH = PATH + "/XXX-CA1.pem"
PRIVATE_KEY_PATH = PATH + "/XXX-private.pem.key"
CERTIFICATE_PATH = PATH + "/XXX-certificate.pem.crt"
KEEP_ALIVE_TIME = 60
TOPIC = "rasp3-mono/timestamp"
myMQTTClient = AWSIoTMQTTClient(CLIENT_ID)
myMQTTClient.configureEndpoint(IOT_ENDPOINT_URL, IOT_ENDPOINT_PORT)
myMQTTClient.configureCredentials(ROOT_CA_PATH, PRIVATE_KEY_PATH, CERTIFICATE_PATH)
myMQTTClient.configureOfflinePublishQueueing(-1)
myMQTTClient.configureDrainingFrequency(2)
myMQTTClient.configureConnectDisconnectTimeout(10)
myMQTTClient.configureMQTTOperationTimeout(5)
def GetTimeStamp():
return time.time()
if __name__ == '__main__':
myMQTTClient.connect(KEEP_ALIVE_TIME)
num = 0
while num < 10:
message = {}
message['num'] = num
message['time'] = GetTimeStamp()
payload = json.dumps(message)
print("payload=", payload)
myMQTTClient.publish(TOPIC, payload, 1)
num += 1
time.sleep(5)
- Subscribe プログラム
MQTTsub_test1_qiita.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import os
import json
import time
CLIENT_ID = "mqtt_sub1"
IOT_ENDPOINT_URL = "XXX-XXX.XXX.ap-northeast-1.amazonaws.com"
IOT_ENDPOINT_PORT = 8883
PATH = os.getcwd()
ROOT_CA_PATH = PATH + "/XXX-CA1.pem"
PRIVATE_KEY_PATH = PATH + "/XXX-private.pem.key"
CERTIFICATE_PATH = PATH + "/XXX-certificate.pem.crt"
KEEP_ALIVE_TIME = 60
TOPIC = "rasp3-mono/timestamp"
myMQTTClient = AWSIoTMQTTClient(CLIENT_ID)
myMQTTClient.configureEndpoint(IOT_ENDPOINT_URL, IOT_ENDPOINT_PORT)
myMQTTClient.configureCredentials(ROOT_CA_PATH, PRIVATE_KEY_PATH, CERTIFICATE_PATH)
myMQTTClient.configureOfflinePublishQueueing(-1)
myMQTTClient.configureDrainingFrequency(2)
myMQTTClient.configureConnectDisconnectTimeout(10)
myMQTTClient.configureMQTTOperationTimeout(5)
def customCallback(client, userdata, message):
dict_message = json.loads(message.payload)
print("num = {}, Time Difference = {}".format(dict_message['num'], time.time() - dict_message['time']))
if __name__ == '__main__':
myMQTTClient.connect(KEEP_ALIVE_TIME)
num = 0
while num < 12:
myMQTTClient.subscribe(TOPIC, 1, customCallback)
num += 1
time.sleep(5)
実行例
- Subscribeプログラムの実行例
- タイムラグが表示される
- プログラムの考慮点
- Publishプログラムで、送信するメッセージはjson形式を利用した。
- Subscribeプログラムで受信したjson形式を辞書型に戻している。
- XXXは利用環境に合わせてください。