LoginSignup
45
43

More than 3 years have passed since last update.

Pythonの非同期処理 ~async, awaitを完全に理解する~

Posted at

Future について

Pythonで時間のかかる計算を実行して、その結果を得たいと思った時、通常はその処理を関数にまとめ、その関数を実行た時の返り値として得ることになります。これを同期処理といいます。

一方、同期処理とは異なる概念として非同期処理というものがあります。これは、Futureというオブジェクトを介して、計算を要求する処理(receiver)と実際の計算を行う処理(sender)の間で以下のようなやり取りを行います。

  • (receiver) Futureオブジェクトを生成する。
  • (receiver) 何らかの手段を用いて receiver を実行する。
  • (sender) 時間のかかる計算を行い、計算結果を、receiver が生成したFutureオブジェクトに書き込む
  • (receiver) Futureオブジェクトを確認し、計算結果が格納されていれば計算結果を取得する。

ここまでの処理は、例えば以下のようになります。

import asyncio
import time

def f(future):
    time.sleep(5) # 時間のかかる処理
    future.set_result("hello")
    return

future = asyncio.futures.Future()
f(future)

if future.done():
    res = future.result()
    print(res)

これを実行すると、5秒待った後に「hello」と表示されます。

一応、Futureの関連するコードを示すと以下のようになります。(一部省略)
Lib/asyncio/futures.py
class Future:
    _state = _PENDING
    _result = None

    def done(self):
        return self._state != _PENDING

    def result(self):
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        return self._result

    def set_result(self, result):
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED

イベントループ

既にお気づきのとおり、上のコードはFutureオブジェクトを使用している以外は、通常の関数呼び出しと同じです。なぜなら、sender のコードを receiver が直接実行しているからです。これではFutureの利点が活かされません。

ここでイベントループという概念が登場します。イベントループは1スレッドに0個または1個存在するオブジェクトで、登録された関数を実行する機能を持ちます。

実際に使ってみましょう。

import asyncio
import time

def f(future):
    time.sleep(5) # 時間のかかる処理
    future.set_result("hello")
    return

loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()

上のコードでは、asyncio.get_event_loopを呼び出してBaseEventLoopオブジェクトを取得しています。そして、call_soonによって関数floopに登録しています。最後にloop.run_forever()でイベントループを実行しています。

これを実際実行すると、run_forever()で無限ループになっており永遠にプログラムが終了しません。代わりに、以下のように書くことで、関数f()の実行が終わった後に自動的にイベントループを停止することができます。

res = loop.run_until_complete(future)
print(res)

run_until_complete()はどのようにして関数f()の完了を知ることができるのでしょうか。これには、futureのコールバックという仕組みが用いられています。
run_until_complete()ではまずfuture.add_done_callback()という関数を実行し、futureにコールバックを設定しています。その後run_forever()が呼ばれ関数f()が実行されます。その後関数f()内でfuture.set_result()によって値が設定されると、add_done_callback()によって設定されたコールバックが呼ばれます。run_until_complete()が設定するコールバック内では、loop.stop()によってイベントループの終了を予約する処理を行っているため、f()の実行終了後にイベントループが停止します。

注意点としては、future.set_result()が実行されて即座に関数f()の実行が終了するわけではないことです。あくまで終了が予約されるだけで、実際はreturnまで実行が継続されます。

関係するライブラリのコードを載せます。
Lib/asyncio/events.py
import contextvars
class Handle:
    def __init__(self, callback, args, loop):
        self._context = contextvars.copy_context()
        self._loop = loop
        self._callback = callback
        self._args = args

    def _run(self):
        self._context.run(self._callback, *self._args)
Lib/asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        self._stopping = False
        self._ready = collections.deque()

    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        self._ready.append(handle)
        return handle

    def _run_once(self):
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            handle._run()

    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break

    def run_until_complete(self, future):
        def _run_until_complete_cb(fut):
            self.stop()
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()

    def stop(self):
        self._stopping = True
Lib/asyncio/futures.py
class Future:
    def add_done_callback(self, fn):
        context = contextvars.copy_context()
        self._callbacks.append((fn, context))

    def set_result(self, result):
        # ... 省略
        for callback, ctx in self._callbacks[:]:
            self._loop.call_soon(callback, self, context=ctx)

イベントループを用いた複数の処理の実行

前章では、イベントループを用いて処理を実行しました。しかし、変わったところは時間のかかる処理をおこなう関数fを直接実行するのではなく、イベントループを介して実行した所だけでした。これではやっていることは何ら変わりがありません。

イベントループの本領は、複数の処理を実行した時に発揮されます。では、実際にやってみましょう。

import asyncio
import time

def f(future, tag):
    for _ in range(3):
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    future.set_result("hello %d" % tag)
    return

loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
    future = loop.create_future()
    loop.call_soon(f, future, tag)
    futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)

このコードでは3つの処理を登録しています。また、複数のFutureを一つに束ねるため、asyncio.gatherという新しい関数を用いています。この実行結果は以下のようになります。

waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']

この結果からわかるようにf(0),f(1),f(2)は並列で実行されているわけではないことに注意してください。ライブラリのソースコードを見れば分かるように、loop.run_until_complete()内ではloop._readyに登録されたコールバックを順次実行しているだけなのです。

関連するライブラリのコードを掲載します。
Lib/asyncio/tasks.py
class _GatheringFuture(futures.Future):
    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)
            outer.set_result(results)
    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        nfuts += 1
        fut.add_done_callback(_done_callback)
        children.append(fut)
    outer = _GatheringFuture(children, loop=loop)
    return outer

ジェネレーター

ここで脱線しPythonのジェネレーターについて確認します。
ジェネレータは「イテレータを返す関数」です。ジェネレータを実行するとジェネレータオブジェクトが返されます。ジェネレータオブジェクトはイテレータを表す関数__iter__()を実装しています。ジェネレータは以下のように実装します。

def generator():
    yield 1
    yield 2
    yield 3
    return "END"

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)

ここで、yieldはジェネレータの中身の処理を一時的に止める働きを持ちます。また、ジェネレータは2段重ねにすることもできます。

def generator2():
    yield 1
    yield 2
    yield 3
    return "END"

def generator():
    a = yield from generator2()
    return a

gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
    print(gg.__next__())
except StopIteration as e:
    print(e.value)

実行結果はどちらも

1
2
3
END

となります。

ジェネレータのイベントループによる実行

前々章で扱ったように、loop.run_until_completeを用いて複数の関数を実行する場合、1つ目の関数の実行が完了した後に2つ目の関数を実行し,...というように関数は並列には実行されず、順番に実行されます。ここで、関数ではなくジェネレータを用いると以下のようになります。

import asyncio
import time

def f(tag):
    for _ in range(3):
        yield
        time.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
    task = f(tag)
    tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)

ここで、関数f()内にyield命令を追加し、また計算結果はfuture.set_resultではなくreturnで返すようにしました。引数futureは不要となったので削除しました。

この実行結果は以下のようになります。

waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']

前々章ではf(0)が3回表示された後にf(1)が表示され,...となっていたのが、f(0),f(1),f(2)の順で表示されるようになりました。なぜなら、たとえイベントループに複数のタスクが登録されていたとしても、すべて1つのスレッド内で実行されるからです。また、イベントループはPythonの関数の実行を一時停止することはできないため、returnなどによって自発的に関数が停止するまで1つの関数を実行し続けるしかないためです。

一方、ジェネレータを使った場合は、yieldによって関数の実行が一時停止します。このタイミングで処理がイベントループ側に戻るため、イベントループが実行するタスクを切り替えることができるのです

ちなみに、ジェネレータを使用する最小の例は以下の通りです。(これではタスクが1つしかないためジェネレータにした意味がないですが...。)

import asyncio
import time

def f():
    time.sleep(5) # 時間のかかる処理
    yield
    return "hello"

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)

ちなみに、ジェネレータを使わないバージョンでは、loop.call_soon()を呼び出して関数f()をイベントループに登録していましたが、本章ではこれを呼び出していないことに疑問を持った方が多いと思います。具体的には以下のようになります。

関数名 引数(Future版) 引数(Generator版)
f() future なし
loop.call_soon() f --
loop.run_until_complete() future f

run_until_complete()内では、与えられた引数がジェネレータオブジェクト(ジェネレータとして定義された関数f()を呼んで得られるもの)の場合、(Futureのサブクラスである)Taskインスタンスを生成します。この生成時に内部的にcall_soon()が呼ばれているのです。

関連するライブラリのコード
Lib/asyncio/base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)
        future.add_done_callback(_run_until_complete_cb)
        self.run_forever()
        return future.result()
Lib/asyncio/tasks.py
def ensure_future(coro_or_future, loop):
    if isinstance(coro_or_future, types.CoroutineType) or isinstance(coro_or_future, types.GeneratorType):
        task = tasks.Task(coro_or_future, loop=loop)
        return task
    else:
        return coro_or_future
