2
1

More than 1 year has passed since last update.

IoT Core で 永続的セッションと QoS 1 を使ったメッセージの送受信方法を確認してみた

Last updated at Posted at 2022-09-10

はじめに

AWS IoT には、MQTT 永続的セッション (MQTT persistent sessions) と呼ばれる仕組みがあります。QoS レベル 1 を指定することで利用ができるものになります。IoT デバイスがなんらかの障害で通信が出来なかった期間にパブリッシュされたデータを再度取得することが出来ます。

文章だけだとよくわからないと思うので図解で説明をしてみます。まず、通常の IoT Core メッセージブローカを使ったメッセージのやりとりを見てみましょう。

image-20220910141341638.png

データを送信する IoT デバイス、IoT Core、データを受信する IoT デバイスがあったとします。データ送信元の IoT デバイスが定期的に IoT Core にセンサーデータをパブリッシュ (送信) します。データ送信先である IoT デバイスは、MQTT Topic をサブスクライブしているので、センサーデータを受信できます。

ここで、データ送信先デバイスになんらかの障害があったとします。ネットワーク的なエラーかもしれないですし、電源の一時的な不足かもしれないですし、様々な障害が起こりえます。

image-20220910141803771.png

赤色で書いている部分が、障害が発生している期間です。この間は、MQTT Topic をサブスクライブできていないので、その間のデータを受信できません。IoT Core の使い方の中には、なんらかの障害に時でもセンサーデータを受け取りたいときがあります。その時に、永続的セッションを使った仕組みを検討頂けます。(ほかにも Device Shadow や Job も検討できるのですが、ここでは取り上げません)

永続的セッションを使うと、なんらかの障害で受信できなかったメッセージを再度受信できます。永続的セッションを有効化した上で、送られてくる QoS 1 のメッセージを、AWS 側に一定期間保存してくれる機能となります。

image-20220910142919433.png

なお、永続的セッションを使った時に保存してくれる期間には上限があります。デフォルトは 1 時間となっており、7 日まで上限緩和の申請ができるようです。

image-20220910143203002.png

それでは、永続的セッションを実際にやってみましょう!

IoT デバイス : データ送信元

適当に Amazon Linux 2 の環境 (memo : Cloud9 : SugiCloud9Wotkspace) を IoT デバイスとして利用していきます。詳細な方法は他の Document などを参照頂くとして、クイックにコマンドを羅列します。

AWS IoT Device SDK Python v2 のインストールや、作業用ディレクトリの作成

pip3 install --user awsiotsdk
mkdir -p ~/environment/dummy_client/certs/
cd ~/environment/dummy_client/

AWS IoT Core 初級ハンズオンで公開されている Python のプログラムをダウンロード。これを使ってデータをパブリッシュしていきます。

wget https://awsj-iot-handson.s3-ap-northeast-1.amazonaws.com/aws-iot-core-workshop/dummy_client/device_main.py -O device_main.py

この Python のプログラムで重要なところを2つピックアップします。

329 行目あたりです。MQTTクライアントを生成しているのですが、この中で clean_session=False, を指定することで永続的セッションを利用することを宣言しています。

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=iot_endpoint,
        cert_filepath=certificate_file,
        pri_key_filepath=private_key_file,
        client_bootstrap=client_bootstrap,
        ca_filepath=rootca_file,
        client_id=device_name,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE)

もう一つが、実際にメッセージをパブリッシュしている部分です。qos=mqtt.QoS.AT_LEAST_ONCE を指定することで、QoS 1 であることを示しており、受信側で PubAck が返されないと再送を試みます。

    while True:
        now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        tmp = 20 + random.randint(-5, 5)
        hum = 50 + random.randint(-10, 10)
        payload = {"DEVICE_NAME": device_name, "TIMESTAMP": now, "TEMPERATURE": tmp, "HUMIDITY": hum}
        logger.debug("  payload: %s", payload)

        mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(payload),
            qos=mqtt.QoS.AT_LEAST_ONCE)

