LoginSignup
2
2

More than 1 year has passed since last update.

ZeroMQを使ってPython-Node.js間でPUB/SUBする。

Last updated at Posted at 2021-06-16

ZeroMQとは

公式サイトはこちら

grpcとかddsとかと同じメッセージングミドルウェアです。
TCPベースで、他言語でのプロセス間でREQ-REPやPUB-SUBやPUSH-PULLなど様々な通信方式が手軽に実装でき、分散システムの構築に向いています。

実装

Windows10と、WSL2のUbuntu18.04上で相互に通信してみた。
思ったよりしっかり(お堅く)作られていて、変にハマってしまったので解決策と実装を共有する。
Node側はあっさり動いたがPython側がややこしい。

今回試したのは、サンプルによって"チャンネル"だったり"キー"だったり呼び方が様々だが、
所謂トピック通信で、kitty catsというトピックにメッセージをパブリッシュしている。

Node.js

  • Node.js 12.18.2
  • npm 6.14.6

Node.js側はこれを使った。npmでインストールできる。

最新版は6.xらしいが、とりあえず安定板の5.xで。
WSL2側でしか動かしていないが、意外とちゃんと動く。

サンプルのままだと、Bufferの形式のまま受け取って出力してしまうので、toString()で文字列にして出力する。

pub.js
var zmq = require('zeromq')
  , sock = zmq.socket('pub');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');
sock.send(['kitty cats', 'meow!']);
var counter = 1;

setInterval(function(){
  console.log('sending a multipart message envelope');
  sock.send(['kitty cats', counter]);
  counter += 1;
}, 500);
sub.js
var zmq = require('zeromq')
  , sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3000');
sock.subscribe('kitty cats');
console.log('Subscriber connected to port 3000');

sock.on('message', function(topic, message) {
  console.log('received a message related to:', topic.toString(), 'containing message:', message.toString());
});

Python

  • Python 3.9.5

Pythonで使ったのはこれ。pipで普通に入れられた。

Pythonの方がややこしく、まず、普通に組むとWindows環境ではSubscriberのプロセスがCtrl-C(Keyboard Interrupt)で止められなくなる。
これはデフォルトでその仕様らしいが、使いにくいので色々調べると、allow_interrupt()というのを使えば止められるようだったのでそれを使うようにした。
これで実装すれば、エラーを吐きながらではあるがCtrl-Cでプロセスを止めることができる。
(きれいに止める必要はないのでこれで十分)

あと、time.sleep()で0.1秒待っているのは、PythonでのZeroMQが、ソケットをbind直後にメッセージを送信すると、必ず一度目は取りこぼすからである。
これもどうやら仕様らしく、本当はもっとちゃんとした書き方がありそうだが、今のところそこまでシビアな実装をする予定もないので、これで間に合わせた。
Node.jsの方はライブラリ内で一度目の送信に対して何か処理がされているのか、そのまま使っても一度目からメッセージは到達する。

おそらく、PUBを先に開始して、あとからSUBを起動するのが正しい手順(?)なんだと思うので、自分の使い方がイレギュラーなのかもしれない。

pub.py
import zmq
import time

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:3000")

print("Starting loop...")
topic = "kitty cats"

i = 1
time.sleep(0.1)
while True:
    msg = "Hi for the %d:th time..." % i
    sock.send_string(f"{topic} {msg}")
    print("Sent string: %s ..." % msg)
    i += 1
    time.sleep(1)

sock.close()
ctx.term()
sub.py
import zmq
from zmq.utils.win32 import allow_interrupt
import time

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt_string(zmq.SUBSCRIBE, "kitty cats")
def interrupt_polling():
    """Fix CTRL-C on Windows using "self pipe trick"."""
    # ctx.send_multipart(['', 'quit'])
    sock.close()
    ctx.term()

with allow_interrupt(interrupt_polling):
    sock.connect("tcp://127.0.0.1:3000")
    sock.subscribe("kitty cats")

    print("Starting receiver loop ...")
    while True:
        msg = sock.recv_string()
        print("Received string: %s ..." % msg)

    sock.close()
    ctx.term()

参考

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2