class Task(futures.Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self._coro = coro
        self._loop = loop
        self._context = contextvars.copy_context()

        loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __step(self, exc=None):
        coro = self._coro
        self._fut_waiter = None
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            self._loop.call_soon(self.__step, context=self._context)

sleep時に他のタスクを実行する

ここまでの例では、time.sleep()を多用していました。これはもちろん「時間のかかる処理」を例示するためですが、実際に実用目的でsleep()したい場合もあります。例えば、

  • メインの処理でネットワークの通信を行っている間、サブの処理として一定時間待ったあと、キャンセルの処理をするタイムアウト処理
  • メインの処理で時間のかかる計算を行っている間、サブの処理で進捗状況を表示する

しかしながら、このような場合にサブの処理でtime.sleep()を使うことが出来ません。なぜなら、サブの処理でいったんtime.sleep()を実行してしまうと、sleepしている間にメインの処理を続行することができず、結局time.sleep()が終了するまでサブがイベントループを専有し続けることになるからです。

タスクの中で一定時間待ちたいが、待っている時間はイベントループに処理を戻したい。そのような場合は、loop.call_later()関数が使用できます。この関数は、指定秒数だけ待った後に与えられた関数を実行します。
この性質を用いて、以下のようにmy_sleep()を実装できます。

import asyncio
import time

def my_sleep(delay):
    def _cb_set_result(fut):
        fut.set_result(None)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay, _cb_set_result, future)
    yield from future

def f(tag):
    for i in range(3):
        yield from my_sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)

これは、前章の処理をmy_sleep()を使って書き直したものです。前章は3つの処理でそれぞれ3秒ずつ待っていたので合計9秒間かかりました。しかし、今回の処理は約3秒で実行が終了します。

もう少し複雑なこともできます。例えば、タスクの中である関数を呼び出しており、その関数がmy_sleep()しようとしている場合を考えます。この場合、以下のように呼び出す関数をジェネレータとして定義すれば大丈夫です。

def g():
    yield from my_sleep(10)
    return "hello"

def f():
    ret = yield from g()
    return ret

loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)

なぜyield futureでなくyield from futureなの?

上で説明したmy_sleep()のコードで、最後の行がyield from futureになっていたことに気づいた方も居るでしょう。ジェネレータの__next__()が呼ばれた時に返る値を設定する場合は、yieldを使用するのでした。逆にyield fromを指定するのは、他のイテレータを指定する場合なのでした。イテレータではない、ただの結果を代入する箱であるFutureをかえすために、なぜyield fromを使っているのでしょうか?

技術的な理由としては、Futureインスタンスは実はイテレータなのです! Future__iter__()を実装しており、この関数は以下のようになっています:

class Future:
    #....
    def __iter__(self):
        yield self

すなわち、my_sleep()のイテレーションは以下のようになっています。

  1. yield from my_sleep(1)が実行される。
  2. > go = my_sleep(1)によってmy_sleepのジェネレータオブジェクトを生成
  3. > it = go.__iter__()によってイテレータを生成(これはgoと同じものである)
  4. > my_sleepの最初の要素を取得するためres = it.__next__()が実行される
  5. > my_sleep()の中身の実行が始まる。
  6. > > my_sleep()内のyield fromの右辺の式が評価され、futureが生成される
  7. > > it_inner = future.__iter__()が実行される。
  8. > > res_inner = it_inner.__next__()が実行される。これはfutureと同じものである。
  9. > res_innerit.__next__()の戻り値となる。すなわち res = future となる

もう一つの、政治的な理由としては、ジェネレータ(又はコルーチン)とFutureを同列に扱えるようにしたかったのでしょう。これは、次章のawaitともつながってきます。

関連するライブラリのコード
Lib/asyncio/tasks.py
class Task(futures.Future):
    def __step(self, exc=None):
        coro = self._coro
        try:
            result = coro.send(None)
        except StopIteration as exc:
            super().set_result(exc.value)
        elif result != None:
            result.add_done_callback(self.__wakeup, context=self._context)
        else:
            self._loop.call_soon(self.__step, context=self._context)

    def __wakeup(self, future):
        self.__step()

async await キーワードを使う

前章と同じコードは、Python3.7以降では以下のように書けます。
(厳密には、本章で使用するのはcoroutine、前章で使用したのはgeneratorという多少の違いはあります)

import asyncio
import time

async def f(tag):
    for i in range(3):
        await asyncio.sleep(1)
        print("waiting for f(%d)" % tag)
    return "hello %d" % tag

loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)

この書き方では、my_sleep()の代わりにasyncio.sleep()を使えます。

また、タスクが1個のみの場合は、asyncio.run()を使ってさらに簡単に書けます。

import asyncio
import time

async def g():
    await asyncio.sleep(10)
    return "hello"

async def f():
    return await g()

asyncio.run(f())

関連するライブラリのコード
Lib/asyncio/tasks.py
async def sleep(delay, result=None):
    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    return await future
Lib/asyncio/runner.py
def run(main):
    loop = events.new_event_loop()
    return loop.run_until_complete(main)

さいごに

こんな記事を下書きに溜め込んでいたのですが、他の方が似た記事を公開されていたので私も急いで(?)公開することにしました。

45
43
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
45
43