LoginSignup
3
5

More than 5 years have passed since last update.

Pythonのasync/awaitで任意のユーザー定義イベントを待てるようにする

Posted at

Python 3.5で追加されたasyncioモジュールによって、ネットワークのI/Oやサブプロセスの起動を非同期に行うことができるようになった。
ここで非同期といっているのはスレッドによるものではなく、coroutineによるものを考えている。大量(数千、数万規模)の非同期な処理を同時に行えるようになっている。

asyncioは基本的にはI/Oの処理を非同期に行うためのライブラリだが、これを任意のイベントをawaitで待てるようにする方法をここでは考える。具体的には例えば

  • 特定のファイルが作られるまでawaitする
  • DBの特定のレコードが更新されるまでawaitする
  • ネットワークの入力で特定のバイト列がくるまで待つ

のようなケースを考えている。

ここではcoroutineが何かという部分については説明しない。またPythonのasync/awaitについての基本的な使い方は前提知識として持っていることを前提としている。
Pythonのasync/awaitについては公式ドキュメントの説明が一番正確でわかりやすいので、こちらを参照することをオススメする。

前提条件

  • Python 3.7

実装例

ここでは具体的に、coroutineの中で指定した名前のファイルが作られるまでawaitするケースを考える。
"foo", "bar", "baz"という3つのファイルが作られるのを監視し、それぞれのファイルが作られたら "File {filename} is found" と表示したいとしよう。
ここで、3つのファイルがどういう順番で作られてもよいものとする。
awaitを使って特定のファイルを待つというようなコードをかけるようにする。具体的には以下のようなコードをかけるようにする。

ここのfilecheckというモジュールは標準ライブラリではなく、これから実装していくものである。

main.py
import asyncio
import filecheck

async def user_func(f):
    await filecheck.watch_file(f)
    print(f"File {f} is found")

async def main():
    await asyncio.gather(
            user_func("foo"),
            user_func("baz"),
            user_func("bar")
            )
    print(f"All files are found")

filecheck.async_run( main() )

通常のcoroutineの場合はasyncio.run( coroutine )でイベントループを起動するが、ここでは代わりにfilecheck.async_run()を呼ぶ。
また、「ファイルが作成されるまで待つ」という処理は await filecheck.watch_file(f) という処理で書かれている。
mainの中で3つのファイルを監視するための3つのcoroutineが作成されて非同期に実行されている。

このファイルを実行する(python main.py)と、最初は何も表示されないが、別の端末でtouch fooなどというようにファイルを作ると"File foo is found"という文字列が標準出力に表示される。

filecheck.pyの実装

ではfilecheck.pyはどのように実装すれば良いか?以下が実装例になる。

filecheck.py
import os,asyncio
from collections import defaultdict

_file_events = defaultdict( list )

async def watch_file(f):
    event = asyncio.Event()
    _file_events[f].append(event)
    await event.wait()

async def _event_loop_for_file(task, interval=1):
    while not task.done():
        finished = []
        for f,events in _file_events.items():
            if os.path.exists(f):
                for e in events:
                    e.set()
                finished.append(f)
        for f in finished:
            del _file_events[f]
        await asyncio.sleep(interval)

def async_run(coroutine):
    async def _main():
        t = asyncio.create_task(coroutine)
        await asyncio.gather(
                t,
                _event_loop_for_file(t)
                )
    asyncio.run( _main() )

動作原理は

  • ファイルを監視するためのループを行うcoroutine(_event_loop_for_file)を自分で定義し実行
  • ユーザーから渡されたcoroutineも実行
  • ユーザーcoroutineの中で、watch_fileメソッドを呼び監視対象となるファイルを登録する。そのファイルができるまでEvent.wait()で待つ。
  • _event_loop_for_fileでは定期的にポーリングし、ファイルが作成されたらEvent.set()でユーザーのcoroutineに通知する。
  • ユーザーcoroutineとevent_loop_for_file は_file_eventsという辞書を経由してイベントを共有している。

ということを行なっている。

ちなみにasyncio.Eventクラスは複数のcoroutine間で同期を取るためのクラスの一つ。(参考 : https://docs.python.org/3/library/asyncio-sync.html

ここではファイルの存在検出までawaitするようにしたが、これを応用すれば任意のイベントをawaitで待てるように実装できる。

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5