次に、IoT Core の設定や Thing の登録ですが、このあたりを参考に実施します。

その後、以下のようなコマンドを実行すると定期的にメッセージがパブリッシュされます。

cd ~/environment/dummy_client
python3 device_main.py --device_name qos-test01 --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com

実行例

  • MQTT Topic data/qos-test01 にデータが流れている状態が出来ます
Admin:~/environment/dummy_client $ python3 device_main.py --device_name qos-test01 --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com
AWS libcrypto resolve: searching process and loaded modules
AWS libcrypto resolve: found static aws-lc HMAC symbols
AWS libcrypto resolve: found static aws-lc libcrypto 1.1.1 EVP_MD symbols
device_name: qos-test01
endpoint: aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com
rootca cert: ./certs/AmazonRootCA1.pem
private key: ./certs/87f69b70ca-private.pem.key
certificate: ./certs/87f69b70ca-certificate.pem.crt
Check latest Shadow status
Subscribing to Shadow Delta events...
Thing has no shadow document. Creating with defaults...
un subscribe from get shadow events
Updating reported shadow to...
Update request published.
Subscribing to Shadow Update responses...
topic: data/qos-test01

受信テスト

ここまでの段階でマネージメントコンソールから Test してみます。このように受信できていることがわかります。

image-20220910145028198.png

IoT デバイス : データ送信先

これも同様に Amazon Linux 2 の環境を利用していきます。(memo : Stepbox)

さきほどの Python のプログラムを受信用にクイックに書き換えています。一応、全文を載せておきますが、読み飛ばして大丈夫です。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from __future__ import absolute_import
from __future__ import print_function

import argparse
import json
import logging
import os
import random
import signal
import sys
import time
import traceback
from datetime import datetime

from awscrt import io, mqtt
from awsiot import iotshadow, mqtt_connection_builder

# - Overview -
# This sample shows 1) how to connect AWS IoT Core. 2) How to use AWS IoT
# Device Shadow

BASE_TOPIC = "data/"
DEFAULT_WAIT_TIME = 5
SHADOW_WAIT_TIME_KEY = "wait_time"
KEEP_ALIVE = 300

mqtt_connection = None
shadow_client = None
wait_time = DEFAULT_WAIT_TIME
device_name = None

logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logging.basicConfig()


def arg_check():
    """
    argument check
    """

    logging.debug("start: arg_check")
    parser = argparse.ArgumentParser()
    parser.add_argument("--device_name", required=True,
                        help="[Must], AWS IoT Core Thing Name")
    parser.add_argument("--endpoint", required=True,
                        help="[Must], AWS IoT endpoint URI")
    parser.add_argument("--root_ca", required=False,
                        help="AWS IoT Core root ca file name with path")
    parser.add_argument("--cert", required=False,
                        help="device cert file name with path")
    parser.add_argument("--private", required=False,
                        help="private cert key file name with path")
    parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel],
                        default=io.LogLevel.NoLogs.name, help='Logging level')

    args = parser.parse_args()

    log_level = getattr(io.LogLevel, args.verbosity, "error")
    io.init_logging(log_level, 'stderr')
    loglevel_map = [
        logging.INFO, logging.INFO, logging.INFO,
        logging.INFO, logging.INFO, logging.DEBUG,
        logging.DEBUG]
    logger.setLevel(loglevel_map[log_level])
    logging.basicConfig()

    cert_list = find_certs_file()
    if args.root_ca is not None:
        cert_list[0] = args.root_ca
    if args.private is not None:
        cert_list[1] = args.private
    if args.cert is not None:
        cert_list[2] = args.cert

    logging.debug(cert_list)
    file_exist_check(cert_list)

    init_dict = {
        "device_name": args.device_name,
        "endpoint": args.endpoint,
        "certs": cert_list
    }
    return init_dict


def file_exist_check(cert_list):
    """
    Check the files exists
    all certs must placed in ./certs directory

    Parameters
    ----------
    cert_list: Array
    """

    for file in cert_list:
        if not os.path.exists(file):
            # if file not found, raise
            logger.error("cert file not found:%s", file)
            raise RuntimeError("file_not_exists")


