LoginSignup
3

More than 3 years have passed since last update.

Pythonによる簡易的なPub/Subプログラムのメモ

Posted at

参考サイト

MQTTの仕様
基本的な部分が説明してあり,MQTTについての理解が深まった.

プログラムを参考にしました.
https://www.sunbit.co.jp/blog/2019/11/21802/
https://qiita.com/rui0930/items/1d139248b440650dc952

利用準備

mosquittoブローカーをpythonで利用するため,pho-mqttを使用します.

pip install pho-mqtt

今回はbroker,subscriber,publisherの全て同じ端末で動かします.

端末情報
Linux mqtt-test 4.15.0-43-generic #46~16.04.1-Ubuntu SMP Fri Dec 7 13:31:08 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

brokerの起動
今回はlocalhostでの動作確認のためデフォルトでポート,IPアドレスで起動します.設定の変更やconfigの作成は必要ないので,下記のコマンドでbrokerを起動しておきます.

mosquitto

動作プログラムの説明
publisher,subscriberともに無限ループで動作させています.
publisherの送信データとして"Topic1"に対して"hello"をループで4回送信します.
その後は,freeコマンドを実行して取得したメモリ使用量などの情報を1分ごとに"Topic/Mem"に送信し続けるという内容になっています.
topicの階層構造を変化させることでsubscriberに送信するか変化させることができます.

subscriberのプログラムは購読したtopicのデータを無限ループで取得し続けます.

subscriber プログラム

 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

#Subscriber
import paho.mqtt.client as mqtt

#mqtt broker
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_KEEP_ALIVE = 60

#broker connection
def on_connect(mqttc, obj, flags, rc):
    print("rc:" + str(rc))

#receve message
def on_message(mqttc, obj, msg):
    print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))

mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE)


#topic1 を購読
mqttc.subscribe("Topic1/#")

#ループ
mqttc.loop_forever()

publisher プログラム

 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

#Publisher
from time import sleep
import subprocess
import paho.mqtt.client as mqtt

#MQTT Broker
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_KEEP_ALIVE = 60
MQTT_TOPIC = 'Topic1'
MQTT_SUB_TOPIC_MEM='Topic1/Mem'

def on_connect(mqttc, obj, flags, rc):
    print("rc:" + str(rc))

def res_cmd(cmd):
    return subprocess.Popen(cmd, stdout=subprocess.PIPE,shell=True).communicate()[0]

mqttc = mqtt.Client()
mqttc.on_connect = on_connect

#ブローカー接続
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE)
#処理開始
mqttc.loop_start()
for i in range(3):
    mqttc.publish(MQTT_TOPIC, 'hello')
    sleep(1)

cmd = 'free'
ans_str = res_cmd(cmd)
while True:
    mqttc.publish(MQTT_SUB_TOPIC_MEM, ans_str)
    sleep(60)

mqttc.disconnect()

実行結果

subscriber
スクリーンショット 2020-06-25 20.10.55.png

publisher
スクリーンショット 2020-06-25 20.10.39.png

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3