3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ZeroMQ PUB-SUB REQ-RESパターンとクラス化

Posted at

###はじめに

ZeroMQのPUB-SUBパターンを利用していましたが、たまにメッセージの落としがあったので、(そのようなことが前提のメッセージング方式ですが)落としがないようにREQ-REPに変更しました。
その際に、PUB-SUBからREQ-REP簡単に入れ替えられるようにクラス化を行いました。その覚えとして書き留めます。

###環境

Windows10 Pro 64 bit
Anaconda
python 3.7

###クラス化

ZeroMQ(TCP/IP)通信では、通信の初期化(接続)、メッセージの送信、メッセージの受信、通信の切断があります。
まず、ひな形を親クラスで作成します。この親クラスを継承して、PUB 、SUB、REQ、REPクラスを作成します。

####親クラス

import zmq

class ConSock(object):
    """
    ZeroMQ Super class
    """

    def __init__(self, kind):
        """
        kind:継承した際にそれぞれのメッセージング方式を入れます。
        ex: zmq.REP
        """
        self.context = zmq.Context()
        self.sock = self.context.socket(kind)

    def send(self, param):

        if type(param) is list:
            """
            multipart message
            param:
            [byte, byte,...]

            """
            self.sock.send_multipart(param)

        else:
            """
            param: string
            Pythonのユニコード文字列を、バイト列に変換して送信
            """
            self.sock.send_string(param)

    def recv(self, type='multipart'):
        """
        type: 'multipart' or 'string'

        """
        # print('Receive')

        if type == 'multipart':
            # recv_msg: [byte,byte...]
            recv_msg = self.sock.recv_multipart()

        elif type == 'string':
            # recv_msg : string
            recv_msg = self.sock.recv_string()

        return recv_msg

    def recv_string(self):
        recv_msg = self.sock.recv_string()
        return recv_msg

    def recv_multi(self):
        recv_msg = self.sock.recv_multipart()
        return recv_msg

    def close(self):
        self.sock.close()
        self.context.destroy()

####コードの説明

初期化(Constructor)でソケットを作成します。引数としてkindを入れておき、親クラスを継承して他のメッセージングパターを作るときにkindに通信方式を入力します。
sendメソッドについては、2つの方式を入れています。一つは、ただ文字列を送るString方式、もう一つは、リストの中にメッセージを入れて、リストを送るmalutipart方式です。
recvメソッドについては、送られてくるメッセージの方式が2つあるので、受ける方はそれに対応して2つ用意しています。recvメソッドの引数で送られてくるメッセージ方式を指定しています。そこを指定しないで、直接受け取る方式も念のために作っておきました。
メッセージの注意点は、stringで送るときは、そのままstringで送信しても問題ありません。(ZeroMQ側でbyteにして送信して、byteで受け取りstringに変換してくれます。)multipartでは、リスト内に入れる情報はbyteに変換しておかないとエラーになります。

送信受信方式についてはこちらの記事が参考になります。
Raspberry Pi3 と ZeroMQ でフォグ・コンピューティングの雰囲気を体験(前編)

####Pub-Subクラス

Pub-Sub方式では、一般にPubがサーバー側、Subがリスナー側になります。また、Pubは、メッセージ送信のみ、Sub側はメッセージ受信のみになります。

class Pub(ConSock):
    """
    Pub-Sub type
    Server side (bind side)

    """

    def __init__(self, host, port):
        super().__init__(zmq.PUB)
        self.sock.bind("tcp://{}:{}".format(host, port))


class Sub(ConSock):
    """
    Pub-Sub type
    listener side
    # Channelを指定すること
    """

    def __init__(self, host, port, channel):
        super().__init__(zmq.SUB)
        self.channel = channel
        channel_name = self.channel.encode('utf-8')
        self.sock.setsockopt(zmq.SUBSCRIBE, channel_name)
        self.sock.connect("tcp://{}:{}".format(host, port))

####コード説明

ConSock親クラスを継承します。オーバーライドしているは、初期化のところだけです。それ以外のところは、親クラスのメソッドをそのまま使います。
初期化のところでは、 親クラスの初期化メソッドを呼んできて、足りない部分を つけ足しています。(superクラスのkindにzmq.PUB)を入れてインスタンスを作っています。)
サーバー側はbind、リスナー側はconnectになっています。リスナー側は、サーバー側のHostとPortの値が入ります。
Subクラスでは、channelの指定をしなければならないので、初期化の引数にchannelが入っています。また、channelはbyteにしなければならないので、byteに変換しています。

###Req-Repクラス

Req-Rep方式では、一般にRepがサーバー側、Reqがクライアント側になります。また、メーセージはRepは受信-->送信、Reqは送信-->受信を行います。

