お客様(=神様)から、websocketで受け取ったメッセージをnanomsgで引きずり回したい!と言われたらそういうコードを書くしか無い、我ら受託エンジニア。
websocetとnanomsgを両方使うならtornadoで非同期コードを管理したいな、と思うのは当然至極。
nanomsgは以下のコードでソケットのfile descriptorが取れるので、これをioloopにぶち込めばnon-blockingなコードが書けてウハウハだ!!
(nanomsgのドライバにはnnpyを使っている)
_socket.getsockopt(nnpy.SOL_SOCKET, nnpy.RCVFD)
と、こんなコードを書いてみた。(長いのでgist)
topicを指定しない場合は
0.001: publish> even0
0.001: on_message
0.001: subscribe> b'even0'
1.006: publish> odd 1
1.006: on_message
1.006: subscribe> b'odd 1'
2.011: publish> even2
2.011: on_message
うまく動く。
気を良くして、topicを'even'に指定してみると...
0.001: publish> even0
0.001: on_message
0.001: subscribe> b'even0'
1.005: publish> odd 1
1.005: on_message
ERROR:tornado.application:Exception in callback (<__main__.Subscriber object at 0x109f87a58>, <function wrap.<locals>.null_wrapper at 0x10a8169d8>)
Traceback (most recent call last):
File "/Users/shn/Documents/projects/mogemoge/.env35/lib/python3.5/site-packages/tornado/ioloop.py", line 887, in start
handler_func(fd_obj, events)
File "/Users/shn/Documents/projects/mogemoge/.env35/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "test_pubsub.py", line 38, in on_message
message = self._socket.recv()
File "/Users/shn/Documents/projects/mogemoge/.env35/src/nnpy/nnpy/socket.py", line 85, in recv
errors.convert(rc)
File "/Users/shn/Documents/projects/mogemoge/.env35/src/nnpy/nnpy/errors.py", line 13, in convert
raise NNError(error_no, msg)
nnpy.errors.NNError: Operation timed out
6.013: publish> even2
6.013: on_message
6.013: subscribe> b'even2'
アイエエエエ!? デッドロック!? デッドロックナンデ!?
結論から言うと、tornadoの中で、SUBソケットでrecvするときにtopic指定がある場合は、DONTWAITを指定しないとブロックする。
おそらく fdのフラグが立つ > nanomsg内でrecvするもtopic違いなので捨てる > 適合topicまで待ってしまう という挙動をしているのだろう。
なので、DONTWAITを付けてrecvを呼び出して、EAGAINの場合はスキップするというコードを書かないといけない
def on_message(self, *args):
print('{:.3f}: on_message'.format(elapsed_time()))
try:
message = self._socket.recv(nnpy.DONTWAIT)
print('{:.3f}: subscribe> {!r}'.format(elapsed_time(), message))
except nnpy.NNError as exc:
if exc.error_no == nnpy.EAGAIN:
print('{:.3f}: subscribe> got EAGAIN. skip...'.format(elapsed_time()))
else:
raise
ioloopがpollしているから大丈夫だろうと、blockingなrecv()を呼び出すとハマって子供が寝る時間に帰れなくなるよ、というお話でした。
tornado童貞なんでもっと良い書き方あったらおしえてくだちぃ