TL;DR
Pythonでコルーチン使いつつ、割とスレッドモデルっぽい動作をasyncio
を使って実現する。
やりたいこと
- 複数のコルーチンが非同期に実行される前提
- それぞれ個々のコルーチンの実行が終わった時点で、返り値を即座に受け取って処理したい
もう少し詳しく
- 参考:Pythonにおける非同期処理: asyncio逆引きリファレンス(すごくタメになるのでオススメ)
ここの「並列で処理を行いたい(固定長)」のところに、asyncio.gather
とasyncio.wait
を使った例がそれぞれ載っているけど、返り値の処理はいずれも__全部のコルーチンの処理が終わってから__行われるようになっていた。
これを、__各コルーチンが終了した時点で即座に__返り値の処理もさせつつ他のコルーチンの終了も待つ、ということをしたい。
利用したバージョン
Python3.7.1 です。
ちょっとバージョンが下がると動かないかもしれない。
方式1: asyncio.as_completed
を使う
こんな感じ。
import asyncio
# コルーチン1
async def test1():
await asyncio.sleep(3)
print('now in test1.')
return "test1"
# コルーチン2
async def test2():
await asyncio.sleep(1)
print('now in test2.')
return "test2"
# 結果の非同期受信用
async def fmap(f, fs):
for res in asyncio.as_completed(fs):
r = await res
f(r)
if __name__ == '__main__':
# 複数の coroutine から future のシーケンス作成
f1 = asyncio.ensure_future(test1())
f2 = asyncio.ensure_future(test2())
futures = [f1, f2]
# future シーケンスを実行して、処理が終わるごとにすぐ受信側処理もする
loop = asyncio.get_event_loop()
loop.run_until_complete(fmap(print, futures))
実行結果。
now in test2.
test2
now in test1.
test1
方式2: コールバックを使用する
import asyncio
import functools
# コルーチン1
async def test1():
await asyncio.sleep(3)
print('now in test1.')
return "test1"
# コルーチン2
async def test2():
await asyncio.sleep(1)
print('now in test2.')
return "test2"
# コールバック
def callback(f, future):
f(future.result())
if __name__ == '__main__':
# 複数の coroutine から future のシーケンス作成
f1 = asyncio.ensure_future(test1())
f2 = asyncio.ensure_future(test2())
futures = [f1, f2]
# コールバック設定
for f in futures:
f.add_done_callback(functools.partial(callback, print))
# future シーケンスを実行
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*futures))
結果は方式1と同じです。個人的には方式1の方が好き。
asyncio.as_completed
を簡単に解説
方式1を書いてるときにちょっとハマったので、asyncio.as_completed
の動作をわかったなりに少し解説してみる。
先程書いたコードの中でこの関数を使ってる fmap
というコルーチンを見ると、
async def fmap(f, fs):
for res in asyncio.as_completed(fs):
r = await res
f(r)
関数bodyの1行目で、asyncio.as_completed(fs)
の返り値からfor
文でres
を拾ってきてる。実はasyncio.as_completed(fs)
の型はIterator[Awaitable[...]]
となっていて、awaitableオブジェクト(実はコルーチン)のイテレータになってる。
通常のイテレータはIterable
な型を持つ値をくるむのだけれども、そのIterable
の非同期版がAwaitable
だと思えばいい。
なので、Awaitable
型の変数res
に対してawait
文を適用し、
await res
とすることで、さらにAwaitable
でくるまれた中身の値が取り出せるという仕組み。