1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Python の asyncio で、別スレッドから Future を完了する

Last updated at Posted at 2023-03-16

asyncio よくわからん人が、よくわからんなりに、何とか使ってみようとしていた時にハマった。

threading.Thread()で別スレッドを開始して、その別スレッドで Future をset_result()とかset_exception()とかで完了しようとすると、単純にはうまくいかない。
(本当は、他スレッドで動かしたいなら asyncio.to_thread() を使うのが素直)

動かないコード

例えば、以下のコードだとawait が抜けられない。10 カウント後にfut.set_result()してるのに、いつまで経ってもprint(results)に行かない。

import asyncio
import time
import threading

def heavy(name: str, fut: asyncio.Future[str]):
    for i in range(10):
        print(f"{name}: {i}")
        time.sleep(1)
    fut.set_result(f"{name}: done")

async def main():
    loop = asyncio.get_event_loop()
    fut1 = loop.create_future()
    fut2 = loop.create_future()
    threading.Thread(target=heavy, args=("hoge", fut1)).start()
    threading.Thread(target=heavy, args=("fuga", fut2)).start()
    results = await asyncio.gather(fut1, fut2)
    print(results)

asyncio.run(main())

動くコード

どうやら、別スレッドから Future を完了しても反応しないようだ。
動かすには以下のようにする。肝はcall_soon_threadsafe。イベント ループの管理下にない別スレッドから Future を完了しても反応しないのなら、イベントループ内で完了すればいい、という。

import asyncio
import time
import threading

def heavy(name: str, fut: asyncio.Future[str], loop: asyncio.BaseEventLoop):
    for i in range(10):
        print(f"{name}: {i}")
        time.sleep(1)

    def callback():
        fut.set_result(f"{name}: done")

    loop.call_soon_threadsafe(callback)

def main()
    loop = asyncio.get_event_loop()
    fut1 = loop.create_future()
    fut2 = loop.create_future()
    threading.Thread(target=heavy, args=("hoge", fut1, loop)).start()
    threading.Thread(target=heavy, args=("fuga", fut2, loop)).start()
    results = await asyncio.gather(fut1, fut2)
    print(results)

asyncio.run(main())

多分望ましいコード

上のコードも動くけどゴチャゴチャしてて美しくない、そんなとき。 どうしても Future を直接扱いたい理由がなければ、asyncio.to_thread()を使って、別スレッドも素直にイベント ループに任せればいい。
heavy は非同期処理を意識する必要がなくなり、同期処理的なコードでよくなる。

import asyncio
import time

def heavy(name: str):
    for i in range(10):
        print(f"{name}: {i}")
        time.sleep(1)
    return f"{name}: done"

def main():
    coro1 = asyncio.to_thread(heavy, "hoge")
    coro2 = asyncio.to_thread(heavy, "fuga")
    results = await asyncio.gather(coro1, coro2)
    print(results)

asyncio.run(main())

なぜ私はこんな目にあったのか

asyncio の存在を知らぬまま、threading を使ったクラスでこのイベント機構を使い、別スレッドでイベントを発火するクラスを作った。open()で別スレッドを開始、途中でsend()を受け付けたり recieved イベントを発火したりして、close()でスレッドを終了するようなクラス。

のちに asyncio を知り、以下のようなコードを書いた結果、この記事のような目にあった。

  • コマンド的なものを send() し、Future を生成、「return await fut」するコルーチンを作成
  • コマンドの結果を recieved イベント ハンドラー内で Future に放り込む

最初から asyncio を知っていれば、threading を使わずに書いたかも? まぁ書き直すのが面倒だったので、「動くコード」で放置してる。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?