9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Pythonでasyncioとコルーチンを使ってスレッドっぽい非同期処理をする

Last updated at Posted at 2018-11-15

TL;DR

Pythonでコルーチン使いつつ、割とスレッドモデルっぽい動作をasyncioを使って実現する。

やりたいこと

  • 複数のコルーチンが非同期に実行される前提
  • それぞれ個々のコルーチンの実行が終わった時点で、返り値を即座に受け取って処理したい

もう少し詳しく

ここの「並列で処理を行いたい(固定長)」のところに、asyncio.gatherasyncio.waitを使った例がそれぞれ載っているけど、返り値の処理はいずれも__全部のコルーチンの処理が終わってから__行われるようになっていた。
これを、__各コルーチンが終了した時点で即座に__返り値の処理もさせつつ他のコルーチンの終了も待つ、ということをしたい。

利用したバージョン

Python3.7.1 です。
ちょっとバージョンが下がると動かないかもしれない。

方式1: asyncio.as_completedを使う

こんな感じ。

import asyncio

# コルーチン1
async def test1():
    await asyncio.sleep(3)
    print('now in test1.')

    return "test1"

# コルーチン2
async def test2():
    await asyncio.sleep(1)
    print('now in test2.')

    return "test2"

# 結果の非同期受信用
async def fmap(f, fs):
    for res in asyncio.as_completed(fs):
        r = await res
        f(r)


if __name__ == '__main__':
    # 複数の coroutine から future のシーケンス作成
    f1 = asyncio.ensure_future(test1())
    f2 = asyncio.ensure_future(test2())
    futures = [f1, f2]

    # future シーケンスを実行して、処理が終わるごとにすぐ受信側処理もする
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fmap(print, futures))

実行結果。

now in test2.
test2
now in test1.
test1

方式2: コールバックを使用する

import asyncio
import functools

# コルーチン1
async def test1():
    await asyncio.sleep(3)
    print('now in test1.')

    return "test1"

# コルーチン2
async def test2():
    await asyncio.sleep(1)
    print('now in test2.')

    return "test2"

# コールバック
def callback(f, future):
    f(future.result())


if __name__ == '__main__':
    # 複数の coroutine から future のシーケンス作成
    f1 = asyncio.ensure_future(test1())
    f2 = asyncio.ensure_future(test2())
    futures = [f1, f2]
    # コールバック設定
    for f in futures:
        f.add_done_callback(functools.partial(callback, print))

    # future シーケンスを実行
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*futures))

結果は方式1と同じです。個人的には方式1の方が好き。

asyncio.as_completed を簡単に解説

方式1を書いてるときにちょっとハマったので、asyncio.as_completedの動作をわかったなりに少し解説してみる。

先程書いたコードの中でこの関数を使ってる fmap というコルーチンを見ると、

   async def fmap(f, fs):
       for res in asyncio.as_completed(fs):
           r = await res
           f(r)

関数bodyの1行目で、asyncio.as_completed(fs) の返り値からfor文でresを拾ってきてる。実はasyncio.as_completed(fs)の型はIterator[Awaitable[...]]となっていて、awaitableオブジェクト(実はコルーチン)のイテレータになってる。
通常のイテレータはIterableな型を持つ値をくるむのだけれども、そのIterableの非同期版がAwaitableだと思えばいい。
なので、Awaitable型の変数resに対してawait文を適用し、

    await res

とすることで、さらにAwaitableでくるまれた中身の値が取り出せるという仕組み。

9
8
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?