Python 3.5で追加されたasyncioモジュールによって、ネットワークのI/Oやサブプロセスの起動を非同期に行うことができるようになった。
ここで非同期といっているのはスレッドによるものではなく、coroutineによるものを考えている。大量(数千、数万規模)の非同期な処理を同時に行えるようになっている。
asyncioは基本的にはI/Oの処理を非同期に行うためのライブラリだが、これを任意のイベントをawait
で待てるようにする方法をここでは考える。具体的には例えば
- 特定のファイルが作られるまでawaitする
- DBの特定のレコードが更新されるまでawaitする
- ネットワークの入力で特定のバイト列がくるまで待つ
のようなケースを考えている。
ここではcoroutineが何かという部分については説明しない。またPythonのasync/awaitについての基本的な使い方は前提知識として持っていることを前提としている。
Pythonのasync/awaitについては公式ドキュメントの説明が一番正確でわかりやすいので、こちらを参照することをオススメする。
前提条件
- Python 3.7
実装例
ここでは具体的に、coroutineの中で指定した名前のファイルが作られるまでawaitするケースを考える。
"foo", "bar", "baz"という3つのファイルが作られるのを監視し、それぞれのファイルが作られたら "File {filename} is found" と表示したいとしよう。
ここで、3つのファイルがどういう順番で作られてもよいものとする。
awaitを使って特定のファイルを待つというようなコードをかけるようにする。具体的には以下のようなコードをかけるようにする。
ここのfilecheck
というモジュールは標準ライブラリではなく、これから実装していくものである。
import asyncio
import filecheck
async def user_func(f):
await filecheck.watch_file(f)
print(f"File {f} is found")
async def main():
await asyncio.gather(
user_func("foo"),
user_func("baz"),
user_func("bar")
)
print(f"All files are found")
filecheck.async_run( main() )
通常のcoroutineの場合はasyncio.run( coroutine )
でイベントループを起動するが、ここでは代わりにfilecheck.async_run()
を呼ぶ。
また、「ファイルが作成されるまで待つ」という処理は await filecheck.watch_file(f)
という処理で書かれている。
main
の中で3つのファイルを監視するための3つのcoroutineが作成されて非同期に実行されている。
このファイルを実行する(python main.py
)と、最初は何も表示されないが、別の端末でtouch foo
などというようにファイルを作ると"File foo is found"という文字列が標準出力に表示される。
filecheck.pyの実装
ではfilecheck.pyはどのように実装すれば良いか?以下が実装例になる。
import os,asyncio
from collections import defaultdict
_file_events = defaultdict( list )
async def watch_file(f):
event = asyncio.Event()
_file_events[f].append(event)
await event.wait()
async def _event_loop_for_file(task, interval=1):
while not task.done():
finished = []
for f,events in _file_events.items():
if os.path.exists(f):
for e in events:
e.set()
finished.append(f)
for f in finished:
del _file_events[f]
await asyncio.sleep(interval)
def async_run(coroutine):
async def _main():
t = asyncio.create_task(coroutine)
await asyncio.gather(
t,
_event_loop_for_file(t)
)
asyncio.run( _main() )
動作原理は
- ファイルを監視するためのループを行うcoroutine(
_event_loop_for_file
)を自分で定義し実行 - ユーザーから渡されたcoroutineも実行
- ユーザーcoroutineの中で、
watch_file
メソッドを呼び監視対象となるファイルを登録する。そのファイルができるまでEvent.wait()
で待つ。 -
_event_loop_for_file
では定期的にポーリングし、ファイルが作成されたらEvent.set()
でユーザーのcoroutineに通知する。 - ユーザーcoroutineとevent_loop_for_file は
_file_events
という辞書を経由してイベントを共有している。
ということを行なっている。
ちなみにasyncio.Event
クラスは複数のcoroutine間で同期を取るためのクラスの一つ。(参考 : https://docs.python.org/3/library/asyncio-sync.html )
ここではファイルの存在検出までawaitするようにしたが、これを応用すれば任意のイベントをawait
で待てるように実装できる。