class Rep(ConSock):
    """
    REQ-REP type
    Server side (bind side)
    """
    # superクラスのkindにzmq.REPを入れてインスタンスを作っている
    # それ以外で足りないところをコンストラクタでインスタンス作成

    def __init__(self, host, port):
        super().__init__(zmq.REP)
        self.sock.bind("tcp://{}:{}".format(host, port))


class Req(ConSock):
    """
    REQ-Res type
    Client side
    """

    def __init__(self, host, port):
        super().__init__(zmq.REQ)
        self.sock.connect("tcp://{}:{}".format(host, port))

####コード説明

ConSock親クラスを継承します。オーバーライドしているは、初期化のところだけです。それ以外のところは、親クラスのメソッドをそのまま使います。
初期化のところでは、 親クラスの初期化メソッドを呼んできて、足りない部分を つけ足しています。(superクラスのkindにzmq.REP)を入れてインスタンスを作っている。)
サーバー側はbind、クライアント側はconnectになっています。クライアント側は、サーバー側のHostとPortの値が入ります。

###全クラス化コードとテストコード

"""
ZeroMQ 
Req-Rep, Pub-Sub
class

"""
import json
import random
import time

import zmq


class ConSock(object):
    """
    ZeroMQ Super class
    """

    def __init__(self, kind):
        """
        kind:継承した際にそれぞれのメッセージング方式を入れます。
        ex: zmq.REP
        """
        self.context = zmq.Context()
        self.sock = self.context.socket(kind)

    def send(self, param):

        if type(param) is list:
            """
            multipart message
            param:
            [byte, byte,...]

            """
            self.sock.send_multipart(param)

        else:
            """
            param: string
            Pythonのユニコード文字列を、バイト列に変換して送信
            """
            self.sock.send_string(param)

    def recv(self, type='multipart'):
        """
        type: 'multipart' or 'string'

        """
        # print('Receive')

        if type == 'multipart':
            # recv_msg: [byte,byte...]
            recv_msg = self.sock.recv_multipart()

        elif type == 'string':
            # recv_msg : string
            recv_msg = self.sock.recv_string()

        return recv_msg

    def recv_string(self):
        recv_msg = self.sock.recv_string()
        return recv_msg

    def recv_multi(self):
        recv_msg = self.sock.recv_multipart()
        return recv_msg

    def close(self):
        self.sock.close()
        self.context.destroy()


class Rep(ConSock):
    """
    REQ-REP type
    Server side (bind side)
    """
    # superクラスのkindにzmq.REPを入れてインスタンスを作っている
    # それ以外で足りないところをコンストラクタでインスタンス作成

    def __init__(self, host, port):
        super().__init__(zmq.REP)
        self.sock.bind("tcp://{}:{}".format(host, port))


class Req(ConSock):
    """
    REQ-REP type
    Client side

    """

    def __init__(self, host, port):
        super().__init__(zmq.REQ)
        self.sock.connect("tcp://{}:{}".format(host, port))


class Pub(ConSock):
    """
    Pub-Sub type
    Server side (bind side)

    """

    def __init__(self, host, port):
        super().__init__(zmq.PUB)
        self.sock.bind("tcp://{}:{}".format(host, port))


class Sub(ConSock):
    """
    Pub-Sub type
    listener side
    # Channelを指定すること
    """

    def __init__(self, host, port, channel):
        super().__init__(zmq.SUB)
        self.channel = channel
        channel_name = self.channel.encode('utf-8')
        self.sock.setsockopt(zmq.SUBSCRIBE, channel_name)
        self.sock.connect("tcp://{}:{}".format(host, port))


"""
Pub-Sub test code
Pubはsendのみ
Subはrecvのみ

"""


def pub():
    host = '127.0.0.1'
    port = 5500

    conn = Pub(host, port)

    # Sub側の受信準備を待つため
    time.sleep(1)

    id = 0
    for i in range(10):
        random_topic = str(random.randint(1, 2))
        id += 1
        msg = {'num': id, 'message': 'Pub Hello'}
        # 送信メッセージはすべてバイト列にしなければならないので、文字列をバイト列に変換します。
        topic = random_topic.encode('utf-8')
        jmsg = json.dumps(msg).encode('utf-8')

        conn.send([topic, jmsg])

        print("Send:{}, {}".format(random_topic, str(msg)))
        time.sleep(1)

    print('Pub server done')


def sub_1():
    """
    Topicが’1’の時だけ受信
    """

    host = '127.0.0.1'
    port = 5500

    conn = Sub(host, port, '1')

    while True:
       	[topic, jmsg] = conn.recv()

        # バイト列から文字列に変更してからloads
        msg = json.loads(jmsg.decode())

        print('received topic 1 : ', msg)
        print(msg['num'], msg['message'])


        
