ZeroMQのDEALER - ROUTERパターン
ZeroMQの利用パターンとして、DEALER - ROUTERパターンというものがあります。
Shared Queue (DEALER and ROUTER sockets)
セクションとしては、「共有キュー」となっていますね。
以下のようにクライアントがバックエンドに仕事を依頼するような構成を考えた場合に、
間にメッセージキューを挟み込むことで、バックエンドのワーカーに関する情報を隠すことができるようです。
クライアントが増え、タスクが増えてバックエンドのワーカーを追加することになっても、クライアントは間にいるブローカーを把握すればよいというわけですね。
また、クライアントから見ると、REQ - REPパターンのようにリクエストに対してレスポンスが返ってくるモデルになります。
事実、クライアントはREQ、ワーカーはREPとなり、間にいるブローカーがDEALERとROUTERの役割を担います。
今回は、簡単なechoアプリケーションを作成して、DEALER - ROUTERパターンを試してみることにします。
言語には、Pythonを使用します。
環境
確認した環境は、こちらです。
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 18.04.2 LTS
Release: 18.04
Codename: bionic
$ python3 -V
Python 3.6.7
ZeroMQライブラリのインストールとバージョン。
$ pip3 install pyzmq
Installing collected packages: pyzmq
Successfully installed pyzmq-18.0.1
この環境で確認していきたいと思います。
クライアントを書く
それでは、最初にクライアントを書いていきましょう。
client.py
import sys
import zmq
def start_client():
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5556")
while True:
print("Enter message:")
message = sys.stdin.readline()
socket.send_string(message.strip())
recv_message = socket.recv_string()
print("Receive message = %s" % recv_message)
socket.close()
context.destroy()
if __name__ == "__main__":
start_client()
標準入力からメッセージを読み込み、ブローカーへメッセージを送信するようにしています。
socket_type
はREQで、今回の接続先としては5556
ポートを利用しています。
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5556")
ワーカーを書く
続いてワーカーを書きます。
worker.py
import sys
import zmq
def start_worker(worker_name):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5557")
print("Start worker: %s" % worker_name)
while True:
message = socket.recv_string()
print("Receive message = %s" % message)
reply_message = "Reply %s from %s" % (message, worker_name)
socket.send_string(reply_message)
socket.close()
context.destroy()
if __name__ == "__main__":
start_worker(sys.argv[1])
ワーカーのsocket_type
はREPとして作成します。また、接続先のポートは5557
としました。
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5557")
また、どのワーカーからメッセージが返ってきたかわかるようにするために、ワーカーの名前をリプライするメッセージに含めるようにしました。
message = socket.recv_string()
print("Receive message = %s" % message)
reply_message = "Reply %s from %s" % (message, worker_name)
socket.send_string(reply_message)
ワーカーの名前は、コマンドライン引数で指定します。
if __name__ == "__main__":
start_worker(sys.argv[1])
ブローカーを書く
最後は、ブローカーです。
broker.py
import zmq
def start_broker():
context = zmq.Context()
router_socket = context.socket(zmq.ROUTER)
dealer_socket = context.socket(zmq.DEALER)
router_socket.bind("tcp://*:5556")
dealer_socket.bind("tcp://*:5557")
poller = zmq.Poller()
poller.register(router_socket, zmq.POLLIN)
poller.register(dealer_socket, zmq.POLLIN)
print("Start broker")
while True:
socks = dict(poller.poll())
if socks.get(router_socket) == zmq.POLLIN:
message = router_socket.recv_multipart()
dealer_socket.send_multipart(message)
if socks.get(dealer_socket) == zmq.POLLIN:
message = dealer_socket.recv_multipart()
router_socket.send_multipart(message)
router_socket.close()
dealer_socket.close()
context.destroy()
if __name__ == "__main__":
start_broker()
ROUTERが5556
ポートに、DEALERが5557
ポートにバインドするようにしました。
router_socket = context.socket(zmq.ROUTER)
dealer_socket = context.socket(zmq.DEALER)
router_socket.bind("tcp://*:5556")
dealer_socket.bind("tcp://*:5557")
つまり、クライアント(REQ
)が接続するのはROUTERで、ワーカー(REP
)が接続するのはDEALERだということですね。
ブローカーは、この2つのソケットをポーリングして監視し、DEALER、ROUTERのどちらのソケットかを見分けて処理を行うことになります。
poller = zmq.Poller()
poller.register(router_socket, zmq.POLLIN)
poller.register(dealer_socket, zmq.POLLIN)
print("Start broker")
while True:
socks = dict(poller.poll())
if socks.get(router_socket) == zmq.POLLIN:
message = router_socket.recv_multipart()
dealer_socket.send_multipart(message)
if socks.get(dealer_socket) == zmq.POLLIN:
message = dealer_socket.recv_multipart()
router_socket.send_multipart(message)
今回は、どちらもメッセージを受信して別のソケットに送るだけですが…。
socket_type
でいくと、
- REQ → ROUTER → DEALER
- DEALER → REP → ROUTER → REQ
となるわけですね。
ちなみに、クライアントもワーカーもsocket.recv_string
、socket.send_string
で簡易に済ませているのですが、ブローカーは同じようにするエラーになったので、ガイドブックに習ってsocket.recv_multipart
、socket.send_multipart
を使うようにしました。
if socks.get(router_socket) == zmq.POLLIN:
message = router_socket.recv_multipart()
dealer_socket.send_multipart(message)
if socks.get(dealer_socket) == zmq.POLLIN:
message = dealer_socket.recv_multipart()
router_socket.send_multipart(message)
確認
それでは、動作確認してみましょう。
ブローカーを起動。
$ python3 broker.py
Start broker
ワーカーを3つ起動。
$ python3 worker.py worker-1
Start worker: worker-1
$ python3 worker.py worker-2
Start worker: worker-2
$ python3 worker.py worker-3
Start worker: worker-3
クライアントを起動。
$ python3 client.py
Enter message:
メッセージを送ってみます。
Enter message:
Hello
Receive message = Reply Hello from worker-1
Enter message:
こんにちは
Receive message = Reply こんにちは from worker-2
Enter message:
こんにちは、世界
Receive message = Reply こんにちは、世界 from worker-3
Enter message:
Hello World
Receive message = Reply Hello World from worker-1
Enter message:
各ワーカーが処理した結果が返ってくるようですね。
順番を見る限りは、ラウンドロビン?
ワーカー側のログは、こんな感じです。
Start worker: worker-1
Receive message = Hello
Receive message = Hello World
Start worker: worker-2
Receive message = こんにちは
Start worker: worker-3
Receive message = こんにちは、世界
それっぽく動作がわかった気がします。
参考
ただ、ガイドブックはCのコードで、Pythonでどう書くかを調べるのにやや困ったのですが、ガイドブックとサンプルコードの読み方に途中で気づきました…。
ガイドブックに記載されているCのソースファイル名の拡張子を変えて、読み替えればいいんですね…。