LoginSignup
3
2

More than 5 years have passed since last update.

nanomsgのpub/subとtornadoを使うとハマる!!(俺が

Posted at

お客様(=神様)から、websocketで受け取ったメッセージをnanomsgで引きずり回したい!と言われたらそういうコードを書くしか無い、我ら受託エンジニア。

websocetとnanomsgを両方使うならtornadoで非同期コードを管理したいな、と思うのは当然至極。
nanomsgは以下のコードでソケットのfile descriptorが取れるので、これをioloopにぶち込めばnon-blockingなコードが書けてウハウハだ!!
(nanomsgのドライバにはnnpyを使っている)

_socket.getsockopt(nnpy.SOL_SOCKET, nnpy.RCVFD)

と、こんなコードを書いてみた。(長いのでgist)

topicを指定しない場合は

0.001: publish> even0
0.001: on_message
0.001: subscribe> b'even0'
1.006: publish> odd 1
1.006: on_message
1.006: subscribe> b'odd 1'
2.011: publish> even2
2.011: on_message

うまく動く。
気を良くして、topicを'even'に指定してみると...

0.001: publish> even0
0.001: on_message
0.001: subscribe> b'even0'
1.005: publish> odd 1
1.005: on_message
ERROR:tornado.application:Exception in callback (<__main__.Subscriber object at 0x109f87a58>, <function wrap.<locals>.null_wrapper at 0x10a8169d8>)
Traceback (most recent call last):
  File "/Users/shn/Documents/projects/mogemoge/.env35/lib/python3.5/site-packages/tornado/ioloop.py", line 887, in start
    handler_func(fd_obj, events)
  File "/Users/shn/Documents/projects/mogemoge/.env35/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "test_pubsub.py", line 38, in on_message
    message = self._socket.recv()
  File "/Users/shn/Documents/projects/mogemoge/.env35/src/nnpy/nnpy/socket.py", line 85, in recv
    errors.convert(rc)
  File "/Users/shn/Documents/projects/mogemoge/.env35/src/nnpy/nnpy/errors.py", line 13, in convert
    raise NNError(error_no, msg)
nnpy.errors.NNError: Operation timed out
6.013: publish> even2
6.013: on_message
6.013: subscribe> b'even2'

アイエエエエ!? デッドロック!? デッドロックナンデ!?
結論から言うと、tornadoの中で、SUBソケットでrecvするときにtopic指定がある場合は、DONTWAITを指定しないとブロックする。
おそらく fdのフラグが立つ > nanomsg内でrecvするもtopic違いなので捨てる > 適合topicまで待ってしまう という挙動をしているのだろう。
なので、DONTWAITを付けてrecvを呼び出して、EAGAINの場合はスキップするというコードを書かないといけない

    def on_message(self, *args):
        print('{:.3f}: on_message'.format(elapsed_time()))
        try:
            message = self._socket.recv(nnpy.DONTWAIT)
            print('{:.3f}: subscribe> {!r}'.format(elapsed_time(), message))
        except nnpy.NNError as exc:
            if exc.error_no == nnpy.EAGAIN:
                print('{:.3f}: subscribe> got EAGAIN. skip...'.format(elapsed_time()))
            else:
                raise

ioloopがpollしているから大丈夫だろうと、blockingなrecv()を呼び出すとハマって子供が寝る時間に帰れなくなるよ、というお話でした。

tornado童貞なんでもっと良い書き方あったらおしえてくだちぃ

3
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2