LLMの応答をリアルタイムに画面へ流すにはSSE (Server-Sent Events) を使うのが簡単だ。FastAPIはSSEと相性がよく、数十行でストリーミングAPIが書ける。実運用で必要な中断対応とエラー処理まで整理する。
1. 最小構成
StreamingResponseにasync generatorを渡すだけだ。SSEは「data:で始まる行の連続、空行でメッセージ区切り」という単純な仕様で、これだけで成立する。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
app = FastAPI()
client = OpenAI()
async def stream_chat(prompt: str):
stream = client.chat.completions.create(
model="gpt-4o-mini", stream=True,
messages=[{"role": "user", "content": prompt}],
)
for chunk in stream:
delta = chunk.choices[0].delta.content or ""
if delta:
yield f"data: {delta}\n\n"
@app.get("/chat")
async def chat(q: str):
return StreamingResponse(stream_chat(q), media_type="text/event-stream")
2. JSONを流すときの注意
改行を含むJSONは空行(メッセージ区切り)とぶつかる。1トークンずつjson.dumpsしてからdata: {payload}\n\nの形で送るのが安全だ。改行を含む構造化メッセージを送りたい場合は必ずJSONを1行に収める。
3. 切断検知と中断
タブを閉じられたとき生成を止めないとトークンを無駄に消費する。ハンドラでRequestを受け取り、ループ内でif await request.is_disconnected(): stream.close(); breakと書く。OpenAI SDKのstreamオブジェクトはclose()で接続を切れる。このひと手間でコストが大きく変わるため、LLM APIを叩く実装では必須の対応だ。
4. エラー処理
生成途中の例外はHTTPステータス200の後に起きるため、SSEイベントで通知するしかない。クライアント側はevent: errorを受けたら通知UIを出す。
try:
for chunk in stream:
yield f"data: {chunk.choices[0].delta.content or ''}\n\n"
except Exception as e:
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
5. プロキシ対策
Nginx経由だとバッファリングで詰まる。レスポンスヘッダにX-Accel-Buffering: noとCache-Control: no-cacheを付けてStreamingResponseを返すと解消する。
まとめ
実運用では「切断検知」「エラーをSSEで返す」「Nginxバッファ無効化」の3点で安定する。