def find_certs_file():
    """
    Find the certificates file from ./certs directory

    Returns
    ----------
    file_list: Array
        0: Root CA Cert, 1: private key, 2: certificate
    """

    certs_dir = "./certs"
    file_list = ["AmazonRootCA1.pem", "private.pem", "certificate.crt"]
    for _, _, names in os.walk(certs_dir):
        for file in names:
            if "AmazonRootCA1.pem" in file:
                file_list[0] = certs_dir + "/" + file
            elif "private" in file:
                file_list[1] = certs_dir + "/" + file
            elif "certificate" in file:
                file_list[2] = certs_dir + "/" + file

    return file_list


def device_main():
    """
    main loop for dummy device
    """
    global device_name, mqtt_connection, shadow_client

    init_info = arg_check()
    device_name = init_info['device_name']
    iot_endpoint = init_info['endpoint']
    rootca_file = init_info['certs'][0]
    private_key_file = init_info['certs'][1]
    certificate_file = init_info['certs'][2]

    logger.info("device_name: %s", device_name)
    logger.info("endpoint: %s", iot_endpoint)
    logger.info("rootca cert: %s", rootca_file)
    logger.info("private key: %s", private_key_file)
    logger.info("certificate: %s", certificate_file)

    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=iot_endpoint,
        cert_filepath=certificate_file,
        pri_key_filepath=private_key_file,
        client_bootstrap=client_bootstrap,
        ca_filepath=rootca_file,
        client_id=device_name,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE)

    connected_future = mqtt_connection.connect()
    shadow_client = iotshadow.IotShadowClient(mqtt_connection)
    connected_future.result()

    topic = "data/qos-test01"
    logging.info("topic: %s", topic)
    while True:
        subscribe_future, packet_id = mqtt_connection.subscribe(
            topic=topic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_message_received
        )
        subscribe_result = subscribe_future.result()

        time.sleep(wait_time)


def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))


def exit_sample(msg_or_exception):
    """
    Exit sample with cleaning

    Parameters
    ----------
    msg_or_exception: str or Exception
    """
    if isinstance(msg_or_exception, Exception):
        logger.error("Exiting sample due to exception.")
        traceback.print_exception(
            msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2])
    else:
        logger.info("Exiting: %s", msg_or_exception)

    if not mqtt_connection:
        logger.info("Disconnecting...")
        mqtt_connection.disconnect()
    sys.exit(0)


def exit_handler(_signal, frame):
    """
    Exit sample
    """
    exit_sample(" Key abort")


if __name__ == "__main__":
    signal.signal(signal.SIGINT, exit_handler)

    device_main()

重要なポイントをいくつか取り上げます。データ送信側と同様ですが、clean_session=False, にします。

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=iot_endpoint,
        cert_filepath=certificate_file,
        pri_key_filepath=private_key_file,
        client_bootstrap=client_bootstrap,
        ca_filepath=rootca_file,
        client_id=device_name,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE)

MQTT Topic を "data/qos-test01" と指定したうえで、qos=mqtt.QoS.AT_LEAST_ONCE, を指定してサブスクライブします。これにより永続的セッションが有効になります。

    topic = "data/qos-test01"
    logging.info("topic: %s", topic)
    while True:
        subscribe_future, packet_id = mqtt_connection.subscribe(
            topic=topic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_message_received
        )
        subscribe_result = subscribe_future.result()

        time.sleep(wait_time)

動作確認

まずデータ送信元で、以下のコマンドを使っている状態です。5秒おきに JSON メッセージがパブリッシュされます。

cd ~/environment/dummy_client
python3 device_main.py --device_name qos-test01 --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com

次に、データ送信先で以下のコマンドを使って、サブスクライブします。

cd ~/temp/iot-qos-test/
python3 iot-subscribe-test.py --device_name qos-subscribe01 --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com

