9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

async/await、なんとなく使ってたけど、awaitつけ忘れで2時間溶かした経験がある。

同じミスした人、結構いるんじゃない?

この記事では、asyncioの基本から落とし穴まで、実際に動かしながら解説するよ。

なぜ非同期が必要?

同期版(遅い)

import time

def fetch_data_sync(url: str, delay: float) -> str:
    print(f"Fetching {url}...")
    time.sleep(delay)  # ブロッキング!
    return f"Data from {url}"

def main_sync():
    start = time.time()
    result1 = fetch_data_sync("api/users", 1.0)
    result2 = fetch_data_sync("api/posts", 1.0)
    result3 = fetch_data_sync("api/comments", 1.0)
    print(f"Total: {time.time() - start:.2f}")
Fetching api/users...
Fetching api/posts...
Fetching api/comments...
Total: 3.00秒  ← 1秒 × 3 = 3秒

非同期版(速い)

import asyncio

async def fetch_data_async(url: str, delay: float) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # ノンブロッキング!
    return f"Data from {url}"

async def main_async():
    start = time.time()
    results = await asyncio.gather(
        fetch_data_async("api/users", 1.0),
        fetch_data_async("api/posts", 1.0),
        fetch_data_async("api/comments", 1.0),
    )
    print(f"Total: {time.time() - start:.2f}")

asyncio.run(main_async())
Fetching api/users...
Fetching api/posts...
Fetching api/comments...
Total: 1.00秒  ← 並行実行で1秒!

3秒 → 1秒。これが非同期の威力。


😱 awaitつけ忘れバグ

これが私が2時間溶かした原因。

async def get_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User{user_id}"}

async def main():
    # 正しい
    user = await get_user(1)
    print(user)  # => {'id': 1, 'name': 'User1'}
    
    # バグ!! awaitがない!
    user_wrong = get_user(2)
    print(user_wrong)  # => <coroutine object get_user at 0x...>

なぜ気づきにくい?

# このif文、常にTrue!
if get_user(3):  # awaitがない!でもコルーチンはtruthy
    print("通っちゃう")

# リスト内包表記でもやりがち
users = [get_user(i) for i in range(3)]  # コルーチンのリスト

警告を見逃すな

RuntimeWarning: coroutine 'get_user' was never awaited

この警告が出たら即座にawaitを探せ


基本パターン

asyncio.gather - 全部まとめて待つ

async def main():
    results = await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 1),
    )
    print(results)  # => ['Result-A', 'Result-B', 'Result-C']
  • 全タスクが完了するまで待つ
  • 結果は渡した順で返る
  • 1つ失敗すると全部失敗(デフォルト)

asyncio.create_task - バックグラウンド実行

async def main():
    task1 = asyncio.create_task(slow_operation())
    
    # 他の処理を先にやる
    print("doing other stuff...")
    await asyncio.sleep(0.5)
    
    # 後でタスクの結果を取得
    result = await task1

asyncio.timeout - タイムアウト(Python 3.11+)

async def main():
    try:
        async with asyncio.timeout(1.0):
            await very_slow_task()  # 5秒かかる
    except asyncio.TimeoutError:
        print("タイムアウト!")

asyncio.wait_for - タイムアウト(旧式)

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=1.0)
    except asyncio.TimeoutError:
        print("タイムアウト!")

実践パターン

Semaphore - 同時実行数制限

API呼び出しで同時接続数を制限したい時:

async def fetch_with_limit(semaphore: asyncio.Semaphore, url: str):
    async with semaphore:  # ここで待機する
        return await fetch(url)

async def main():
    semaphore = asyncio.Semaphore(5)  # 同時5つまで
    
    urls = [f"api/item/{i}" for i in range(100)]
    results = await asyncio.gather(*[
        fetch_with_limit(semaphore, url) for url in urls
    ])

Queue - プロデューサー・コンシューマー

async def producer(queue: asyncio.Queue):
    for item in items:
        await queue.put(item)
    await queue.put(None)  # 終了シグナル

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await process(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

よくある落とし穴

1. 同期関数の中でawait

def sync_function():
    await async_operation()  # SyntaxError!

asyncは伝染する。呼び出し元もasyncにする必要がある。

2. time.sleepを使っちゃう

async def bad_sleep():
    time.sleep(1)  # ブロッキング!他のタスクが動けない
async def good_sleep():
    await asyncio.sleep(1)  # ノンブロッキング

3. asyncio.runを何度も呼ぶ

# ダメ
for i in range(10):
    asyncio.run(task(i))  # 毎回イベントループ作り直し

# 良い
async def main():
    await asyncio.gather(*[task(i) for i in range(10)])

asyncio.run(main())  # 1回だけ

4. 例外処理を忘れる

async def main():
    results = await asyncio.gather(
        task1(),
        task2(),  # ここで例外
        task3(),
        return_exceptions=True  # 例外もresultsに入る
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"エラー: {r}")

aiohttpで実際のHTTPリクエスト

import aiohttp
import asyncio

async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/headers",
    ]
    
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[
            fetch_url(session, url) for url in urls
        ])
    
    for url, result in zip(urls, results):
        print(f"{url}: {len(result)} bytes")

asyncio.run(main())

まとめ

ポイント 説明
async def コルーチン関数を定義
await コルーチンを実行して結果を待つ
asyncio.gather() 複数タスクを並行実行
asyncio.create_task() バックグラウンドでタスク開始
asyncio.run() イベントループを開始(1回だけ!)

awaitつけ忘れチェックリスト

  • RuntimeWarning: coroutine was never awaited が出てない?
  • 関数呼び出しの結果が <coroutine object ...> になってない?
  • if文の条件に直接コルーチンを入れてない?
# これを見たら要チェック
result = some_async_function()  # awaitある?
if some_async_function():       # awaitある?
[x() for x in async_funcs]     # awaitある?
9
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?