9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Python でコールバック形式の非同期APIを async/await に変換する

Last updated at Posted at 2017-04-23

概要

Python 3.5 から非同期プログラミングに async/await が使えるようになっています。
しかし、既存のライブラリは従来のコールバック型の非同期APIを持っているものが多く、そのままでは async/await を適用できません。そこで、そのようなコールバック型のAPIを async/await で使える形に変換します。

コールバック型の非同期APIの例

別のスレッド上で引数 a と 引数 b を加算し、結果をコールバックで返す
下記の async_add メソッドを考えます。

import time
import threading

def async_add(a, b, callback):
    def run():
        time.sleep(1)
        callback(a + b)
    thread = threading.Thread(target=run)
    thread.start()

これを使用して 1 + 2 + 3 の計算を行ってみましょう。次のようになります。

async_add(1, 2, lambda result1: \
        async_add(result1, 3, lambda result2: \
                print(result2)))

コールバックが入れ子になっていて複雑ですね。これをラップし、await を使って以下のようなイメージで書けるようにしたいと思います。

result1 = await awaitable_async_add(1, 2)
result2 = await awaitable_async_add(result1, 3)
print(result2)

実装

コード

import time
import threading
import asyncio

def async_add(a, b, callback):
    def run():
        time.sleep(1)
        callback(a + b)
    thread = threading.Thread(target=run)
    thread.start()

def awaitable_async_add(a, b, loop):
    f = asyncio.Future() # (1)
    def callback(result):
        loop.call_soon_threadsafe(
                lambda: f.set_result(result)) #(2)
    async_add(a, b, callback) # (1)
    return f # (1)

async def exec(loop):
    result1 = await awaitable_async_add(1, 2, loop)
    result2 = await awaitable_async_add(result1, 3, loop)
    print(result2)

loop = asyncio.get_event_loop() # (3)
loop.run_until_complete(exec(loop)) # (3)
loop.stop()

実行結果

6

解説

(1) awaitable_async_add メソッドが呼び出されると、async_add メソッドの実行を開始し、即座に asyncio.Future オブジェクトを返します。この時点で計算処理はまだ完了していません。

(2) 計算処理が完了するとコールバックが呼び出されます。処理結果を引数として asyncio.Future オブジェクトの set_result() メソッドを呼び出すことで、asyncio.Future オブジェクトに処理の完了を通知するとともに、計算結果を返します。

(3) イベントループを取得し、非同期処理が完了するまで処理を実行します。

ここで注意が必要なのが (2) の loop.call_soon_threadsafe です。試しに下記のように直接 f.set_result を呼び出してみると、エラーが発生します。

    def callback(result):
        f.set_result(result)

実行結果

RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

このエラーは set_result メソッドが (3) で実行しているイベントループとは別のスレッド上で呼び出されていることが原因です。asyncio.Future はスレッドセーフではないため、set_result メソッドは必ずイベントループと同じスレッドで呼び出す必要があります。そこで、イベントループの call_soon_threadsafe メソッドにコールバックとして処理を渡すことによって、イベントループ内で set_result を呼び出させています。

参考

9
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?