0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonの非同期処理 — async/awaitをPHPのPromise感覚で理解しようとした話

0
Posted at

はじめに

FastAPIを触っているとasync defawaitが至るところに出てくる。

最初はPHPのPromiseやJavaScriptのasync/awaitと同じようなものだろうと思っていた。でも実際に調べてみるとevent loopの概念から理解しないとハマる、ということがわかった。「似てるようで全然違う」と感じた部分を中心に整理した。


まず同期処理の問題から

なぜ非同期が必要なのかを最初に整理する。

import time

def fetch_user(user_id: int) -> dict:
    time.sleep(2)  # APIコール(2秒かかる処理)
    return {"id": user_id, "name": "田中"}

def fetch_order(order_id: int) -> dict:
    time.sleep(2)  # APIコール(2秒かかる処理)
    return {"id": order_id, "amount": 1000}

# 同期的に実行
start = time.perf_counter()
user  = fetch_user(1)
order = fetch_order(101)
end   = time.perf_counter()

print(f"所要時間: {end - start:.1f}")  # 4.0秒

fetch_user()が終わるまでfetch_order()は待つので合計4秒かかる。この2つは互いに依存していないので、同時に実行できれば2秒で済むはず。

これを解決するのが非同期処理。


asyncio — Pythonの非同期処理の基盤

Pythonの非同期処理はasyncioというライブラリが基盤になっている。

import asyncio
import time

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(2)  # 非同期のスリープ
    return {"id": user_id, "name": "田中"}

async def fetch_order(order_id: int) -> dict:
    await asyncio.sleep(2)  # 非同期のスリープ
    return {"id": order_id, "amount": 1000}

async def main():
    start = time.perf_counter()

    # 同時に実行
    user, order = await asyncio.gather(
        fetch_user(1),
        fetch_order(101),
    )

    end = time.perf_counter()
    print(f"所要時間: {end - start:.1f}")  # 2.0秒
    print(user, order)

asyncio.run(main())

asyncio.gather()で複数のコルーチンを同時に実行できる。4秒→2秒になった。


コルーチンとevent loop

ここがPHPとの一番の概念的な違い。

async defで定義した関数はコルーチンになる。コルーチンを呼んでも即実行されない。

async def greet(name: str) -> str:
    return f"こんにちは、{name}さん"

# コルーチンを呼んでも実行されない
result = greet("田中")
print(result)  # <coroutine object greet at 0x...>

# awaitすることで初めて実行される
result = await greet("田中")  # これはasync関数の中でしか書けない

コルーチンはevent loopが管理して実行する。

event loop
├── コルーチンA を実行
│   └── await で I/O待ちになる
│       ↓
├── コルーチンB を実行(Aが待ってる間に)
│   └── await で I/O待ちになる
│       ↓
├── コルーチンA の I/O完了 → 再開
└── コルーチンB の I/O完了 → 再開

1つのスレッドの中で交互に実行されるのがポイント。複数スレッドで並列実行しているわけではない。

PHPにはevent loopの概念がない(ReactPHPなどのライブラリを使えば実現できるが標準ではない)。JavaScriptのevent loopに近いが、JavaScriptはシングルスレッドが前提でevent loopが常に動いているのに対して、Pythonは明示的にasyncio.run()でevent loopを起動する必要がある。


async/awaitの基本ルール

# ① async def で定義した関数はコルーチン
async def my_func():
    ...

# ② awaitはasync defの中でしか使えない
async def caller():
    result = await my_func()  # OK

def normal_func():
    result = await my_func()  # SyntaxError!

# ③ awaitできるのはawaitableなオブジェクトだけ
# → コルーチン、Task、Future
async def example():
    await asyncio.sleep(1)     # OK(コルーチン)
    await some_async_func()    # OK(コルーチン)
    await 42                   # TypeError!

「awaitはasync defの中でしか使えない」というルールが伝播する。非同期関数を呼ぶ関数もasync defにしなければいけないので、コードベースに少しずつ広がっていく。これを「async汚染」と呼ぶことがある。


FastAPIでの非同期

FastAPIはasyncioの上に構築されているので、async defでエンドポイントを定義できる。

from fastapi import FastAPI
import httpx   # 非同期HTTPクライアント

app = FastAPI()

# 非同期エンドポイント
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()
# 複数のAPIを同時に叩く
@app.get("/dashboard")
async def get_dashboard(user_id: int):
    async with httpx.AsyncClient() as client:
        user_task  = client.get(f"https://api.example.com/users/{user_id}")
        order_task = client.get(f"https://api.example.com/orders?user_id={user_id}")

        user_res, order_res = await asyncio.gather(user_task, order_task)

    return {
        "user":   user_res.json(),
        "orders": order_res.json(),
    }

FastAPIではasync defと普通のdefの両方が使える。

# 同期エンドポイント(CPUバウンドな処理や同期ライブラリを使う場合)
@app.get("/compute")
def compute(n: int):
    return {"result": sum(range(n))}

# 非同期エンドポイント(I/Oバウンドな処理)
@app.get("/fetch")
async def fetch(url: str):
    async with httpx.AsyncClient() as client:
        res = await client.get(url)
        return res.json()

