Edited at

Pythonでasyncioとコルーチンを使ってスレッドっぽい非同期処理をする


TL;DR

Pythonでコルーチン使いつつ、割とスレッドモデルっぽい動作をasyncioを使って実現する。


やりたいこと


  • 複数のコルーチンが非同期に実行される前提

  • それぞれ個々のコルーチンの実行が終わった時点で、返り値を即座に受け取って処理したい


もう少し詳しく

ここの「並列で処理を行いたい(固定長)」のところに、asyncio.gatherasyncio.waitを使った例がそれぞれ載っているけど、返り値の処理はいずれも全部のコルーチンの処理が終わってから行われるようになっていた。

これを、各コルーチンが終了した時点で即座に返り値の処理もさせつつ他のコルーチンの終了も待つ、ということをしたい。


利用したバージョン

Python3.7.1 です。

ちょっとバージョンが下がると動かないかもしれない。


方式1: asyncio.as_completedを使う

こんな感じ。

import asyncio

# コルーチン1
async def test1():
await asyncio.sleep(3)
print('now in test1.')

return "test1"

# コルーチン2
async def test2():
await asyncio.sleep(1)
print('now in test2.')

return "test2"

# 結果の非同期受信用
async def fmap(f, fs):
for res in asyncio.as_completed(fs):
r = await res
f(r)

if __name__ == '__main__':
# 複数の coroutine から future のシーケンス作成
f1 = asyncio.ensure_future(test1())
f2 = asyncio.ensure_future(test2())
futures = [f1, f2]

# future シーケンスを実行して、処理が終わるごとにすぐ受信側処理もする
loop = asyncio.get_event_loop()
loop.run_until_complete(fmap(print, futures))

実行結果。

now in test2.

test2
now in test1.
test1


方式2: コールバックを使用する

import asyncio

import functools

# コルーチン1
async def test1():
await asyncio.sleep(3)
print('now in test1.')

return "test1"

# コルーチン2
async def test2():
await asyncio.sleep(1)
print('now in test2.')

return "test2"

# コールバック
def callback(f, future):
f(future.result())

if __name__ == '__main__':
# 複数の coroutine から future のシーケンス作成
f1 = asyncio.ensure_future(test1())
f2 = asyncio.ensure_future(test2())
futures = [f1, f2]
# コールバック設定
for f in futures:
f.add_done_callback(functools.partial(callback, print))

# future シーケンスを実行
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*futures))

結果は方式1と同じです。個人的には方式1の方が好き。


asyncio.as_completed を簡単に解説

方式1を書いてるときにちょっとハマったので、asyncio.as_completedの動作をわかったなりに少し解説してみる。

先程書いたコードの中でこの関数を使ってる fmap というコルーチンを見ると、

   async def fmap(f, fs):

for res in asyncio.as_completed(fs):
r = await res
f(r)

関数bodyの1行目で、asyncio.as_completed(fs) の返り値からfor文でresを拾ってきてる。実はasyncio.as_completed(fs)の型はIterator[Awaitable[...]]となっていて、awaitableオブジェクト(実はコルーチン)のイテレータになってる。

通常のイテレータはIterableな型を持つ値をくるむのだけれども、そのIterableの非同期版がAwaitableだと思えばいい。

なので、Awaitable型の変数resに対してawait文を適用し、

    await res

とすることで、さらにAwaitableでくるまれた中身の値が取り出せるという仕組み。