ZeroMQ(pyzmq)は使いやすいメッセージングモジュールです。
ZeroMQのPub-Sub パターンで、マルチパート・メッセージ (より複雑なデータをやり取りする場合、データの切れ目 をメッセージに含める)のプログラムの備忘録です。
Python3.5.3を用います。
ZeroMQのインストール、マルチパートなどの使い方については、Raspberry Pi3 と ZeroMQ でフォグ・コンピューティングの雰囲気を体験(前編)に詳しく書かれています。
pyzmqでは、socketクラスに send_multipart() という便利なメソッドが用意されています。
Python リストを渡すだけで、マルチパートのメッセージを送信できます。
受信でも recv_multipart() でリストを受け取れます。
ここでは、マルチパートとして、Topic(SUBSCRIBEのチャンネルになる)と辞書型データを送ります。
Topicは、1,2とします。これをランダムに選びます。
import concurrent.futures
import json
import random
import time
import zmq
def pub():
host = '127.0.0.1'
port = 5990
context = zmq.Context()
# PUBに指定します。
sock = context.socket(zmq.PUB)
sock.bind("tcp://{}:{}".format(host, port))
#Sub側の受信準備を待つため
time.sleep(1)
id = 0
while True:
random_topic = str(random.randint(1, 2))
id += 1
msg = {'num': id, 'message': 'Hello'}
# 送信メッセージはすべてバイト列にしなければならないので、文字列をバイト列に変換します。
topic = random_topic.encode('utf-8')
jmsg = json.dumps(msg).encode('utf-8')
sock.send_multipart([topic, jmsg])
print("Send:{}, {}".format(random_topic, str(msg)))
time.sleep(1)
def sub_1():
"""
Topicが’1’の時だけ受信します。
"""
host = '127.0.0.1'
port = 5990
context = zmq.Context()
# SUBに指定します。
sock = context.socket(zmq.SUB)
channel = '1'.encode('utf-8')
# 必ずSUBSCRIBERを指定します。指定しないと受け取れません。
sock.setsockopt(zmq.SUBSCRIBE, channel)
sock.connect("tcp://{}:{}".format(host, port))
while True:
[topic, jmsg] = sock.recv_multipart()
# バイト列から文字列に変更してからloadsしています。(3.6以降ではそのままでもOK)
msg = json.loads(jmsg.decode())
# 3.5では.decode()を取ってしまうとうまく受信できません
print('received topic 1 : ', msg)
print(msg['num'], msg['message'])
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
executor.submit(sub_1)
executor.submit(pub)
実行例
Send:'1', {'message': 'Hello', 'num': 1}
received topic 1 : {'message': 'Hello', 'num': 1}
1 Hello
Send:'1', {'message': 'Hello', 'num': 2}
received topic 1 : {'message': 'Hello', 'num': 2}
2 Hello
Send:'1', {'message': 'Hello', 'num': 3}
received topic 1 : {'message': 'Hello', 'num': 3}
3 Hello
Send:'1', {'message': 'Hello', 'num': 4}
received topic 1 : {'message': 'Hello', 'num': 4}
4 Hello
Send:'2', {'message': 'Hello', 'num': 5}
Send:'2', {'message': 'Hello', 'num': 6}
Send:'1', {'message': 'Hello', 'num': 7}
received topic 1 : {'message': 'Hello', 'num': 7}
7 Hello
Send:'2', {'message': 'Hello', 'num': 8}
メモ
(1)Multithreadのプログラムとして、threading や multiprocessingではなく、 concurrent.futures を利用。
Python: concurrent.futures を使った並行・並列処理
これは、Python3.2 で追加された並行・並列処理用のパッケージ。
デフォルトでスレッド・プロセスプールが使えたり、マルチスレッドとマルチプロセスがほとんどコードを変えずに使い分けられる。
(2)Python3.5と3.6でjson.loads()の書きかた異なるので注意が必要。
Python3.6を境にjson.loads()の動きが変わっていたことで1週間ハマりました ToT
Python3.5以前はjson.loads()にbytes型を渡すとエラーとなる。
Python3.6以降はjson.loads()にbytes型を渡してもエラーとならない。