Future
について
Pythonで時間のかかる計算を実行して、その結果を得たいと思った時、通常はその処理を関数にまとめ、その関数を実行た時の返り値として得ることになります。これを同期処理といいます。
一方、同期処理とは異なる概念として非同期処理というものがあります。これは、Future
というオブジェクトを介して、計算を要求する処理(receiver)と実際の計算を行う処理(sender)の間で以下のようなやり取りを行います。
- (receiver)
Future
オブジェクトを生成する。 - (receiver) 何らかの手段を用いて receiver を実行する。
- (sender) 時間のかかる計算を行い、計算結果を、receiver が生成した
Future
オブジェクトに書き込む - (receiver)
Future
オブジェクトを確認し、計算結果が格納されていれば計算結果を取得する。
ここまでの処理は、例えば以下のようになります。
import asyncio
import time
def f(future):
time.sleep(5) # 時間のかかる処理
future.set_result("hello")
return
future = asyncio.futures.Future()
f(future)
if future.done():
res = future.result()
print(res)
これを実行すると、5秒待った後に「hello」と表示されます。
一応、`Future`の関連するコードを示すと以下のようになります。(一部省略)
class Future:
_state = _PENDING
_result = None
def done(self):
return self._state != _PENDING
def result(self):
if self._state != _FINISHED:
raise exceptions.InvalidStateError('Result is not ready.')
return self._result
def set_result(self, result):
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
イベントループ
既にお気づきのとおり、上のコードはFuture
オブジェクトを使用している以外は、通常の関数呼び出しと同じです。なぜなら、sender のコードを receiver が直接実行しているからです。これではFuture
の利点が活かされません。
ここでイベントループという概念が登場します。イベントループは1スレッドに0個または1個存在するオブジェクトで、登録された関数を実行する機能を持ちます。
実際に使ってみましょう。
import asyncio
import time
def f(future):
time.sleep(5) # 時間のかかる処理
future.set_result("hello")
return
loop = asyncio.get_event_loop()
future = asyncio.futures.Future()
loop.call_soon(f, future)
loop.run_forever()
上のコードでは、asyncio.get_event_loop
を呼び出してBaseEventLoop
オブジェクトを取得しています。そして、call_soon
によって関数f
をloop
に登録しています。最後にloop.run_forever()
でイベントループを実行しています。
これを実際実行すると、run_forever()
で無限ループになっており永遠にプログラムが終了しません。代わりに、以下のように書くことで、関数f()
の実行が終わった後に自動的にイベントループを停止することができます。
res = loop.run_until_complete(future)
print(res)
run_until_complete()
はどのようにして関数f()
の完了を知ることができるのでしょうか。これには、future
のコールバックという仕組みが用いられています。
run_until_complete()
ではまずfuture.add_done_callback()
という関数を実行し、future
にコールバックを設定しています。その後run_forever()
が呼ばれ関数f()
が実行されます。その後関数f()
内でfuture.set_result()
によって値が設定されると、add_done_callback()
によって設定されたコールバックが呼ばれます。run_until_complete()
が設定するコールバック内では、loop.stop()
によってイベントループの終了を予約する処理を行っているため、f()
の実行終了後にイベントループが停止します。
注意点としては、future.set_result()
が実行されて即座に関数f()
の実行が終了するわけではないことです。あくまで終了が予約されるだけで、実際はreturn
まで実行が継続されます。
関係するライブラリのコードを載せます。
import contextvars
class Handle:
def __init__(self, callback, args, loop):
self._context = contextvars.copy_context()
self._loop = loop
self._callback = callback
self._args = args
def _run(self):
self._context.run(self._callback, *self._args)
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
self._stopping = False
self._ready = collections.deque()
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
self._ready.append(handle)
return handle
def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
handle._run()
def run_forever(self):
while True:
self._run_once()
if self._stopping:
break
def run_until_complete(self, future):
def _run_until_complete_cb(fut):
self.stop()
future.add_done_callback(_run_until_complete_cb)
self.run_forever()
return future.result()
def stop(self):
self._stopping = True
class Future:
def add_done_callback(self, fn):
context = contextvars.copy_context()
self._callbacks.append((fn, context))
def set_result(self, result):
# ... 省略
for callback, ctx in self._callbacks[:]:
self._loop.call_soon(callback, self, context=ctx)
イベントループを用いた複数の処理の実行
前章では、イベントループを用いて処理を実行しました。しかし、変わったところは時間のかかる処理をおこなう関数f
を直接実行するのではなく、イベントループを介して実行した所だけでした。これではやっていることは何ら変わりがありません。
イベントループの本領は、複数の処理を実行した時に発揮されます。では、実際にやってみましょう。
import asyncio
import time
def f(future, tag):
for _ in range(3):
time.sleep(1)
print("waiting for f(%d)" % tag)
future.set_result("hello %d" % tag)
return
loop = asyncio.get_event_loop()
futures = []
for tag in range(3):
future = loop.create_future()
loop.call_soon(f, future, tag)
futures += [future]
res = loop.run_until_complete(asyncio.gather(*futures))
print(res)
このコードでは3つの処理を登録しています。また、複数のFuture
を一つに束ねるため、asyncio.gather
という新しい関数を用いています。この実行結果は以下のようになります。
waiting for f(0)
waiting for f(0)
waiting for f(0)
waiting for f(1)
waiting for f(1)
waiting for f(1)
waiting for f(2)
waiting for f(2)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
この結果からわかるようにf(0)
,f(1)
,f(2)
は並列で実行されているわけではないことに注意してください。ライブラリのソースコードを見れば分かるように、loop.run_until_complete()
内ではloop._ready
に登録されたコールバックを順次実行しているだけなのです。
関連するライブラリのコードを掲載します。
class _GatheringFuture(futures.Future):
def __init__(self, children, *, loop=None):
super().__init__(loop=loop)
self._children = children
self._cancel_requested = False
def gather(*coros_or_futures, loop=None, return_exceptions=False):
def _done_callback(fut):
nonlocal nfinished
nfinished += 1
if nfinished == nfuts:
results = []
for fut in children:
res = fut.result()
results.append(res)
outer.set_result(results)
arg_to_fut = {}
children = []
nfuts = 0
nfinished = 0
for arg in coros_or_futures:
nfuts += 1
fut.add_done_callback(_done_callback)
children.append(fut)
outer = _GatheringFuture(children, loop=loop)
return outer
ジェネレーター
ここで脱線しPythonのジェネレーターについて確認します。
ジェネレータは「イテレータを返す関数」です。ジェネレータを実行するとジェネレータオブジェクトが返されます。ジェネレータオブジェクトはイテレータを表す関数__iter__()
を実装しています。ジェネレータは以下のように実装します。
def generator():
yield 1
yield 2
yield 3
return "END"
gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
print(gg.__next__())
except StopIteration as e:
print(e.value)
ここで、yield
はジェネレータの中身の処理を一時的に止める働きを持ちます。また、ジェネレータは2段重ねにすることもできます。
def generator2():
yield 1
yield 2
yield 3
return "END"
def generator():
a = yield from generator2()
return a
gg = generator().__iter__()
print(gg.__next__())
print(gg.__next__())
print(gg.__next__())
try:
print(gg.__next__())
except StopIteration as e:
print(e.value)
実行結果はどちらも
1
2
3
END
となります。
ジェネレータのイベントループによる実行
前々章で扱ったように、loop.run_until_complete
を用いて複数の関数を実行する場合、1つ目の関数の実行が完了した後に2つ目の関数を実行し,...というように関数は並列には実行されず、順番に実行されます。ここで、関数ではなくジェネレータを用いると以下のようになります。
import asyncio
import time
def f(tag):
for _ in range(3):
yield
time.sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = []
for tag in range(3):
task = f(tag)
tasks += [task]
res = loop.run_until_complete(asyncio.gather(*tasks))
print(res)
ここで、関数f()
内にyield
命令を追加し、また計算結果はfuture.set_result
ではなくreturn
で返すようにしました。引数future
は不要となったので削除しました。
この実行結果は以下のようになります。
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
waiting for f(0)
waiting for f(1)
waiting for f(2)
['hello 0', 'hello 1', 'hello 2']
前々章ではf(0)
が3回表示された後にf(1)
が表示され,...となっていたのが、f(0)
,f(1)
,f(2)
の順で表示されるようになりました。なぜなら、たとえイベントループに複数のタスクが登録されていたとしても、すべて1つのスレッド内で実行されるからです。また、イベントループはPythonの関数の実行を一時停止することはできないため、return
などによって自発的に関数が停止するまで1つの関数を実行し続けるしかないためです。
一方、ジェネレータを使った場合は、yield
によって関数の実行が一時停止します。このタイミングで処理がイベントループ側に戻るため、イベントループが実行するタスクを切り替えることができるのです
ちなみに、ジェネレータを使用する最小の例は以下の通りです。(これではタスクが1つしかないためジェネレータにした意味がないですが...。)
import asyncio
import time
def f():
time.sleep(5) # 時間のかかる処理
yield
return "hello"
loop = asyncio.get_event_loop()
ret = loop.run_until_complete(f())
print(ret)
ちなみに、ジェネレータを使わないバージョンでは、loop.call_soon()
を呼び出して関数f()
をイベントループに登録していましたが、本章ではこれを呼び出していないことに疑問を持った方が多いと思います。具体的には以下のようになります。
関数名 | 引数(Future版) | 引数(Generator版) |
---|---|---|
f() |
future |
なし |
loop.call_soon() |
f |
-- |
loop.run_until_complete() |
future |
f |
run_until_complete()
内では、与えられた引数がジェネレータオブジェクト(ジェネレータとして定義された関数f()
を呼んで得られるもの)の場合、(Future
のサブクラスである)Task
インスタンスを生成します。この生成時に内部的にcall_soon()
が呼ばれているのです。
関連するライブラリのコード
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
future = tasks.ensure_future(future, loop=self)
future.add_done_callback(_run_until_complete_cb)
self.run_forever()
return future.result()
def ensure_future(coro_or_future, loop):
if isinstance(coro_or_future, types.CoroutineType) or isinstance(coro_or_future, types.GeneratorType):
task = tasks.Task(coro_or_future, loop=loop)
return task
else:
return coro_or_future
class Task(futures.Future):
def __init__(self, coro, loop=None):
super().__init__(loop=loop)
self._coro = coro
self._loop = loop
self._context = contextvars.copy_context()
loop.call_soon(self.__step, context=self._context)
_register_task(self)
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
try:
result = coro.send(None)
except StopIteration as exc:
super().set_result(exc.value)
else:
self._loop.call_soon(self.__step, context=self._context)
sleep時に他のタスクを実行する
ここまでの例では、time.sleep()
を多用していました。これはもちろん「時間のかかる処理」を例示するためですが、実際に実用目的でsleep()
したい場合もあります。例えば、
- メインの処理でネットワークの通信を行っている間、サブの処理として一定時間待ったあと、キャンセルの処理をするタイムアウト処理
- メインの処理で時間のかかる計算を行っている間、サブの処理で進捗状況を表示する
しかしながら、このような場合にサブの処理でtime.sleep()
を使うことが出来ません。なぜなら、サブの処理でいったんtime.sleep()
を実行してしまうと、sleepしている間にメインの処理を続行することができず、結局time.sleep()
が終了するまでサブがイベントループを専有し続けることになるからです。
タスクの中で一定時間待ちたいが、待っている時間はイベントループに処理を戻したい。そのような場合は、loop.call_later()
関数が使用できます。この関数は、指定秒数だけ待った後に与えられた関数を実行します。
この性質を用いて、以下のようにmy_sleep()
を実装できます。
import asyncio
import time
def my_sleep(delay):
def _cb_set_result(fut):
fut.set_result(None)
loop = asyncio.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay, _cb_set_result, future)
yield from future
def f(tag):
for i in range(3):
yield from my_sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
これは、前章の処理をmy_sleep()
を使って書き直したものです。前章は3つの処理でそれぞれ3秒ずつ待っていたので合計9秒間かかりました。しかし、今回の処理は約3秒で実行が終了します。
もう少し複雑なこともできます。例えば、タスクの中である関数を呼び出しており、その関数がmy_sleep()
しようとしている場合を考えます。この場合、以下のように呼び出す関数をジェネレータとして定義すれば大丈夫です。
def g():
yield from my_sleep(10)
return "hello"
def f():
ret = yield from g()
return ret
loop = asyncio.get_event_loop()
ret = loop.run_until_complete(asyncio.gather(f()))
print(ret)
なぜyield future
でなくyield from future
なの?
上で説明したmy_sleep()
のコードで、最後の行がyield from future
になっていたことに気づいた方も居るでしょう。ジェネレータの__next__()
が呼ばれた時に返る値を設定する場合は、yield
を使用するのでした。逆にyield from
を指定するのは、他のイテレータを指定する場合なのでした。イテレータではない、ただの結果を代入する箱であるFuture
をかえすために、なぜyield from
を使っているのでしょうか?
技術的な理由としては、Future
インスタンスは実はイテレータなのです! Future
は__iter__()
を実装しており、この関数は以下のようになっています:
class Future:
#....
def __iter__(self):
yield self
すなわち、my_sleep()
のイテレーションは以下のようになっています。
-
yield from my_sleep(1)
が実行される。 -
go = my_sleep(1)
によってmy_sleep
のジェネレータオブジェクトを生成 -
it = go.__iter__()
によってイテレータを生成(これはgo
と同じものである) -
my_sleep
の最初の要素を取得するためres = it.__next__()
が実行される -
my_sleep()
の中身の実行が始まる。 -
my_sleep()
内のyield from
の右辺の式が評価され、future
が生成される -
it_inner = future.__iter__()
が実行される。 -
res_inner = it_inner.__next__()
が実行される。これはfuture
と同じものである。 -
res_inner
がit.__next__()
の戻り値となる。すなわちres = future
となる
もう一つの、政治的な理由としては、ジェネレータ(又はコルーチン)とFuture
を同列に扱えるようにしたかったのでしょう。これは、次章のawait
ともつながってきます。
関連するライブラリのコード
class Task(futures.Future):
def __step(self, exc=None):
coro = self._coro
try:
result = coro.send(None)
except StopIteration as exc:
super().set_result(exc.value)
elif result != None:
result.add_done_callback(self.__wakeup, context=self._context)
else:
self._loop.call_soon(self.__step, context=self._context)
def __wakeup(self, future):
self.__step()
async
await
キーワードを使う
前章と同じコードは、Python3.7以降では以下のように書けます。
(厳密には、本章で使用するのはcoroutine
、前章で使用したのはgenerator
という多少の違いはあります)
import asyncio
import time
async def f(tag):
for i in range(3):
await asyncio.sleep(1)
print("waiting for f(%d)" % tag)
return "hello %d" % tag
loop = asyncio.get_event_loop()
tasks = [f(n) for n in range(3)]
ret = loop.run_until_complete(asyncio.gather(*tasks))
print(ret)
この書き方では、my_sleep()
の代わりにasyncio.sleep()
を使えます。
また、タスクが1個のみの場合は、asyncio.run()
を使ってさらに簡単に書けます。
import asyncio
import time
async def g():
await asyncio.sleep(10)
return "hello"
async def f():
return await g()
asyncio.run(f())
関連するライブラリのコード
async def sleep(delay, result=None):
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
return await future
def run(main):
loop = events.new_event_loop()
return loop.run_until_complete(main)
さいごに
こんな記事を下書きに溜め込んでいたのですが、他の方が似た記事を公開されていたので私も急いで(?)公開することにしました。