Python
Trio

Python + Trio の非同期コーディングパターン

この記事では Python3 + Trio で非同期なプログラムを作る際によく使うパターンを紹介します。

Trio や pytest-trio について詳しく知りたい方は本家のドキュメントがおすすめです。


動作環境


  • Python 3.7

  • Trio 0.11

  • pytest-trio 0.5.2


非同期処理を同時に実行

まずは基本中の基本、非同期なメソッドを同時に実行する方法です。

import trio

async def func1():
await trio.sleep(1)
print("func1 finished")

async def func2():
await trio.sleep(1)
print("func2 finished")

async def main():
async with trio.open_nursery() as nursery:
# func1, func2 を同時に実行
nursery.start_soon(func1)
nursery.start_soon(func2)
# 両方のメソッドが完了するのを待つ
print("all methods finished")

trio.run(main)

実行するとこんな感じ。

func2 finished

func1 finished
all methods finished

各メソッドに引数を渡すこともできます。

import trio

async def func1(t):
await trio.sleep(t)
print("func1 finished")

async def func2(t1, t2):
await trio.sleep(t1 + t2)
print("func2 finished")

async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(func1, 1.0)
nursery.start_soon(func2, 0.5, 1.0)
print("all methods finished")

trio.run(main)

実行結果。

func1 finished

func2 finished
all methods finished


非同期なメソッドを複数実行し、どれか1つが完了したら他の処理を中断

これは、複数の条件のうち、どれか一つを満たすのを待つようなパターンです。

import trio

async def cond1(nursery):
await trio.sleep(1.0)
print("cond1 satisfied")
nursery.cancel_scope.cancel()

async def cond2(nursery):
await trio.sleep(0.5)
print("cond2 satisfied")
nursery.cancel_scope.cancel()

async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(cond1, nursery)
nursery.start_soon(cond2, nursery)
print("one condition satisfied")

trio.run(main)

実行結果。

cond2 satisfied

one condition satisfied

cond2 が満たされた時点で処理が終了して、 cond1 は最後まで実行されなかったことがわかります。

ラッパーメソッドを作ると各メソッドに nursery を渡さずに済むので、もう少しきれいに書けます(結果は同じ)。

import trio

async def cond1():
await trio.sleep(1.0)
print("cond1 satisfied")

async def cond2():
await trio.sleep(0.5)
print("cond2 satisfied")

async def wrapper(func, nursery):
await func()
nursery.cancel_scope.cancel()

async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(wrapper, cond1, nursery)
nursery.start_soon(wrapper, cond2, nursery)
print("one condition satisfied")

trio.run(main)


ある処理が特定の条件を満たしてから他の処理を開始

これは、例えばデータベースとの接続を待ってデータの読み書きを開始したり、ある計算の完了を待って別の計算を行ったりするようなパターンで使います。

import trio

async def connect_db(*, task_status=trio.TASK_STATUS_IGNORED):
await trio.sleep(1.0)
print("connected to DB")
# nursery にタスクが開始できたことを通知する
task_status.started()
try:
await trio.sleep_forever()
finally:
print("disconnected from DB")

async def write_data(data):
await trio.sleep(0.5)
print(f"write data to DB: {data}")

async def read_data():
await trio.sleep(1.0)
print("read data from DB")
return "data"

async def main():
async with trio.open_nursery() as nursery:
# nursery.start を使うと、タスクが開始された通知が来るまで待つ
await nursery.start(connect_db)
# DB への接続を待ってデータの読み書きを開始
await write_data("foo")
_ = await read_data()
nursery.cancel_scope.cancel()
print("transaction finished")

trio.run(main)

実行結果。

connected to DB

write data to DB: foo
read data from DB
disconnected from DB
transaction finished

きちんと DB への接続を待って、 write_data, read_data が実行されていることが分かります。

なお、非同期なメソッド内で try: ~ finally:finally 句に終了処理を書くことで、 nursery.cancel_scope.cancelKeyboardInterrupt などで中断された場合にも、終了処理を行うことができます。上記の例では全体が終了する前に、 connect_dbfinally が実行されています。


一定時間で処理をキャンセル

非同期な処理にタイムアウトを設定したい場合のパターンです。

import trio

async def func(name, t):
try:
await trio.sleep(t)
print(f"{name} finished")
except trio.Cancelled:
print(f"{name} canceled")

async def main():
# 2.5秒でタイムアウト
with trio.move_on_after(2.5):
async with trio.open_nursery() as nursery:
nursery.start_soon(func, "func1", 1.0)
nursery.start_soon(func, "func2", 2.0)
nursery.start_soon(func, "func3", 3.0)
print("timeout")

trio.run(main)

実行結果。

func1 finished

func2 finished
func3 canceled
timeout

2.5秒以内に完了する func1, func2 は終了していますが、 3秒かかる func3 は中断されていることが分かります。


sleep 時間をスキップしてテスト時間を短縮

pytest を使ってテストをするときに、 pytest-trio の autojump_clock という機能を使うと、テストの実行時間を短縮することができます。

最初におまじないとして pytest.ini に以下の設定をしておきます。


pytest.ini

[pytest]

trio_mode = true

以下のようなテストを実行してみましょう。

import time

import trio

async def test1():
v0, t0 = trio.current_time(), time.time()
await trio.sleep(1)
v1, t1 = trio.current_time(), time.time()
print(f"test1: virtual time={v1 - v0}sec, real time={t1 - t0}sec")
assert 1 + 1 == 2

async def test2(autojump_clock):
v0, t0 = trio.current_time(), time.time()
await trio.sleep(1)
v1, t1 = trio.current_time(), time.time()
print(f"test2: virtual time={v1 - v0}sec, real time={t1 - t0}sec")
assert 1 + 1 == 2

実行結果。

test1: virtual time=1.0040155599999707sec, real time=1.0039751529693604sec

test2: virtual time=1.0sec, real time=0.0003070831298828125sec

普通に書いた test1 は trio の経過時間も実際の実行時間も約1秒であることが分かります。一方で autojump_clock を用いた test2 では trio の経過時間がピッタリ1秒で、実際の実行時間は1ミリ秒以下となっています。

autojump_clock を利用すると、テスト内ではバーチャルな時計が利用され、他に処理が行われていない trio.sleep の時間を飛ばして次の処理まで進めてくれるので、テスト内で trio.sleep を使っても実際にはほぼ時間をかけずテストを行うことができます。通常では sleep をテスト内で多用するとテスト自体が遅くなったり、それを回避するために細かい sleep を while で回す必要があったりするのですが、 autojump_clock を使うことで格段にテストが書きやすく、しかもスピーディになります。


さいごに

Python3 いいですよね! Trio サイコーですよね!

他にも便利なパターンがあったら随時本記事を更新していこうと思うので、「こんな工夫してるよ〜」とか「こんな便利な機能あるよー」とかご存知でしたら、是非コメントいただけると嬉しいです!