3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ZeroMQのDEALER - ROUTERパターンをPythonで試す

Last updated at Posted at 2019-04-09

ZeroMQのDEALER - ROUTERパターン

ZeroMQの利用パターンとして、DEALER - ROUTERパターンというものがあります。

Shared Queue (DEALER and ROUTER sockets)

セクションとしては、「共有キュー」となっていますね。

以下のようにクライアントがバックエンドに仕事を依頼するような構成を考えた場合に、

Request Distribution

間にメッセージキューを挟み込むことで、バックエンドのワーカーに関する情報を隠すことができるようです。

Request-Reply Broker

クライアントが増え、タスクが増えてバックエンドのワーカーを追加することになっても、クライアントは間にいるブローカーを把握すればよいというわけですね。

また、クライアントから見ると、REQ - REPパターンのようにリクエストに対してレスポンスが返ってくるモデルになります。

事実、クライアントはREQ、ワーカーはREPとなり、間にいるブローカーがDEALERとROUTERの役割を担います。

今回は、簡単なechoアプリケーションを作成して、DEALER - ROUTERパターンを試してみることにします。

言語には、Pythonを使用します。

環境

確認した環境は、こちらです。

$ lsb_release -a
No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 18.04.2 LTS
Release:	18.04
Codename:	bionic


$ python3 -V
Python 3.6.7

ZeroMQライブラリのインストールとバージョン。

$ pip3 install pyzmq

Installing collected packages: pyzmq
Successfully installed pyzmq-18.0.1

この環境で確認していきたいと思います。

クライアントを書く

それでは、最初にクライアントを書いていきましょう。

client.py

import sys
import zmq

def start_client():
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5556")

    while True:
        print("Enter message:")
        message = sys.stdin.readline()
        socket.send_string(message.strip())

        recv_message = socket.recv_string()
        print("Receive message = %s" % recv_message)

    socket.close()
    context.destroy()

if __name__ == "__main__":
    start_client()

標準入力からメッセージを読み込み、ブローカーへメッセージを送信するようにしています。

socket_typeはREQで、今回の接続先としては5556ポートを利用しています。

    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5556")

ワーカーを書く

続いてワーカーを書きます。

worker.py

import sys
import zmq

def start_worker(worker_name):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:5557")

    print("Start worker: %s" % worker_name)

    while True:
        message = socket.recv_string()
        print("Receive message = %s" % message)

        reply_message = "Reply %s from %s" % (message, worker_name)

        socket.send_string(reply_message)

    socket.close()
    context.destroy()

if __name__ == "__main__":
    start_worker(sys.argv[1])

ワーカーのsocket_typeはREPとして作成します。また、接続先のポートは5557としました。

    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:5557")

また、どのワーカーからメッセージが返ってきたかわかるようにするために、ワーカーの名前をリプライするメッセージに含めるようにしました。

        message = socket.recv_string()
        print("Receive message = %s" % message)

        reply_message = "Reply %s from %s" % (message, worker_name)

        socket.send_string(reply_message)

ワーカーの名前は、コマンドライン引数で指定します。

if __name__ == "__main__":
    start_worker(sys.argv[1])

ブローカーを書く

最後は、ブローカーです。

broker.py

import zmq

def start_broker():
    context = zmq.Context()
    router_socket = context.socket(zmq.ROUTER)
    dealer_socket = context.socket(zmq.DEALER)

    router_socket.bind("tcp://*:5556")
    dealer_socket.bind("tcp://*:5557")

    poller = zmq.Poller()
    poller.register(router_socket, zmq.POLLIN)
    poller.register(dealer_socket, zmq.POLLIN)

    print("Start broker")

    while True:
        socks = dict(poller.poll())

        if socks.get(router_socket) == zmq.POLLIN:
            message = router_socket.recv_multipart()
            dealer_socket.send_multipart(message)

        if socks.get(dealer_socket) == zmq.POLLIN:
            message = dealer_socket.recv_multipart()
            router_socket.send_multipart(message)

    router_socket.close()
    dealer_socket.close()
    context.destroy()

if __name__ == "__main__":
    start_broker()

ROUTERが5556ポートに、DEALERが5557ポートにバインドするようにしました。

    router_socket = context.socket(zmq.ROUTER)
    dealer_socket = context.socket(zmq.DEALER)

    router_socket.bind("tcp://*:5556")
    dealer_socket.bind("tcp://*:5557")

つまり、クライアント(REQ)が接続するのはROUTERで、ワーカー(REP)が接続するのはDEALERだということですね。

ブローカーは、この2つのソケットをポーリングして監視し、DEALER、ROUTERのどちらのソケットかを見分けて処理を行うことになります。

    poller = zmq.Poller()
    poller.register(router_socket, zmq.POLLIN)
    poller.register(dealer_socket, zmq.POLLIN)

    print("Start broker")

    while True:
        socks = dict(poller.poll())

        if socks.get(router_socket) == zmq.POLLIN:
            message = router_socket.recv_multipart()
            dealer_socket.send_multipart(message)

        if socks.get(dealer_socket) == zmq.POLLIN:
            message = dealer_socket.recv_multipart()
            router_socket.send_multipart(message)

今回は、どちらもメッセージを受信して別のソケットに送るだけですが…。

socket_typeでいくと、

  • REQ → ROUTER → DEALER
  • DEALER → REP → ROUTER → REQ

となるわけですね。

ちなみに、クライアントもワーカーもsocket.recv_stringsocket.send_stringで簡易に済ませているのですが、ブローカーは同じようにするエラーになったので、ガイドブックに習ってsocket.recv_multipartsocket.send_multipartを使うようにしました。

        if socks.get(router_socket) == zmq.POLLIN:
            message = router_socket.recv_multipart()
            dealer_socket.send_multipart(message)

        if socks.get(dealer_socket) == zmq.POLLIN:
            message = dealer_socket.recv_multipart()
            router_socket.send_multipart(message)

確認

それでは、動作確認してみましょう。

ブローカーを起動。

$ python3 broker.py 
Start broker

ワーカーを3つ起動。

$ python3 worker.py worker-1
Start worker: worker-1


$ python3 worker.py worker-2
Start worker: worker-2


$ python3 worker.py worker-3
Start worker: worker-3

クライアントを起動。

$ python3 client.py 
Enter message:

メッセージを送ってみます。

Enter message:
Hello   
Receive message = Reply Hello from worker-1
Enter message:
こんにちは
Receive message = Reply こんにちは from worker-2
Enter message:
こんにちは、世界
Receive message = Reply こんにちは、世界 from worker-3
Enter message:
Hello World
Receive message = Reply Hello World from worker-1
Enter message:

各ワーカーが処理した結果が返ってくるようですね。

順番を見る限りは、ラウンドロビン?

ワーカー側のログは、こんな感じです。

Start worker: worker-1
Receive message = Hello
Receive message = Hello World


Start worker: worker-2
Receive message = こんにちは


Start worker: worker-3
Receive message = こんにちは、世界

それっぽく動作がわかった気がします。

参考

ØMQガイドブック(日本語版)

ただ、ガイドブックはCのコードで、Pythonでどう書くかを調べるのにやや困ったのですが、ガイドブックとサンプルコードの読み方に途中で気づきました…。

ガイドブックに記載されているCのソースファイル名の拡張子を変えて、読み替えればいいんですね…。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?