2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

MQTT待ち受け方法改善

Posted at

はじめに

こちらは以下の記事の続きになります

【MQTT】AWS IoT CoreとSoracom Beamを使ってデバイスとお喋りしてみた

結論からですが、こちらの記事で書いたMQTT待ち受け方法では、安定して動作しませんでした。
ネットワークの問題があり、wifiや有線での環境であれば問題ない可能性はあります。

ただ、SoracomのSIMを使わない選択肢はなかったので
色々改善を試みた結果、以下の方法が一番安定して動作しました

安定版ソース

mqtt_sub.py
import paho.mqtt.client as mqtt
import subprocess
import json
import requests
import logging

LOG_DIR = '/var/log/fluent/'
LOG_FILE = LOG_DIR + 'aws_iot_sub.log'
LOG_FORMAT = '[%(asctime)s %(levelname)s] %(message)s'

SORACOM_METADATA_URL = 'http://metadata.soracom.io/v1/subscriber'

logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG, format=LOG_FORMAT)
logger = logging.getLogger("logger")

def get_imsi():
    subscriber = json.loads(requests.get(SORACOM_METADATA_URL).text)
    return subscriber["imsi"]

def on_connect(client, userdata, flag, rc):
    logger.info("Connected with result code " + str(rc))
    device_imsi = get_imsi()

    logger.info("start subscribe : " + str(rc))
    client.subscribe(str(device_imsi), 1)

def on_disconnect(client, userdata, flag, rc):
    logger.error("Unexpected disconnection. rc: " + str(rc))

def on_message(client, userdata, message):
    msg_pay = json.loads(message.payload)["message"]
    logger.info('received data = ' + msg_pay)
    if msg_pay == "REBOOT":
        reboot = "sudo reboot"
        subprocess.call(reboot, shell=True)
    else:
        pass

def on_subscribe(client, userdata, mid, granted_qos):
    logger.info("on_subscribe mid : " + str(mid) + " granted_qos : " + str(granted_qos))

def on_unsubscribe(client, userdata, mid):
    logger.info("on_unsubscribe : " + str(mid))

def main():
    try:
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_message = on_message
        client.on_subscribe = on_subscribe
        client.on_unsubscribe = on_unsubscribe

        client.connect("beam.soracom.io", 1883)

        logger.info("start loop forever")

     # ネットワークが切れた時のエラーハンドリングはpahoに任せる
        client.loop_forever()
    except Exception as err:
        logger.error("main exception: " + err)

if __name__ == "__main__":
    main()

改善前のだめだったところ

前提条件として、SoracomのSIMで通信を行うデバイスで動作させるプログラムです。
どのSIMでも同様だとは思いますが、SoracomのSIMでは、ネットワーク状況が悪かったり、セッションが切れることがあります。
その際のエラーハンドリングとして、以下対策を行なっていました。

while True:
    try:
        mqttClient.subscribe(str(imsi), 1, customCallback)
        time.sleep(1)
    except Exception as err:
        pass

電波が悪い際にsubscribeがExceptionとなりpythonが落ちるためtry/exceptでループするように実装していましたが、こちらが強引でした。

初めの何度かは想定通りに、再度subscribeを行うのですが、
何回も繰り返していくうちにプロセス自体が死んでしまい、待ち受けが行えない状態が発生していました。

エラーの内容として出力されているのは、

"Subscribe error:

のみでこれといった原因はわかりませんでした。

そのため、ネットワークのエラーが発生したとしても、自力で復旧してくれるライブラリを探しました。

改善後のいいところ

まず、ネットワークが悪い状況になってもプロセスは死なず、復旧すれば、再びMQTTを待ち受けできる状態となります。
それが数時間続いたとしても同様です。

今回は、自前で復旧する処理を実装したくなかったので、その観点でライブラリを選びました。

client.loop_forever()

とすれば、ネットワークが悪くなったとしても、自動で復旧制御をしてくれます

ここ数ヶ月運用していますが、特に問題は起こっていません。
AWS SDK(AWSIoTPythonSDK)を使っていて、安定しない場合は、pahoも検討してみてください。

よりMQTTライフを!!

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?