asyncioにおけるイベント・ループの流れ
イベント・ループの仕事
先に結論を書いておきます.
- イベント・ループはrun_onceの繰り返し
- ループではハンドルとして登録したコールバックを呼び出している
- ネイティブ・コルーティンもsend(None)で処理を進める
対象コード
コルーティンはsendでデータを送れるのでしたが, asyncioではevent_loopでコルーティンを管理します. 以下のコードの処理をデバッガーで追いながらコルーティンがどうやってイベント・ループ内で実行されるのかを確認します.
import asyncio
async def main():
print("OK Google. Wake me up in 2 seconds")
await asyncio.sleep(2)
print("Good Morning")
if __name__ == "__main__":
asyncio.run(main())
###Q. イベント・ループはどのようにして作られるのか?
asyncio.runを呼ぶとイベント・ループを生成する必要があります. 内部では以下のような関数が呼ばれています.
メソッド | 役割 |
---|---|
new_event_loop | イベント・ループの生成 |
run_until_complete | イベント・ループの実行 |
def new_event_loop():
"""Equivalent to calling get_event_loop_policy().new_event_loop()."""
return get_event_loop_policy().new_event_loop()
イベント・ループへのアクセスはイベント・ループ・ポリシーという得体の知れないものを通じて行います. 色々説明しても分かりにくいですが, 以下のようなインターフェースをもつオブジェクトです.1
- new_event_loop
- get_event_loop
- set_event_loop
ポリシーはイベント・ループを管理するオブジェクトとして定義されているようです. new_event_loopというのがイベントを新規に生成するメソッドです. _loop_factoryに処理を委譲しています.
def new_event_loop(self):
"""Create a new event loop.
You must call set_event_loop() to make this the current event
loop.
"""
return self._loop_factory()
factoryという名前はいかにも怪しそうです. OSにもよりますがMacの場合デフォルトのポリシーは_UnixDefaultEventLoopPolicyが指定されているようです. ここで実際の_loop_factoryが設定されます.
_loop_factory = _UnixSelectorEventLoop
ようやくイベント・ループの定義っぽい名前を持つクラスが出てきました. この継承を遡っていくとAbstractEventLoopという抽象クラスに辿り着きます.
以下のようなループっぽいメソッドが定義されています.
- run_foever
- run_until_complete
- stop
- close
これらはドキュメントのEvent Loop Methodsで説明されているメソッドですね. さて流れをおさらいしましょう.
- asyncio.run関数が呼ばれる
- new_event_loop関数が呼ばれる
- get_event_loop_policy関数でイベント・ループ・ポリシーを取得/生成
- デフォルト・ポリシーは_UnixDefaultEventLoopPolicy
- ループ・ファクトリとして_UnixSelectorEventLoopが設定される
- _UnixDefaultEventLoopPolicyのインスタンスが返される
- BaseDefaultEventLoopPolicyのnew_event_loopが呼び出さる.
- _UnixSelectorEventLoopのインスタンスが生成される.
ここで生成されるイベント・ループの正体は_UnixSelectorEventLoopということになります. そして_loop_factoryはクラスだったわけです. デバッガーで値を表示するとよく分かります.
# self._loop_factory
<asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x10fb4f2d0>
# loop = events.new_event_loop()
<_UnixSelectorEventLoop running=False closed=False debug=False>
次はこのイベント・ループをセットします.
###Q. イベント・ループの設定
ポリシーはイベント・ループを管理するオブジェクトでした. 管理対象であるループはこのクラスに持たせるのが良さそうです. 実際ループは_localというプロパティに渡されます. これはスレッド・ローカルのサブクラスとして定義されています. threading.local関数は他のスレッドからは操作できないスレッド・ローカルなデータを生成するようです.
さてこの場合のスレッドは何を指すのでしょうか? ここまでスレッドを明示的に生成した記憶はありません. threadingには_MainThreadクラスがあって, pythonインタープリタが起動したスレッドを指すようです. アンダースコアから分かるように直接操作することは避けるべきクラスのようです. 深追いせずにスレッドがメイン・スレッドか調べるだけにしておきましょう.2
threading.current_thread() is threading.main_thread() # True
現在のスレッドがメイン・スレッドであることが分かりました. ループへの操作はメイン・スレッド外からはできないようになっているようです.
###Q. run_until_completeとは?
実際のコルーティンをループに渡しているのがrun_until_completeメソッドです. このメソッドは未来の値が格納されるFutureオブジェクトを引数に取ります. 今回はmainというコルーティンが渡されています.
# loop.run_until_complete(future)
return loop.run_until_complete(main)
このままではFutureオブジェクトという条件を満たしていませんが, run_until_completeではensure_futureでコルーティンををタスクというクラスでラップしています.
def run_until_complete(self, future):
...
new_task = not futures.isfuture(future) # (*)
future = tasks.ensure_future(future, loop=self)
...
future.add_done_callback(_run_until_complete_cb)
...
try:
self.run_forever()
except:
...
finally:
future.remove_done_callback(_run_until_complete_cb)
...
return future.result()
戻り値はfutureのresultのようです. つまりFutureやTaskを実行した結果を返してくれるのがrun_until_completeということのようです.
###Q. タスクとは?
A Future-like object that runs a Python coroutine.3
コルーティンを実行できるフューチャー風のオブジェクトのようです. Taskのクラス定義を見ると_PyFutureというクラスを継承しています.
class Task(futures._PyFuture):
次にFutureクラスを見ると
_PyFuture = Future
とありどうもTaskはFutureを継承しているクラスのようです. ensure_future関数はFutureかコルーティンをとりFutureであることを保証してくれます. Futureならそのまま返し, コルーティンならcreate_taskでタスクを作って返します. create_taskはTaskクラスのインスタンスを生成して返します.
しかし生成されたタスクがループに渡されるという処理はありません. どうやってループは実行するタスクを知るのでしょうか? Taskクラスのイニシャライザを見るとcall_soonというのが見つかります.
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
asyncioにはcallという接尾辞が着く関数がいくつかあります. ここでcall系メソッドを整理しておきましょう.
名称 | 用途 |
---|---|
call_soon | コールバックを次のイテレーションですぐに呼び出す |
call_later | コールバックを指定した時間だけ呼び出しを遅らせる |
call_at | コールバックを指定した時間に呼び出す |
なんとなくcall_laterはasyncio.sleepと被ってるような気がします. さてどうなんでしょうか?
何れにしてもこれらのメソッドにコールバックを登録してrun_foreverでループを始動すると所定のタイミングで呼び出してくれるわけです. call_soonは登録するとイベント・ループの次のイテレーションで呼び出されますので, すぐにお呼びがかかるわけです.
call_soonの実態である_call_soonを見てみましょう. この関数はコールバック関数をHandleクラスでラップし, _readyというdequeに追加することです.
# self._ready = collections.deque()
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
...
self._ready.append(handle) # (*)
return handle
つまりTaskの生成 = コールバックの登録となっているのでループに知らせる必要はないわけです. ひとまず_redyに格納されているコールバックをReady Callbackとでも呼びましょう. Taskクラスの場合__stepというメソッドがこれに当たります. mainコルーティンもこの中で処理されます.
__stepメソッドは色々やっていてよく分からない部分も多いのですがいくつか見慣れたメソッド呼び出しがあります.
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
これは前回のコルーティンでやったsendメソッドとthrowメソッドです. ネイティブ・コルーティンといえど処理の実行は同様に行われるようです.
###Q. イベント・ループはどうやって起動するのか?
いよいよ実際のループを始動します. run_foreverメソッドはループを始動しますが, ループとは_run_onceというメソッドを繰り返し呼び出すことを指すようです.
while True:
self._run_once()
if self._stopping:
break
_run_onceには終わりの方にあるfor-inループでReady Callbackを順次処理しています.
for i in range(ntodo):
handle = self._ready.popleft()
if self._debug:
# do debugging things
else:
handle._run()
Handleはキャンセルに使うとドキュメントには書いていましたが内部ではコールバックの実行も担うようです. Taskの場合はコルーティンを操作する__stepコールバックのことでした.
ではハンドルのrunメソッドは何をやっているのでしょうか.
self._context.run(self._callback, *self._args)
コンテクストというのが出てきましたがデバッガから実行すると要は第1引数のコールバックを実行します. この行にステップ・インするとmainコルーティンの内部に処理が移行するというわけです. ようやくasyncio.sleepが呼び出されます.
###Q. asyncio.sleepとは?
sleepコルーティンのコードは以下のようになります.
async def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
if loop is None:
loop = events.get_event_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return await future
finally:
h.cancel()
パッと見てわかるようにcall_laterで呼び出しを遅延しているだけです. しかしsleepコルーティンにはコールバックなどは渡されていません. 何を遅延実行させようというのでしょうか? よく見るとfutureのメソッドがコールバックとして渡されています.
内部で別のfutureを作って, call_laterで登録しているだけでした. 実はcall_laterは内部でcall_atを呼び出しているだけだったりします.
# loop.call_at(when, callback, *args, context=None)
timer = self.call_at(self.time() + delay, callback, *args,
context=context)
今度は_scheduledにTimeHandleのインスタンスが登録されます. 現在の時点からdelayの分だけ進んだ時刻で呼び出すという処理です. コールバック関数をヒープ・キューにプッシュします.
# call_at
timer = events.TimerHandle(when, callback, args, self, context)
...
heapq.heappush(self._scheduled, timer)
timer._scheduled = True
TimerHandleはHandleを継承しているので_runメソッドがあります. つまりコールバックを呼び出すことができます. Ready Callbackと分けるために, Scheduled Callbackとでも呼びましょう. イベント・ループで処理すべきコールバックが二つあることが分かります.
名称 | 説明 |
---|---|
Ready Callback | 次のイテレーションですぐ呼び出される |
Scheduled Callback | スケジュール通りに呼び出される |
ここではFutureクラスの_set_result_unless_cancelledというメソッドを呼び出しています.
このメソッドは単純にFutureに結果を設定するset_resultメソッドを呼び出しているだけです. ただFutureがキャンセルされていないことを保証します.
def cancelled(self):
"""Return True if the future was cancelled."""
return self._state == _CANCELLED
このメソッドを見ると分かるのはFutureクラスはステートで色々な判断を行なっているということです. 未来が決定したことを判断するdoneメソッドも同様です.
def done(self):
"""Return True if the future is done.
Done means either that a result / exception are available, or that the
future was cancelled.
"""
return self._state != _PENDING
さてset_resultメソッドですが単純に結果をFutureに設定するというFutureの基本的な機能を呼び出しているようです. 次のループでこれは処理されます. _run_onceに戻りましょう.
else:
event_list = self._selector.select(timeout)
self._process_events(event_list)
I/O多重化というそうですが, とりあえず無視してevent_listがこの時点では空のリストになることを押さえておきましょう. この次のwhileループでScheduled Callback(ここではlater callbacks)が処理されます.
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
時間が来たコールバックをReady Callbackとして再登録します. call_soonの時と同じです.
###Q ループはどうやって止めているのか?
デバッガーで処理を追うとself._stoppingがいつの間にかTrueになっていたりします. 終わる直前の_run_onceをみてみると, _run_until_complete_cbというコールバックが呼び出されているのが分かります.
これはrun_until_complete関数でadd_done_callbackを通じて設定されたコールバックです.
# run_until_complete
future.add_done_callback(_run_until_complete_cb)
_run_until_complete_cbはfutureインスタンスを通してループを取得し停止します.
# _run_until_complete_cb
futures._get_loop(fut).stop()
BaseEventLoopのstopメソッドでself._stoppingが真に設定されます.
def stop(self):
"""Stop running the event loop.
Every callback already scheduled will still run. This simply informs
run_forever to stop looping after a complete iteration.
"""
self._stopping = True
次のループでbreakで無限ループから抜け出せます.
おさらい
色々処理が飛び分かりづらい点やよく分からなかった点もありますがコールスタックを見ると処理の流れを追いやすいかもしれません.
やってることはrun_foreverでイベント・ループを回し, call_soon, call_later, そしてcall_atなどで呼び出しのタイミングをコントロールするということのようです. またネイティブ・コルーティンは__stepメソッド内部でsendを使ってコントロールしていることが分かりました.
よく分からなかったところ && 無視したところ
色々あるが気がづいた点を挙げておきたいと思います.
Taskクラスの__stepメソッド
内部でsend(None)するなどネイティブ・コルーティンへの通信をしているのだが, ちゃんと読んでない.
add_done_callbackで登録してコールバックが呼び出される流れ
mainの最後の行が実行されるとそのままcall_soonが起動するのだが, どこがその引き金になっているのかが分からない. いや引き金は分かる. _context.runなのだが, call_soonが呼び出されるためにはfuture.done()がTrue出ないといけないのだがこれがどこで設定されるのかが追えなかった.
I/O多重化
event_listは今回は空のリストだったがなんか大事感じがする. I/O多重化関係らしいことは分かるのだが深掘りしていない.
イベント・ループのイベントとは?
これもI/O多重化と関係するようでよく分かりませんでした. イベントと呼べる処理は今回のコードからはたどり着きませんでした.
contextvarsモジュール
contextvarsモジュールから実際のコルーティンが呼び出されているのだがいまいちどういうモジュールなのかが分からない. 別スレッドからの呼び出しで使われるらしい