asyncio よくわからん人が、よくわからんなりに、何とか使ってみようとしていた時にハマった。
threading.Thread()
で別スレッドを開始して、その別スレッドで Future をset_result()
とかset_exception()
とかで完了しようとすると、単純にはうまくいかない。
(本当は、他スレッドで動かしたいなら asyncio.to_thread()
を使うのが素直)
動かないコード
例えば、以下のコードだとawait が抜けられない。10 カウント後にfut.set_result()
してるのに、いつまで経ってもprint(results)
に行かない。
import asyncio
import time
import threading
def heavy(name: str, fut: asyncio.Future[str]):
for i in range(10):
print(f"{name}: {i}")
time.sleep(1)
fut.set_result(f"{name}: done")
async def main():
loop = asyncio.get_event_loop()
fut1 = loop.create_future()
fut2 = loop.create_future()
threading.Thread(target=heavy, args=("hoge", fut1)).start()
threading.Thread(target=heavy, args=("fuga", fut2)).start()
results = await asyncio.gather(fut1, fut2)
print(results)
asyncio.run(main())
動くコード
どうやら、別スレッドから Future を完了しても反応しないようだ。
動かすには以下のようにする。肝はcall_soon_threadsafe
。イベント ループの管理下にない別スレッドから Future を完了しても反応しないのなら、イベントループ内で完了すればいい、という。
import asyncio
import time
import threading
def heavy(name: str, fut: asyncio.Future[str], loop: asyncio.BaseEventLoop):
for i in range(10):
print(f"{name}: {i}")
time.sleep(1)
def callback():
fut.set_result(f"{name}: done")
loop.call_soon_threadsafe(callback)
def main()
loop = asyncio.get_event_loop()
fut1 = loop.create_future()
fut2 = loop.create_future()
threading.Thread(target=heavy, args=("hoge", fut1, loop)).start()
threading.Thread(target=heavy, args=("fuga", fut2, loop)).start()
results = await asyncio.gather(fut1, fut2)
print(results)
asyncio.run(main())
多分望ましいコード
上のコードも動くけどゴチャゴチャしてて美しくない、そんなとき。 どうしても Future を直接扱いたい理由がなければ、asyncio.to_thread()
を使って、別スレッドも素直にイベント ループに任せればいい。
heavy は非同期処理を意識する必要がなくなり、同期処理的なコードでよくなる。
import asyncio
import time
def heavy(name: str):
for i in range(10):
print(f"{name}: {i}")
time.sleep(1)
return f"{name}: done"
def main():
coro1 = asyncio.to_thread(heavy, "hoge")
coro2 = asyncio.to_thread(heavy, "fuga")
results = await asyncio.gather(coro1, coro2)
print(results)
asyncio.run(main())
なぜ私はこんな目にあったのか
asyncio の存在を知らぬまま、threading を使ったクラスでこのイベント機構を使い、別スレッドでイベントを発火するクラスを作った。open()
で別スレッドを開始、途中でsend()
を受け付けたり recieved イベントを発火したりして、close()でスレッドを終了するようなクラス。
のちに asyncio を知り、以下のようなコードを書いた結果、この記事のような目にあった。
- コマンド的なものを send() し、Future を生成、「return await fut」するコルーチンを作成
- コマンドの結果を recieved イベント ハンドラー内で Future に放り込む
最初から asyncio を知っていれば、threading を使わずに書いたかも? まぁ書き直すのが面倒だったので、「動くコード」で放置してる。