0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MQTT5 Response TopicとCorrelation Data

Last updated at Posted at 2025-12-16

はじめに

今回はMQTT5でHTTPのようにRequest/Responseを実現するための規格仕様を見ていきましょう。

Request/Responseを実現する

Response Topic (Publish)

Response TopicはPublishのプロパティに付く「返送先」トピック名です。Publishに返送先を指定することでhttpやgRPCのようにRequest/Responseのニーズに応えることができます。

Correlation Data (Publish)

Correlation Dataは、どのリクエストからの応答か区別するために付与するIDを入れるための領域です。HTTPではx-request-idがその役割を果たします。レスポンスを返す主体は受け取ったままのCorrelation Dataを返すことが期待されています。改ざんを防ぐ場合にはハッシュ化や暗号化の手段を取る必要があります。

Response Information (Connack)

Response InformationはConnackに含めることができるResponse Topicを作成するbaseとなる文字列です。Response Topicをランダムに生成することは権限上できない場合が多いのでResponse Informationでクライアントに権限があるtreeのprefixを教える。

Request Response Information (Connect)

Response Informationを要求するかどうかをConnect時にクライアントが決定して送信する。

使用する順番

  1. ConnectでResponse Informationを要求する
  2. ConnackでResponse InformationをUTF-8 Encoded Stringで返す
  3. PublishでResponse Informationに基づいてResponse Topic(返送用トピック)を指定する
  4. PublishでCorrelation Dataを指定する

  1. ConnectでRequest Response Informationを要求する
  2. Connackでresponse/client123/をResponse Topicのprefixとして使ってくれということをサーバー(レスポンダ, 処理者)から通知される
  3. PublishでTopicにcmd/getStatusに対してResponse Topicとしてresponse/client123/resp、Correlation Dataとしてreq-001を入れて送信する
  4. サーバ(レスポンダ、処理者)は処理が完了したあとにresponse/client123/respに対しCorrelation Dataとしてreq-001を設定してPublishを行う

実践

  • 例では、サーバ側に1日目で使用したものを使います。

  • mochi-coはResponse Informationに対応していませんでしたのでResponse Topicはあらかじめ決めておいたprefixを使用することにします。

responder

responderはResponse Topicにあるトピックに対して中身に応じて返答します
リクエストとレスポンスの結びつきがわかるようにCorrelation Dataはそのまま返します。

import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import time

BROKER = "localhost"
REQUEST_TOPIC = "cmd/#"

def on_message(client, userdata, msg):
    print(f"[Responder] 受信: {msg.topic}{msg.payload.decode()}")

    props = msg.properties

    # Response Topic なければ応答不要
    if not hasattr(props, "ResponseTopic") or props.ResponseTopic is None:
        print("[Responder] Response Topic が無いので応答しません")
        return

    response_topic = props.ResponseTopic
    correlation = props.CorrelationData

    print(f"[Responder] Response Topic = {response_topic}")
    print(f"[Responder] Correlation Data = {correlation}")

    # 応答メッセージ
    resp_payload = f"OK: {msg.payload.decode()}"

    # 応答側 Properties
    resp_props = Properties(PacketTypes.PUBLISH)
    # Correlation Data をそのままコピー
    resp_props.CorrelationData = correlation

    client.publish(
        response_topic,
        resp_payload,
        qos=0,
        properties=resp_props
    )
    print(f"[Responder] 応答送信 → {resp_payload}")


def main():
    cli = mqtt.Client(client_id="responder", protocol=mqtt.MQTTv5)
    cli.on_message = on_message

    cli.connect(BROKER, 1883, 60)
    cli.subscribe(REQUEST_TOPIC)

    print("[Responder] 起動しました")
    cli.loop_forever()


if __name__ == "__main__":
    main()

requester

Requester

  • Correlation Dataを生成
  • Response Topicを設定
  • Payloadには処理してほしいrpcのもとになるデータを送信
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import time

BROKER = "localhost"
REQUEST_TOPIC = "cmd/get"

# デフォルトの Response Topic(サーバーが Response Information を返さなかった時)
fallback_response_topic = "resp/client1"


# ======================
# コールバック
# ======================

def on_connect(client, userdata, flags, reason_code, properties):
    print(f"[Requester] CONNECT: rc={reason_code}")

    # Response Information が来た場合
    if hasattr(properties, "ResponseInformation") and properties.ResponseInformation:
        base = properties.ResponseInformation
        userdata["response_topic"] = base + "resp"
        print(f"[Requester] Response Information = {base}")
    else:
        print("[Requester] Response Information が無いので fallback を使用")
        userdata["response_topic"] = fallback_response_topic

    # Response Topic に subscribe
    resp = userdata["response_topic"]
    client.subscribe(resp)
    print(f"[Requester] SUBSCRIBE → {resp}")


def on_message(client, userdata, msg):
    print(f"[Requester] 受信: {msg.topic}{msg.payload.decode()}")
    print(f"[Requester] Correlation Data = {msg.properties.CorrelationData}")


# ======================
# メイン処理
# ======================

def main():
    userdata = {}

    # CONNECT properties: Request Response Information = 1
    connect_props = Properties(PacketTypes.CONNECT)
    connect_props.RequestResponseInformation = 1

    cli = mqtt.Client(
        client_id="requester",
        protocol=mqtt.MQTTv5,
        userdata=userdata
    )
    cli.on_connect = on_connect
    cli.on_message = on_message

    # CONNECT
    cli.connect(BROKER, 1883, 60, properties=connect_props)

    cli.loop_start()
    time.sleep(1)  # 接続待ち

    # Response Topic をコールバックで設定済み
    response_topic = userdata["response_topic"]

    # Correlation Data (任意のバイナリ)
    corr = b"req-001"

    # Request Properties
    props = Properties(PacketTypes.PUBLISH)
    props.ResponseTopic = response_topic
    props.CorrelationData = corr

    payload = "Hello Request"

    print(f"[Requester] Request 送信 → {payload}")
    cli.publish(
        REQUEST_TOPIC,
        payload,
        qos=0,
        properties=props
    )

    time.sleep(3)
    cli.loop_stop()
    cli.disconnect()


if __name__ == "__main__":
    main()

実行結果をWiresharkで見た結果です。

要求Publish

Correlation DataにValueが入っています。Response Topicも指定されています。
スクリーンショット 2025-11-23 23.05.41.jpg

応答Publish

応答のCorrelationDataの中身も下記のようになります。
スクリーンショット 2025-11-23 23.04.30.jpg

まとめ

うまく使えば、Response TopicとCorrelation DataでRequest/Responseが実装できそうですね

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?