実行例 : このように5秒をおきに JSON データが流れます。

Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:03:31", "TEMPERATURE": 22, "HUMIDITY": 58}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:03:36", "TEMPERATURE": 20, "HUMIDITY": 49}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:03:41", "TEMPERATURE": 21, "HUMIDITY": 54}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:03:46", "TEMPERATURE": 23, "HUMIDITY": 60}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:03:51", "TEMPERATURE": 18, "HUMIDITY": 41}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:03:56", "TEMPERATURE": 20, "HUMIDITY": 45}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:01", "TEMPERATURE": 19, "HUMIDITY": 56}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:06", "TEMPERATURE": 24, "HUMIDITY": 53}'

ここで Python プログラムを停止して、2分ほど待機したあとに再度接続します。すると、以下のメッセージが表示されます。切断中に流れているメッセージを受信できます。

  • 順番はランダム。受信順とはならない
  • たまったデータは、1秒おきに 10 個ずつ受信できる
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:21", "TEMPERATURE": 17, "HUMIDITY": 43}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:31", "TEMPERATURE": 15, "HUMIDITY": 42}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:41", "TEMPERATURE": 22, "HUMIDITY": 58}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:36", "TEMPERATURE": 18, "HUMIDITY": 49}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:11", "TEMPERATURE": 15, "HUMIDITY": 51}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:51", "TEMPERATURE": 16, "HUMIDITY": 58}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:26", "TEMPERATURE": 25, "HUMIDITY": 53}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:16", "TEMPERATURE": 21, "HUMIDITY": 51}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:46", "TEMPERATURE": 18, "HUMIDITY": 57}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:04:56", "TEMPERATURE": 23, "HUMIDITY": 46}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:21", "TEMPERATURE": 16, "HUMIDITY": 58}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:11", "TEMPERATURE": 24, "HUMIDITY": 56}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:01", "TEMPERATURE": 20, "HUMIDITY": 45}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:26", "TEMPERATURE": 22, "HUMIDITY": 55}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:46", "TEMPERATURE": 20, "HUMIDITY": 48}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:36", "TEMPERATURE": 24, "HUMIDITY": 41}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:06", "TEMPERATURE": 25, "HUMIDITY": 40}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:16", "TEMPERATURE": 19, "HUMIDITY": 54}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:31", "TEMPERATURE": 19, "HUMIDITY": 49}'
Received message from topic 'data/qos-test01': b'{"DEVICE_NAME": "qos-test01", "TIMESTAMP": "2022-09-10T06:05:41", "TEMPERATURE": 17, "HUMIDITY": 55}'

わかったこと

  • 永続セッションは、IoT デバイスがオフラインのときにパブリッシュされたメッセージを、一定時間 AWS 側に保管してくれる機能となる。IoT デバイスが再接続したときに保管されているメッセージを、後からサブスクライブ可能。

  • サブスクライブ側が、新規に接続する前に流れているメッセージは取得できない

    • あくまで、一回接続してから切断している期間のメッセージを後から取得可能な機能になる
  • 永続的セッションを使って保存しているメッセージの期間には、上限がある。デフォルトで1時間、7日までで調整可能

    • https://docs.aws.amazon.com/ja_jp/general/latest/gr/iot-core.html#message-broker-limits

    • メッセージブローカーが MQTT 永続的セッションを保存する期間。有効期間は、メッセージブローカーがセッションを検出し、切断されたときに開始されます。有効期間が経過すると、メッセージブローカーはセッションを終了し、関連付けられたキューメッセージを破棄します。1 時間から 7 日までで値を調整できます。

  • 1個の MQTT Topic に、複数の IoT デバイスが接続してるとき、それぞれの IoT デバイスごとに永続的セッションが保存される。

    • IoT デバイスの識別は、device id が使われる
  • 永続的セッションで読み込むメッセージは順不同。順番通りにはならない。

    • 例えば時刻のデータも合わせてパブリッシュすることで、デバイス側で調整可能

参考 URL

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1