はじめに
async/await、なんとなく使ってたけど、awaitつけ忘れで2時間溶かした経験がある。
同じミスした人、結構いるんじゃない?
この記事では、asyncioの基本から落とし穴まで、実際に動かしながら解説するよ。
なぜ非同期が必要?
同期版(遅い)
import time
def fetch_data_sync(url: str, delay: float) -> str:
print(f"Fetching {url}...")
time.sleep(delay) # ブロッキング!
return f"Data from {url}"
def main_sync():
start = time.time()
result1 = fetch_data_sync("api/users", 1.0)
result2 = fetch_data_sync("api/posts", 1.0)
result3 = fetch_data_sync("api/comments", 1.0)
print(f"Total: {time.time() - start:.2f}秒")
Fetching api/users...
Fetching api/posts...
Fetching api/comments...
Total: 3.00秒 ← 1秒 × 3 = 3秒
非同期版(速い)
import asyncio
async def fetch_data_async(url: str, delay: float) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(delay) # ノンブロッキング!
return f"Data from {url}"
async def main_async():
start = time.time()
results = await asyncio.gather(
fetch_data_async("api/users", 1.0),
fetch_data_async("api/posts", 1.0),
fetch_data_async("api/comments", 1.0),
)
print(f"Total: {time.time() - start:.2f}秒")
asyncio.run(main_async())
Fetching api/users...
Fetching api/posts...
Fetching api/comments...
Total: 1.00秒 ← 並行実行で1秒!
3秒 → 1秒。これが非同期の威力。
😱 awaitつけ忘れバグ
これが私が2時間溶かした原因。
async def get_user(user_id: int) -> dict:
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User{user_id}"}
async def main():
# 正しい
user = await get_user(1)
print(user) # => {'id': 1, 'name': 'User1'}
# バグ!! awaitがない!
user_wrong = get_user(2)
print(user_wrong) # => <coroutine object get_user at 0x...>
なぜ気づきにくい?
# このif文、常にTrue!
if get_user(3): # awaitがない!でもコルーチンはtruthy
print("通っちゃう")
# リスト内包表記でもやりがち
users = [get_user(i) for i in range(3)] # コルーチンのリスト
警告を見逃すな
RuntimeWarning: coroutine 'get_user' was never awaited
この警告が出たら即座にawaitを探せ。
基本パターン
asyncio.gather - 全部まとめて待つ
async def main():
results = await asyncio.gather(
task("A", 1),
task("B", 2),
task("C", 1),
)
print(results) # => ['Result-A', 'Result-B', 'Result-C']
- 全タスクが完了するまで待つ
- 結果は渡した順で返る
- 1つ失敗すると全部失敗(デフォルト)
asyncio.create_task - バックグラウンド実行
async def main():
task1 = asyncio.create_task(slow_operation())
# 他の処理を先にやる
print("doing other stuff...")
await asyncio.sleep(0.5)
# 後でタスクの結果を取得
result = await task1
asyncio.timeout - タイムアウト(Python 3.11+)
async def main():
try:
async with asyncio.timeout(1.0):
await very_slow_task() # 5秒かかる
except asyncio.TimeoutError:
print("タイムアウト!")
asyncio.wait_for - タイムアウト(旧式)
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=1.0)
except asyncio.TimeoutError:
print("タイムアウト!")
実践パターン
Semaphore - 同時実行数制限
API呼び出しで同時接続数を制限したい時:
async def fetch_with_limit(semaphore: asyncio.Semaphore, url: str):
async with semaphore: # ここで待機する
return await fetch(url)
async def main():
semaphore = asyncio.Semaphore(5) # 同時5つまで
urls = [f"api/item/{i}" for i in range(100)]
results = await asyncio.gather(*[
fetch_with_limit(semaphore, url) for url in urls
])
Queue - プロデューサー・コンシューマー
async def producer(queue: asyncio.Queue):
for item in items:
await queue.put(item)
await queue.put(None) # 終了シグナル
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
await process(item)
queue.task_done()
async def main():
queue = asyncio.Queue()
await asyncio.gather(
producer(queue),
consumer(queue),
)
よくある落とし穴
1. 同期関数の中でawait
def sync_function():
await async_operation() # SyntaxError!
asyncは伝染する。呼び出し元もasyncにする必要がある。
2. time.sleepを使っちゃう
async def bad_sleep():
time.sleep(1) # ブロッキング!他のタスクが動けない
async def good_sleep():
await asyncio.sleep(1) # ノンブロッキング
3. asyncio.runを何度も呼ぶ
# ダメ
for i in range(10):
asyncio.run(task(i)) # 毎回イベントループ作り直し
# 良い
async def main():
await asyncio.gather(*[task(i) for i in range(10)])
asyncio.run(main()) # 1回だけ
4. 例外処理を忘れる
async def main():
results = await asyncio.gather(
task1(),
task2(), # ここで例外
task3(),
return_exceptions=True # 例外もresultsに入る
)
for r in results:
if isinstance(r, Exception):
print(f"エラー: {r}")
aiohttpで実際のHTTPリクエスト
import aiohttp
import asyncio
async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers",
]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*[
fetch_url(session, url) for url in urls
])
for url, result in zip(urls, results):
print(f"{url}: {len(result)} bytes")
asyncio.run(main())
まとめ
| ポイント | 説明 |
|---|---|
async def |
コルーチン関数を定義 |
await |
コルーチンを実行して結果を待つ |
asyncio.gather() |
複数タスクを並行実行 |
asyncio.create_task() |
バックグラウンドでタスク開始 |
asyncio.run() |
イベントループを開始(1回だけ!) |
awaitつけ忘れチェックリスト
-
RuntimeWarning: coroutine was never awaitedが出てない? -
関数呼び出しの結果が
<coroutine object ...>になってない? - if文の条件に直接コルーチンを入れてない?
# これを見たら要チェック
result = some_async_function() # awaitある?
if some_async_function(): # awaitある?
[x() for x in async_funcs] # awaitある?