この記事では Python3 + Trio で非同期なプログラムを作る際によく使うパターンを紹介します。
Trio や pytest-trio について詳しく知りたい方は本家のドキュメントがおすすめです。
- Trio: https://trio.readthedocs.io/en/latest/
- pytest-trio: https://pytest-trio.readthedocs.io/en/latest/
動作環境
- Python 3.7
- Trio 0.11
- pytest-trio 0.5.2
非同期処理を同時に実行
まずは基本中の基本、非同期なメソッドを同時に実行する方法です。
import trio
async def func1():
await trio.sleep(1)
print("func1 finished")
async def func2():
await trio.sleep(1)
print("func2 finished")
async def main():
async with trio.open_nursery() as nursery:
# func1, func2 を同時に実行
nursery.start_soon(func1)
nursery.start_soon(func2)
# 両方のメソッドが完了するのを待つ
print("all methods finished")
trio.run(main)
実行するとこんな感じ。
func2 finished
func1 finished
all methods finished
各メソッドに引数を渡すこともできます。
import trio
async def func1(t):
await trio.sleep(t)
print("func1 finished")
async def func2(t1, t2):
await trio.sleep(t1 + t2)
print("func2 finished")
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(func1, 1.0)
nursery.start_soon(func2, 0.5, 1.0)
print("all methods finished")
trio.run(main)
実行結果。
func1 finished
func2 finished
all methods finished
非同期なメソッドを複数実行し、どれか1つが完了したら他の処理を中断
これは、複数の条件のうち、どれか一つを満たすのを待つようなパターンです。
import trio
async def cond1(nursery):
await trio.sleep(1.0)
print("cond1 satisfied")
nursery.cancel_scope.cancel()
async def cond2(nursery):
await trio.sleep(0.5)
print("cond2 satisfied")
nursery.cancel_scope.cancel()
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(cond1, nursery)
nursery.start_soon(cond2, nursery)
print("one condition satisfied")
trio.run(main)
実行結果。
cond2 satisfied
one condition satisfied
cond2
が満たされた時点で処理が終了して、 cond1
は最後まで実行されなかったことがわかります。
ラッパーメソッドを作ると各メソッドに nursery
を渡さずに済むので、もう少しきれいに書けます(結果は同じ)。
import trio
async def cond1():
await trio.sleep(1.0)
print("cond1 satisfied")
async def cond2():
await trio.sleep(0.5)
print("cond2 satisfied")
async def wrapper(func, nursery):
await func()
nursery.cancel_scope.cancel()
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(wrapper, cond1, nursery)
nursery.start_soon(wrapper, cond2, nursery)
print("one condition satisfied")
trio.run(main)
ある処理が特定の条件を満たしてから他の処理を開始
これは、例えばデータベースとの接続を待ってデータの読み書きを開始したり、ある計算の完了を待って別の計算を行ったりするようなパターンで使います。
import trio
async def connect_db(*, task_status=trio.TASK_STATUS_IGNORED):
await trio.sleep(1.0)
print("connected to DB")
# nursery にタスクが開始できたことを通知する
task_status.started()
try:
await trio.sleep_forever()
finally:
print("disconnected from DB")
async def write_data(data):
await trio.sleep(0.5)
print(f"write data to DB: {data}")
async def read_data():
await trio.sleep(1.0)
print("read data from DB")
return "data"
async def main():
async with trio.open_nursery() as nursery:
# nursery.start を使うと、タスクが開始された通知が来るまで待つ
await nursery.start(connect_db)
# DB への接続を待ってデータの読み書きを開始
await write_data("foo")
_ = await read_data()
nursery.cancel_scope.cancel()
print("transaction finished")
trio.run(main)
実行結果。
connected to DB
write data to DB: foo
read data from DB
disconnected from DB
transaction finished
きちんと DB への接続を待って、 write_data
, read_data
が実行されていることが分かります。
なお、非同期なメソッド内で try: ~ finally:
の finally
句に終了処理を書くことで、 nursery.cancel_scope.cancel
や KeyboardInterrupt
などで中断された場合にも、終了処理を行うことができます。上記の例では全体が終了する前に、 connect_db
の finally
が実行されています。
一定時間で処理をキャンセル
非同期な処理にタイムアウトを設定したい場合のパターンです。
import trio
async def func(name, t):
try:
await trio.sleep(t)
print(f"{name} finished")
except trio.Cancelled:
print(f"{name} canceled")
async def main():
# 2.5秒でタイムアウト
with trio.move_on_after(2.5):
async with trio.open_nursery() as nursery:
nursery.start_soon(func, "func1", 1.0)
nursery.start_soon(func, "func2", 2.0)
nursery.start_soon(func, "func3", 3.0)
print("timeout")
trio.run(main)
実行結果。
func1 finished
func2 finished
func3 canceled
timeout
2.5秒以内に完了する func1
, func2
は終了していますが、 3秒かかる func3
は中断されていることが分かります。
sleep 時間をスキップしてテスト時間を短縮
pytest を使ってテストをするときに、 pytest-trio の autojump_clock
という機能を使うと、テストの実行時間を短縮することができます。
最初におまじないとして pytest.ini
に以下の設定をしておきます。
[pytest]
trio_mode = true
以下のようなテストを実行してみましょう。
import time
import trio
async def test1():
v0, t0 = trio.current_time(), time.time()
await trio.sleep(1)
v1, t1 = trio.current_time(), time.time()
print(f"test1: virtual time={v1 - v0}sec, real time={t1 - t0}sec")
assert 1 + 1 == 2
async def test2(autojump_clock):
v0, t0 = trio.current_time(), time.time()
await trio.sleep(1)
v1, t1 = trio.current_time(), time.time()
print(f"test2: virtual time={v1 - v0}sec, real time={t1 - t0}sec")
assert 1 + 1 == 2
実行結果。
test1: virtual time=1.0040155599999707sec, real time=1.0039751529693604sec
test2: virtual time=1.0sec, real time=0.0003070831298828125sec
普通に書いた test1
は trio の経過時間も実際の実行時間も約1秒であることが分かります。一方で autojump_clock
を用いた test2
では trio の経過時間がピッタリ1秒で、実際の実行時間は1ミリ秒以下となっています。
autojump_clock
を利用すると、テスト内ではバーチャルな時計が利用され、他に処理が行われていない trio.sleep
の時間を飛ばして次の処理まで進めてくれるので、テスト内で trio.sleep
を使っても実際にはほぼ時間をかけずテストを行うことができます。通常では sleep をテスト内で多用するとテスト自体が遅くなったり、それを回避するために細かい sleep を while で回す必要があったりするのですが、 autojump_clock
を使うことで格段にテストが書きやすく、しかもスピーディになります。
さいごに
Python3 いいですよね! Trio サイコーですよね!
他にも便利なパターンがあったら随時本記事を更新していこうと思うので、「こんな工夫してるよ〜」とか「こんな便利な機能あるよー」とかご存知でしたら、是非コメントいただけると嬉しいです!