参考サイト
MQTTの仕様
基本的な部分が説明してあり,MQTTについての理解が深まった.
プログラムを参考にしました.
https://www.sunbit.co.jp/blog/2019/11/21802/
https://qiita.com/rui0930/items/1d139248b440650dc952
利用準備
mosquittoブローカーをpythonで利用するため,pho-mqttを使用します.
pip install pho-mqtt
今回はbroker,subscriber,publisherの全て同じ端末で動かします.
端末情報
Linux mqtt-test 4.15.0-43-generic #46~16.04.1-Ubuntu SMP Fri Dec 7 13:31:08 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
brokerの起動
今回はlocalhostでの動作確認のためデフォルトでポート,IPアドレスで起動します.設定の変更やconfigの作成は必要ないので,下記のコマンドでbrokerを起動しておきます.
mosquitto
動作プログラムの説明
publisher,subscriberともに無限ループで動作させています.
publisherの送信データとして"Topic1"に対して"hello"をループで4回送信します.
その後は,freeコマンドを実行して取得したメモリ使用量などの情報を1分ごとに"Topic/Mem"に送信し続けるという内容になっています.
topicの階層構造を変化させることでsubscriberに送信するか変化させることができます.
subscriberのプログラムは購読したtopicのデータを無限ループで取得し続けます.
subscriber プログラム
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Subscriber
import paho.mqtt.client as mqtt
# mqtt broker
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_KEEP_ALIVE = 60
# broker connection
def on_connect(mqttc, obj, flags, rc):
print("rc:" + str(rc))
# receve message
def on_message(mqttc, obj, msg):
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE)
# topic1 を購読
mqttc.subscribe("Topic1/#")
# ループ
mqttc.loop_forever()
publisher プログラム
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Publisher
from time import sleep
import subprocess
import paho.mqtt.client as mqtt
# MQTT Broker
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_KEEP_ALIVE = 60
MQTT_TOPIC = 'Topic1'
MQTT_SUB_TOPIC_MEM='Topic1/Mem'
def on_connect(mqttc, obj, flags, rc):
print("rc:" + str(rc))
def res_cmd(cmd):
return subprocess.Popen(cmd, stdout=subprocess.PIPE,shell=True).communicate()[0]
mqttc = mqtt.Client()
mqttc.on_connect = on_connect
# ブローカー接続
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE)
# 処理開始
mqttc.loop_start()
for i in range(3):
mqttc.publish(MQTT_TOPIC, 'hello')
sleep(1)
cmd = 'free'
ans_str = res_cmd(cmd)
while True:
mqttc.publish(MQTT_SUB_TOPIC_MEM, ans_str)
sleep(60)
mqttc.disconnect()