FastAPIは同期関数を自動的に別スレッドで実行してくれるので、必ずしも全部async defにする必要はない。


非同期コンテキストマネージャ

# 通常のwith
with open("file.txt") as f:
    content = f.read()

# 非同期のwith(async with)
async with httpx.AsyncClient() as client:
    response = await client.get("https://example.com")
# 非同期のfor(async for)
async def generate_numbers():
    for i in range(5):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for num in generate_numbers():
        print(num)

async withasync forは非同期版のwithfor。通常のwith/forがawaitableなオブジェクトに対応した形。


タスクと並列実行

import asyncio

async def task(name: str, seconds: float) -> str:
    print(f"{name} 開始")
    await asyncio.sleep(seconds)
    print(f"{name} 完了")
    return f"{name}の結果"

async def main():
    # gather — 全部完了するまで待つ
    results = await asyncio.gather(
        task("A", 1.0),
        task("B", 2.0),
        task("C", 0.5),
    )
    print(results)  # ['Aの結果', 'Bの結果', 'Cの結果']

asyncio.run(main())
# C 開始
# B 開始
# A 開始
# C 完了(0.5秒後)
# A 完了(1.0秒後)
# B 完了(2.0秒後)

Taskを明示的に作る

async def main():
    # create_taskで即座にスケジュールされる
    task_a = asyncio.create_task(task("A", 1.0))
    task_b = asyncio.create_task(task("B", 2.0))

    # 他の処理をここで挟める
    print("タスクをスケジュール済み")

    # 完了を待つ
    result_a = await task_a
    result_b = await task_b

asyncio.gather()は全コルーチンを同時に開始して全部の完了を待つ。個別に制御したい場合はcreate_task()でTaskオブジェクトを作る。

タイムアウト

async def main():
    try:
        result = await asyncio.wait_for(
            task("A", 10.0),  # 10秒かかる処理
            timeout=3.0,      # 3秒でタイムアウト
        )
    except asyncio.TimeoutError:
        print("タイムアウトしました")

非同期が効かないケース — CPUバウンド処理

非同期はI/O待ち(ネットワーク・ファイル・DB)には効果があるが、CPU処理には効かない

import asyncio

def heavy_cpu_task(n: int) -> int:
    # CPUを使う処理(計算)
    return sum(range(n))

async def main():
    # これは並列にならない(CPUを使い切ってしまう)
    results = await asyncio.gather(
        asyncio.to_thread(heavy_cpu_task, 10_000_000),
        asyncio.to_thread(heavy_cpu_task, 10_000_000),
    )

CPU処理を並列化したいならProcessPoolExecutorを使う。

from concurrent.futures import ProcessPoolExecutor
import asyncio

async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_cpu_task, 10_000_000)
    print(result)
処理の種類 解決策
I/Oバウンド(API・DB・ファイル) async/await
CPUバウンド(計算・変換) ProcessPoolExecutor
同期ライブラリのI/O asyncio.to_thread()

PHPとの比較まとめ

概念 PHP Python
非同期の標準サポート なし(ReactPHP等) asyncio(標準ライブラリ)
コルーチン なし async def
待機 なし await
並列実行 なし(標準) asyncio.gather()
event loop なし(標準) asyncio.run()
非同期HTTP なし(標準) httpxaiohttp

PHPはリクエストごとにプロセスを立ち上げる同期モデルが基本なので、非同期という概念自体があまり必要になる場面がなかった。Pythonはasyncioが標準で組み込まれていて、FastAPIのようなフレームワークも非同期前提で設計されている。


ハマったポイント

# ① async関数をawaitせずに呼んだ
async def get_data():
    return [1, 2, 3]

data = get_data()   # コルーチンオブジェクトが返るだけ
print(data)         # <coroutine object get_data at 0x...>
# → await get_data() にする

# ② 同期ライブラリをasync関数内でそのまま使う
import requests  # 同期ライブラリ

async def fetch():
    res = requests.get("https://example.com")  # event loopをブロックする!
    return res.json()
# → httpxやaiohttpなどの非同期対応ライブラリを使う

# ③ asyncio.run()の中でasyncio.run()を呼ぶ
async def main():
    asyncio.run(some_coroutine())  # RuntimeError!
# → awaitを使う

特に②は気づきにくいバグになる。requestsのような同期ライブラリをasync関数の中で使うと、I/O待ちの間もevent loopがブロックされて非同期の恩恵がなくなる。FastAPIで外部APIを叩くならhttpxを使うのが正解。


まとめ

  • async defでコルーチンを定義、awaitで実行する
  • event loopが1つのスレッドで複数のコルーチンを切り替えて実行する
  • asyncio.gather()で複数のコルーチンを同時実行できる
  • 非同期が効くのはI/Oバウンド処理。CPU処理はProcessPoolExecutorを使う
  • 同期ライブラリをasync関数内で使うとevent loopをブロックする

PHPの感覚で「async/awaitは書き方が変わるだけ」と思っていたが、event loopとコルーチンの概念から理解しないと詰まる。FastAPIを使うだけならasync defawaitの書き方を覚えれば動くが、なぜ動くのかを理解するとデバッグが格段に楽になった。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?