PythonでのノンブロックングI/Oなプログラミングとして、Eventlet動作を試してみた。ただ、Eventletの振る舞いを理解しつつ、スクラッチから書き上げるのは大変なので、OpenStack Nove_compute側の実装コードを参考にした。Nove_compute側でのLive_migration
(1) Eventletサンプルコード
まずは、ユーザ側スレッド(グリーンスレッド)を単体で操作させるだけのサンプルプログラム。。。
sample1.py
import eventlet
import time
eventlet.monkey_patch()
def _sample_processing():
print("### Sample processing thread has started")
for count in range(5):
print("." *count)
time.sleep(0.5)
print("### Sample processing thread has finished")
def start_sample():
opthread = eventlet.spawn(_sample_processing)
opthread.wait()
if __name__ == '__main__':
start_sample()
動かしてみる
$ python sample1.py
### Sample processing thread has started
.
..
...
....
### Sample processing thread has finished
つづいて、複数のグリーンスレッドを複数動作させて、動作を連携させたい場合を想定して、少し、拡張。。。
sample2.py
import eventlet
import time
eventlet.monkey_patch()
def _sample_processing():
print("### Sample processing thread has started")
for count in range(5):
print("." *count)
time.sleep(0.5)
print("### Sample processing thread has finished")
def start_sample():
opthread = eventlet.spawn(_sample_processing)
finish_event = eventlet.event.Event()
def thread_finished(thread, event):
print("### Sample processing thread notification")
event.send()
opthread.link(thread_finished, finish_event)
time.sleep(0)
_sample_processing_monitor(finish_event)
def _sample_processing_monitor(finish_event):
while True:
if not finish_event.ready():
print("+++ thread is still running!! [{}]".format(finish_event._result))
else:
print("+++ thread is done!! [{}]".format(finish_event._result))
break
time.sleep(1)
if __name__ == '__main__':
start_sample()
動かしてみた
$ python sample2.py
### Sample processing thread has started
+++ thread is still running!! [NOT_USED]
.
+++ thread is still running!! [NOT_USED]
..
...
+++ thread is still running!! [NOT_USED]
....
### Sample processing thread has finished
### Sample processing thread notification
+++ thread is done!! [None]
いい感じで、_sample_processing側のグリーンスレッドの動作完了が観測できるようになった。
(2) Eventクラス内部構造のDeepDive
すこし、EventletのEventクラスの内部機構をチェックしてみると、、、
class Event(object):
...
_result = None
_exc = None
def __init__(self):
self._waiters = set()
self.reset()
def __str__(self):
params = (self.__class__.__name__, hex(id(self)),
self._result, self._exc, len(self._waiters))
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params
def reset(self):
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self._result = NOT_USED
self._exc = None
def ready(self):
return self._result is not NOT_USED
...
def send(self, result=None, exc=None):
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
self._result = result
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
self._exc = exc
hub = hubs.get_hub()
for waiter in self._waiters:
hub.schedule_call_global(
0, self._do_send, self._result, self._exc, waiter)
となっている。
最初、sample2.pyで、Eventクラスをインスタンス化しただけだと、
self._result = NOT_USED
としてインスタンス変数が保持されるが、その後、event.send()が起動されると、
self._result = None
に変化する。
最終的に、監視側(_sample_processing_monitor)で、self._resultの内容をルックアップすることによって、_sample_processing側のグリーンスレッドの動作完了が判定できるようになるようだ。