2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ZeroMQを用いたブロードキャスト(マルチパートメッセージ)

Posted at

ZeroMQ(pyzmq)は使いやすいメッセージングモジュールです。

ZeroMQのPub-Sub パターンで、マルチパート・メッセージ (より複雑なデータをやり取りする場合、データの切れ目 をメッセージに含める)のプログラムの備忘録です。

Python3.5.3を用います。

ZeroMQのインストール、マルチパートなどの使い方については、Raspberry Pi3 と ZeroMQ でフォグ・コンピューティングの雰囲気を体験(前編)に詳しく書かれています。

pyzmqでは、socketクラスに send_multipart() という便利なメソッドが用意されています。

Python リストを渡すだけで、マルチパートのメッセージを送信できます。

受信でも recv_multipart() でリストを受け取れます。

ここでは、マルチパートとして、Topic(SUBSCRIBEのチャンネルになる)と辞書型データを送ります。
Topicは、1,2とします。これをランダムに選びます。

import concurrent.futures
import json
import random
import time

import zmq


def pub():
    host = '127.0.0.1'
    port = 5990

    context = zmq.Context()
    # PUBに指定します。
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://{}:{}".format(host, port))
    
    #Sub側の受信準備を待つため
    time.sleep(1)
    
    id = 0
    while True:
        random_topic = str(random.randint(1, 2))
        id += 1
        msg = {'num': id, 'message': 'Hello'}
        # 送信メッセージはすべてバイト列にしなければならないので、文字列をバイト列に変換します。
        topic = random_topic.encode('utf-8')
        jmsg = json.dumps(msg).encode('utf-8')

        sock.send_multipart([topic, jmsg])

        print("Send:{}, {}".format(random_topic, str(msg)))
        time.sleep(1)


def sub_1():
    """
    Topicが’1’の時だけ受信します。
    """

    host = '127.0.0.1'
    port = 5990
    context = zmq.Context()

    # SUBに指定します。
    sock = context.socket(zmq.SUB)
    
    channel = '1'.encode('utf-8')
    # 必ずSUBSCRIBERを指定します。指定しないと受け取れません。
    sock.setsockopt(zmq.SUBSCRIBE, channel)
    sock.connect("tcp://{}:{}".format(host, port))

    while True:
        [topic, jmsg] = sock.recv_multipart()

        # バイト列から文字列に変更してからloadsしています。(3.6以降ではそのままでもOK)
        msg = json.loads(jmsg.decode())
        # 3.5では.decode()を取ってしまうとうまく受信できません

        print('received topic 1 : ', msg)
        print(msg['num'], msg['message'])

if __name__ == "__main__":
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    executor.submit(sub_1)
    executor.submit(pub)

実行例

Send:'1', {'message': 'Hello', 'num': 1}
received topic 1 :  {'message': 'Hello', 'num': 1}
1 Hello
Send:'1', {'message': 'Hello', 'num': 2}
received topic 1 :  {'message': 'Hello', 'num': 2}
2 Hello
Send:'1', {'message': 'Hello', 'num': 3}
received topic 1 :  {'message': 'Hello', 'num': 3}
3 Hello
Send:'1', {'message': 'Hello', 'num': 4}
received topic 1 :  {'message': 'Hello', 'num': 4}
4 Hello
Send:'2', {'message': 'Hello', 'num': 5}
Send:'2', {'message': 'Hello', 'num': 6}
Send:'1', {'message': 'Hello', 'num': 7}
received topic 1 :  {'message': 'Hello', 'num': 7}
7 Hello
Send:'2', {'message': 'Hello', 'num': 8}

メモ

(1)Multithreadのプログラムとして、threading や multiprocessingではなく、 concurrent.futures を利用。

Python: concurrent.futures を使った並行・並列処理

これは、Python3.2 で追加された並行・並列処理用のパッケージ。
デフォルトでスレッド・プロセスプールが使えたり、マルチスレッドとマルチプロセスがほとんどコードを変えずに使い分けられる。

(2)Python3.5と3.6でjson.loads()の書きかた異なるので注意が必要。
Python3.6を境にjson.loads()の動きが変わっていたことで1週間ハマりました ToT
Python3.5以前はjson.loads()にbytes型を渡すとエラーとなる。
Python3.6以降はjson.loads()にbytes型を渡してもエラーとならない。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?