Posted at

PythonでのノンブロックングI/OなEventlet動作を試してみた

More than 1 year has passed since last update.

PythonでのノンブロックングI/Oなプログラミングとして、Eventlet動作を試してみた。ただ、Eventletの振る舞いを理解しつつ、スクラッチから書き上げるのは大変なので、OpenStack Nove_compute側の実装コードを参考にした。Nove_compute側でのLive_migration


(1) Eventletサンプルコード

まずは、ユーザ側スレッド(グリーンスレッド)を単体で操作させるだけのサンプルプログラム。。。


sample1.py

import eventlet

import time
eventlet.monkey_patch()

def _sample_processing():
print("### Sample processing thread has started")
for count in range(5):
print("." *count)
time.sleep(0.5)
print("### Sample processing thread has finished")

def start_sample():
opthread = eventlet.spawn(_sample_processing)
opthread.wait()

if __name__ == '__main__':
start_sample()


動かしてみる

$ python sample1.py 

### Sample processing thread has started

.
..
...
....
### Sample processing thread has finished

つづいて、複数のグリーンスレッドを複数動作させて、動作を連携させたい場合を想定して、少し、拡張。。。


sample2.py

import eventlet

import time
eventlet.monkey_patch()

def _sample_processing():
print("### Sample processing thread has started")
for count in range(5):
print("." *count)
time.sleep(0.5)
print("### Sample processing thread has finished")

def start_sample():
opthread = eventlet.spawn(_sample_processing)
finish_event = eventlet.event.Event()

def thread_finished(thread, event):
print("### Sample processing thread notification")
event.send()
opthread.link(thread_finished, finish_event)

time.sleep(0)
_sample_processing_monitor(finish_event)

def _sample_processing_monitor(finish_event):
while True:
if not finish_event.ready():
print("+++ thread is still running!! [{}]".format(finish_event._result))
else:
print("+++ thread is done!! [{}]".format(finish_event._result))
break
time.sleep(1)

if __name__ == '__main__':
start_sample()


動かしてみた

$ python sample2.py 

### Sample processing thread has started

+++ thread is still running!! [NOT_USED]
.
+++ thread is still running!! [NOT_USED]
..
...
+++ thread is still running!! [NOT_USED]
....
### Sample processing thread has finished
### Sample processing thread notification
+++ thread is done!! [None]

いい感じで、_sample_processing側のグリーンスレッドの動作完了が観測できるようになった。


(2) Eventクラス内部構造のDeepDive

すこし、EventletのEventクラスの内部機構をチェックしてみると、、、

class Event(object):

...

_result = None
_exc = None

def __init__(self):
self._waiters = set()
self.reset()

def __str__(self):
params = (self.__class__.__name__, hex(id(self)),
self._result, self._exc, len(self._waiters))
return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params

def reset(self):
assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
self._result = NOT_USED
self._exc = None

def ready(self):
return self._result is not NOT_USED

...

def send(self, result=None, exc=None):
assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
self._result = result
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
self._exc = exc
hub = hubs.get_hub()
for waiter in self._waiters:
hub.schedule_call_global(
0, self._do_send, self._result, self._exc, waiter)

となっている。

最初、sample2.pyで、Eventクラスをインスタンス化しただけだと、

self.result = NOT_USED

としてインスタンス変数が保持されるが、その後、event.send()が起動されると、

self.
result = None

に変化する。

最終的に、監視側(sample_processing_monitor)で、self.resultの内容をルックアップすることによって、sampleprocessing側のグリーンスレッドの動作完了が判定できるようになるようだ。