"""
Req-Rep test code
Req-Resともにsend,recvが必要。
"""


def client():
    host = '127.0.0.1'
    # host = 'localhost'
    port = 5600

    conn = Req(host, port)

    for id in range(10):
        random_topic = str(random.randint(1, 2))
        msg = {'num': id, 'message': 'Hello'}
        # 送信メッセージはすべてバイト列にしなければならないので、文字列をバイト列に変換
        topic = random_topic.encode('utf-8')
        jmsg = json.dumps(msg).encode('utf-8')

        conn.send([topic, jmsg])

        print("Send:{}, {}".format(random_topic, str(msg)))

        # Responseの受け取り
        msg = conn.recv('string')

        print('Response : ', msg)

        time.sleep(1)
    print('client done')


def server():
    """
    Server
    """
    host = '127.0.0.1'
    # host = '*'
    port = 5600

    conn = Rep(host, port)
    print("Server startup.")

    while True:
        [topic, jmsg] = conn.recv('multipart')

        # バイト列から文字列に変更してからloads
        # jmsgはJSONで送信されているため
        msg = json.loads(jmsg.decode())

        print('received topic  : ', msg)
        print(msg['num'], msg['message'])
        # Responseの内容を文字列で送信
        conn.send('OK')


if __name__ == "__main__":
    """
    Req-Res test、Pub-Sub testを両方同時に試しても動作しますが、
    個々の動きを確かめたい場合は、
    Req-Res testを行いたいときは Pub-Sub testをコメントアウトしてください。
    Pub-Sub testを行いたいときは  Req-Rep testをコメントアウトしてください。
    """
    import threading
    # Req-Res test
    thc = threading.Thread(target=client, args=()).start()
    time.sleep(0.5)
    # while loopで待ち受けている方は、daemonにしている。
    ths = threading.Thread(target=server, args=(), daemon=True).start()

    # Pub-Sub test
    # while loopで待ち受けている方は、daemonにしている。
    ths = threading.Thread(target=sub_1, args=(), daemon=True).start()
    time.sleep(0.5)
    thp = threading.Thread(target=pub, args=()).start()

####Req-Res test の結果

Send:1, {'num': 0, 'message': 'Hello'}
Server startup.
received topic  :  {'num': 0, 'message': 'Hello'}
0 Hello       
Response :  OK
Send:1, {'num': 1, 'message': 'Hello'}
received topic  :  {'num': 1, 'message': 'Hello'}
1 Hello
Response :  OK
Send:2, {'num': 2, 'message': 'Hello'}
received topic  :  {'num': 2, 'message': 'Hello'}
2 Hello
Response :  OK
Send:2, {'num': 3, 'message': 'Hello'}
received topic  :  {'num': 3, 'message': 'Hello'}
3 Hello
Response :  OK
Send:1, {'num': 4, 'message': 'Hello'}
received topic  :  {'num': 4, 'message': 'Hello'}
4 Hello
Response :  OK
Send:1, {'num': 5, 'message': 'Hello'}
received topic  :  {'num': 5, 'message': 'Hello'}
5 Hello
Response :  OK
Send:2, {'num': 6, 'message': 'Hello'}
received topic  :  {'num': 6, 'message': 'Hello'}
6 Hello
Response :  OK
Send:1, {'num': 7, 'message': 'Hello'}
received topic  :  {'num': 7, 'message': 'Hello'}
7 Hello
Response :  OK
Send:1, {'num': 8, 'message': 'Hello'}
received topic  :  {'num': 8, 'message': 'Hello'}
8 Hello
Response :  OK
Send:2, {'num': 9, 'message': 'Hello'}
received topic  :  {'num': 9, 'message': 'Hello'}
9 Hello
Response :  OK
client done

####Pub-Sub testの結果

Send:2, {'num': 1, 'message': 'Pub Hello'}
Send:2, {'num': 2, 'message': 'Pub Hello'}
Send:1, {'num': 3, 'message': 'Pub Hello'}
received topic 1 :  {'num': 3, 'message': 'Pub Hello'}
3 Pub Hello
Send:2, {'num': 4, 'message': 'Pub Hello'}
Send:2, {'num': 5, 'message': 'Pub Hello'}
Send:1, {'num': 6, 'message': 'Pub Hello'}
received topic 1 :  {'num': 6, 'message': 'Pub Hello'}
6 Pub Hello
Send:2, {'num': 7, 'message': 'Pub Hello'}
Send:2, {'num': 8, 'message': 'Pub Hello'}
Send:1, {'num': 9, 'message': 'Pub Hello'}
received topic 1 :  {'num': 9, 'message': 'Pub Hello'}
9 Pub Hello
Send:2, {'num': 10, 'message': 'Pub Hello'}
Pub server done
3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?