###はじめに
ZeroMQのPUB-SUBパターンを利用していましたが、たまにメッセージの落としがあったので、(そのようなことが前提のメッセージング方式ですが)落としがないようにREQ-REPに変更しました。
その際に、PUB-SUBからREQ-REP簡単に入れ替えられるようにクラス化を行いました。その覚えとして書き留めます。
###環境
Windows10 Pro 64 bit
Anaconda
python 3.7
###クラス化
ZeroMQ(TCP/IP)通信では、通信の初期化(接続)、メッセージの送信、メッセージの受信、通信の切断があります。
まず、ひな形を親クラスで作成します。この親クラスを継承して、PUB 、SUB、REQ、REPクラスを作成します。
####親クラス
import zmq
class ConSock(object):
"""
ZeroMQ Super class
"""
def __init__(self, kind):
"""
kind:継承した際にそれぞれのメッセージング方式を入れます。
ex: zmq.REP
"""
self.context = zmq.Context()
self.sock = self.context.socket(kind)
def send(self, param):
if type(param) is list:
"""
multipart message
param:
[byte, byte,...]
"""
self.sock.send_multipart(param)
else:
"""
param: string
Pythonのユニコード文字列を、バイト列に変換して送信
"""
self.sock.send_string(param)
def recv(self, type='multipart'):
"""
type: 'multipart' or 'string'
"""
# print('Receive')
if type == 'multipart':
# recv_msg: [byte,byte...]
recv_msg = self.sock.recv_multipart()
elif type == 'string':
# recv_msg : string
recv_msg = self.sock.recv_string()
return recv_msg
def recv_string(self):
recv_msg = self.sock.recv_string()
return recv_msg
def recv_multi(self):
recv_msg = self.sock.recv_multipart()
return recv_msg
def close(self):
self.sock.close()
self.context.destroy()
####コードの説明
初期化(Constructor)でソケットを作成します。引数としてkindを入れておき、親クラスを継承して他のメッセージングパターを作るときにkindに通信方式を入力します。
sendメソッドについては、2つの方式を入れています。一つは、ただ文字列を送るString方式、もう一つは、リストの中にメッセージを入れて、リストを送るmalutipart方式です。
recvメソッドについては、送られてくるメッセージの方式が2つあるので、受ける方はそれに対応して2つ用意しています。recvメソッドの引数で送られてくるメッセージ方式を指定しています。そこを指定しないで、直接受け取る方式も念のために作っておきました。
メッセージの注意点は、stringで送るときは、そのままstringで送信しても問題ありません。(ZeroMQ側でbyteにして送信して、byteで受け取りstringに変換してくれます。)multipartでは、リスト内に入れる情報はbyteに変換しておかないとエラーになります。
送信受信方式についてはこちらの記事が参考になります。
Raspberry Pi3 と ZeroMQ でフォグ・コンピューティングの雰囲気を体験(前編)
####Pub-Subクラス
Pub-Sub方式では、一般にPubがサーバー側、Subがリスナー側になります。また、Pubは、メッセージ送信のみ、Sub側はメッセージ受信のみになります。
class Pub(ConSock):
"""
Pub-Sub type
Server side (bind side)
"""
def __init__(self, host, port):
super().__init__(zmq.PUB)
self.sock.bind("tcp://{}:{}".format(host, port))
class Sub(ConSock):
"""
Pub-Sub type
listener side
# Channelを指定すること
"""
def __init__(self, host, port, channel):
super().__init__(zmq.SUB)
self.channel = channel
channel_name = self.channel.encode('utf-8')
self.sock.setsockopt(zmq.SUBSCRIBE, channel_name)
self.sock.connect("tcp://{}:{}".format(host, port))
####コード説明
ConSock親クラスを継承します。オーバーライドしているは、初期化のところだけです。それ以外のところは、親クラスのメソッドをそのまま使います。
初期化のところでは、 親クラスの初期化メソッドを呼んできて、足りない部分を つけ足しています。(superクラスのkindにzmq.PUB)を入れてインスタンスを作っています。)
サーバー側はbind、リスナー側はconnectになっています。リスナー側は、サーバー側のHostとPortの値が入ります。
Subクラスでは、channelの指定をしなければならないので、初期化の引数にchannelが入っています。また、channelはbyteにしなければならないので、byteに変換しています。
###Req-Repクラス
Req-Rep方式では、一般にRepがサーバー側、Reqがクライアント側になります。また、メーセージはRepは受信-->送信、Reqは送信-->受信を行います。
class Rep(ConSock):
"""
REQ-REP type
Server side (bind side)
"""
# superクラスのkindにzmq.REPを入れてインスタンスを作っている
# それ以外で足りないところをコンストラクタでインスタンス作成
def __init__(self, host, port):
super().__init__(zmq.REP)
self.sock.bind("tcp://{}:{}".format(host, port))
class Req(ConSock):
"""
REQ-Res type
Client side
"""
def __init__(self, host, port):
super().__init__(zmq.REQ)
self.sock.connect("tcp://{}:{}".format(host, port))
####コード説明
ConSock親クラスを継承します。オーバーライドしているは、初期化のところだけです。それ以外のところは、親クラスのメソッドをそのまま使います。
初期化のところでは、 親クラスの初期化メソッドを呼んできて、足りない部分を つけ足しています。(superクラスのkindにzmq.REP)を入れてインスタンスを作っている。)
サーバー側はbind、クライアント側はconnectになっています。クライアント側は、サーバー側のHostとPortの値が入ります。
###全クラス化コードとテストコード
"""
ZeroMQ
Req-Rep, Pub-Sub
class
"""
import json
import random
import time
import zmq
class ConSock(object):
"""
ZeroMQ Super class
"""
def __init__(self, kind):
"""
kind:継承した際にそれぞれのメッセージング方式を入れます。
ex: zmq.REP
"""
self.context = zmq.Context()
self.sock = self.context.socket(kind)
def send(self, param):
if type(param) is list:
"""
multipart message
param:
[byte, byte,...]
"""
self.sock.send_multipart(param)
else:
"""
param: string
Pythonのユニコード文字列を、バイト列に変換して送信
"""
self.sock.send_string(param)
def recv(self, type='multipart'):
"""
type: 'multipart' or 'string'
"""
# print('Receive')
if type == 'multipart':
# recv_msg: [byte,byte...]
recv_msg = self.sock.recv_multipart()
elif type == 'string':
# recv_msg : string
recv_msg = self.sock.recv_string()
return recv_msg
def recv_string(self):
recv_msg = self.sock.recv_string()
return recv_msg
def recv_multi(self):
recv_msg = self.sock.recv_multipart()
return recv_msg
def close(self):
self.sock.close()
self.context.destroy()
class Rep(ConSock):
"""
REQ-REP type
Server side (bind side)
"""
# superクラスのkindにzmq.REPを入れてインスタンスを作っている
# それ以外で足りないところをコンストラクタでインスタンス作成
def __init__(self, host, port):
super().__init__(zmq.REP)
self.sock.bind("tcp://{}:{}".format(host, port))
class Req(ConSock):
"""
REQ-REP type
Client side
"""
def __init__(self, host, port):
super().__init__(zmq.REQ)
self.sock.connect("tcp://{}:{}".format(host, port))
class Pub(ConSock):
"""
Pub-Sub type
Server side (bind side)
"""
def __init__(self, host, port):
super().__init__(zmq.PUB)
self.sock.bind("tcp://{}:{}".format(host, port))
class Sub(ConSock):
"""
Pub-Sub type
listener side
# Channelを指定すること
"""
def __init__(self, host, port, channel):
super().__init__(zmq.SUB)
self.channel = channel
channel_name = self.channel.encode('utf-8')
self.sock.setsockopt(zmq.SUBSCRIBE, channel_name)
self.sock.connect("tcp://{}:{}".format(host, port))
"""
Pub-Sub test code
Pubはsendのみ
Subはrecvのみ
"""
def pub():
host = '127.0.0.1'
port = 5500
conn = Pub(host, port)
# Sub側の受信準備を待つため
time.sleep(1)
id = 0
for i in range(10):
random_topic = str(random.randint(1, 2))
id += 1
msg = {'num': id, 'message': 'Pub Hello'}
# 送信メッセージはすべてバイト列にしなければならないので、文字列をバイト列に変換します。
topic = random_topic.encode('utf-8')
jmsg = json.dumps(msg).encode('utf-8')
conn.send([topic, jmsg])
print("Send:{}, {}".format(random_topic, str(msg)))
time.sleep(1)
print('Pub server done')
def sub_1():
"""
Topicが’1’の時だけ受信
"""
host = '127.0.0.1'
port = 5500
conn = Sub(host, port, '1')
while True:
[topic, jmsg] = conn.recv()
# バイト列から文字列に変更してからloads
msg = json.loads(jmsg.decode())
print('received topic 1 : ', msg)
print(msg['num'], msg['message'])
"""
Req-Rep test code
Req-Resともにsend,recvが必要。
"""
def client():
host = '127.0.0.1'
# host = 'localhost'
port = 5600
conn = Req(host, port)
for id in range(10):
random_topic = str(random.randint(1, 2))
msg = {'num': id, 'message': 'Hello'}
# 送信メッセージはすべてバイト列にしなければならないので、文字列をバイト列に変換
topic = random_topic.encode('utf-8')
jmsg = json.dumps(msg).encode('utf-8')
conn.send([topic, jmsg])
print("Send:{}, {}".format(random_topic, str(msg)))
# Responseの受け取り
msg = conn.recv('string')
print('Response : ', msg)
time.sleep(1)
print('client done')
def server():
"""
Server
"""
host = '127.0.0.1'
# host = '*'
port = 5600
conn = Rep(host, port)
print("Server startup.")
while True:
[topic, jmsg] = conn.recv('multipart')
# バイト列から文字列に変更してからloads
# jmsgはJSONで送信されているため
msg = json.loads(jmsg.decode())
print('received topic : ', msg)
print(msg['num'], msg['message'])
# Responseの内容を文字列で送信
conn.send('OK')
if __name__ == "__main__":
"""
Req-Res test、Pub-Sub testを両方同時に試しても動作しますが、
個々の動きを確かめたい場合は、
Req-Res testを行いたいときは Pub-Sub testをコメントアウトしてください。
Pub-Sub testを行いたいときは Req-Rep testをコメントアウトしてください。
"""
import threading
# Req-Res test
thc = threading.Thread(target=client, args=()).start()
time.sleep(0.5)
# while loopで待ち受けている方は、daemonにしている。
ths = threading.Thread(target=server, args=(), daemon=True).start()
# Pub-Sub test
# while loopで待ち受けている方は、daemonにしている。
ths = threading.Thread(target=sub_1, args=(), daemon=True).start()
time.sleep(0.5)
thp = threading.Thread(target=pub, args=()).start()
####Req-Res test の結果
Send:1, {'num': 0, 'message': 'Hello'}
Server startup.
received topic : {'num': 0, 'message': 'Hello'}
0 Hello
Response : OK
Send:1, {'num': 1, 'message': 'Hello'}
received topic : {'num': 1, 'message': 'Hello'}
1 Hello
Response : OK
Send:2, {'num': 2, 'message': 'Hello'}
received topic : {'num': 2, 'message': 'Hello'}
2 Hello
Response : OK
Send:2, {'num': 3, 'message': 'Hello'}
received topic : {'num': 3, 'message': 'Hello'}
3 Hello
Response : OK
Send:1, {'num': 4, 'message': 'Hello'}
received topic : {'num': 4, 'message': 'Hello'}
4 Hello
Response : OK
Send:1, {'num': 5, 'message': 'Hello'}
received topic : {'num': 5, 'message': 'Hello'}
5 Hello
Response : OK
Send:2, {'num': 6, 'message': 'Hello'}
received topic : {'num': 6, 'message': 'Hello'}
6 Hello
Response : OK
Send:1, {'num': 7, 'message': 'Hello'}
received topic : {'num': 7, 'message': 'Hello'}
7 Hello
Response : OK
Send:1, {'num': 8, 'message': 'Hello'}
received topic : {'num': 8, 'message': 'Hello'}
8 Hello
Response : OK
Send:2, {'num': 9, 'message': 'Hello'}
received topic : {'num': 9, 'message': 'Hello'}
9 Hello
Response : OK
client done
####Pub-Sub testの結果
Send:2, {'num': 1, 'message': 'Pub Hello'}
Send:2, {'num': 2, 'message': 'Pub Hello'}
Send:1, {'num': 3, 'message': 'Pub Hello'}
received topic 1 : {'num': 3, 'message': 'Pub Hello'}
3 Pub Hello
Send:2, {'num': 4, 'message': 'Pub Hello'}
Send:2, {'num': 5, 'message': 'Pub Hello'}
Send:1, {'num': 6, 'message': 'Pub Hello'}
received topic 1 : {'num': 6, 'message': 'Pub Hello'}
6 Pub Hello
Send:2, {'num': 7, 'message': 'Pub Hello'}
Send:2, {'num': 8, 'message': 'Pub Hello'}
Send:1, {'num': 9, 'message': 'Pub Hello'}
received topic 1 : {'num': 9, 'message': 'Pub Hello'}
9 Pub Hello
Send:2, {'num': 10, 'message': 'Pub Hello'}
Pub server done