こんにちは。新卒でエンジニアをやっているkoachanです。
今日はLangChainとFastAPIを使って、レスポンスをstreamingで返してくれるagentを簡単に作りたいと思います!(解説は少なめです🙇)
環境構築
パッケージ管理には uv を使います。以下のコマンドを実行してください。
mkdir langchain-fastapi-streaming
cd langchain-fastapi-streaming
uv init
必要なパッケージを追加します。
uv add fastapi uvicorn langchain langchain-aws
実装
次にmain.pyを編集します。
今回は Bedrock の haiku-4.5を使用しており、レスポンスは基本的に list 形式で返ってきます。念のためコード側では str 形式も扱えるように実装していますが、動作確認はできていません。モデルに合わせて適宜調整してください。
import json
from typing import AsyncIterator, Dict, Any
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_aws import ChatBedrock
from langchain.agents import create_agent
app = FastAPI()
# AWS Bedrock LLMの設定
llm = ChatBedrock(
model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0",
region_name="ap-northeast-1",
streaming=True,
)
# エージェントの作成(ツールなし)
agent = create_agent(
model=llm,
tools=[],
)
class ChatRequest(BaseModel):
message: str
async def generate_stream(message: str) -> AsyncIterator[Dict[str, Any]]:
"""エージェントからのレスポンスをストリーミングで返すジェネレーター"""
print(f"\n{'='*50}")
try:
async for token, metadata in agent.astream(
{"messages": [{"role": "user", "content": message}]},
stream_mode="messages",
):
node = metadata.get("langgraph_node", "")
if node == "model":
# テキスト生成のストリーミング
if isinstance(token.content, str) and token.content:
print(token.content, end="", flush=True)
yield {
"type": "token",
"content": token.content,
}
elif isinstance(token.content, list):
for block in token.content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
if text:
print(text, end="", flush=True)
yield {
"type": "token",
"content": text,
}
print(f"\n{'='*50}\n")
yield {"type": "done", "status": "success"}
except Exception as e:
print(f"\n❌ エラー: {e}")
yield {"type": "error", "message": str(e), "status": "error"}
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""ストリーミングレスポンスを返すエンドポイント"""
async def generate():
async for chunk in generate_stream(request.message):
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
@app.get("/health")
async def health():
"""ヘルスチェック用エンドポイント"""
return {"status": "ok"}
では早速動かしてみましょう!!
Bedrockを使用しているので、AWSアカウントにログインし、profileを設定するのを忘れないようにしてください。また、Haiku-4.5の有効化も忘れずに。
aws sso login --profile your-profile
export AWS_PROFILE=your-profile
それでは、サーバーを起動します。
uv run uvicorn main:app
次に別のターミナルからなんでもいいので質問を投げてみましょう。
curl -X POST http://localhost:8000/chat/stream \
-H "Content-Type: application/json" \
-d '{"message": "日本の観光地を教えて"}' \
--no-buffer
次に、簡易的なtoolを持たせてみましょう。main.py を以下のように書き換えます。
import json
from typing import AsyncIterator, Dict, Any
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_aws import ChatBedrock
from langchain_core.tools import tool
from langchain.agents import create_agent
app = FastAPI()
# AWS Bedrock LLMの設定
llm = ChatBedrock(
model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0",
region_name="ap-northeast-1",
streaming=True,
)
@tool
def get_weather(city: str) -> str:
"""指定された都市の天気を取得します。"""
weather_data = {
"東京": "晴れ、気温22℃",
"大阪": "曇り、気温20℃",
"福岡": "雨、気温18℃",
"北海道": "雪、気温-2℃",
}
return weather_data.get(city, f"{city}の天気情報は見つかりませんでした。")
@tool
def calculate(expression: str) -> str:
"""数式を計算します。例: '2 + 3 * 4'"""
try:
allowed = set("0123456789+-*/.(). ")
if not all(c in allowed for c in expression):
return "エラー: 無効な文字が含まれています"
result = eval(expression)
return f"{expression} = {result}"
except Exception as e:
return f"計算エラー: {e}"
# エージェントの作成
agent = create_agent(
model=llm,
tools=[get_weather, calculate],
)
class ChatRequest(BaseModel):
message: str
async def generate_stream(message: str) -> AsyncIterator[Dict[str, Any]]:
"""
エージェントからのレスポンスをストリーミングで返すジェネレーター
Yields:
ストリーミングチャンク(type: token/tool_start/tool_result/done/error)
"""
print(f"\n{'='*50}")
try:
async for token, metadata in agent.astream(
{"messages": [{"role": "user", "content": message}]},
stream_mode="messages",
):
node = metadata.get("langgraph_node", "")
# 1. ツール実行結果(toolsノード)の処理
if node == "tools":
print(f"🔧 ツール結果: {token.content}")
yield {
"type": "tool_result",
"content": token.content,
"tool_name": getattr(token, "name", "unknown"),
"node": node,
}
continue
# 2. LLM生成(modelノード)の処理
if node == "model":
# A. テキスト生成のストリーミング
if isinstance(token.content, str) and token.content:
print(token.content, end="", flush=True)
yield {
"type": "token",
"content": token.content,
"node": node,
}
elif isinstance(token.content, list):
for block in token.content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
if text:
print(text, end="", flush=True)
yield {
"type": "token",
"content": text,
"node": node,
}
# B. ツール呼び出しのストリーミング
if hasattr(token, "tool_call_chunks") and token.tool_call_chunks:
for chunk in token.tool_call_chunks:
if chunk.get("name"):
print(f"\n🔧 ツール呼び出し: {chunk['name']}")
yield {
"type": "tool_start",
"name": chunk["name"],
"node": node,
}
print(f"\n{'='*50}\n")
yield {"type": "done", "status": "success"}
except Exception as e:
print(f"\n❌ エラー: {e}")
yield {"type": "error", "message": str(e), "status": "error"}
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""
エージェントのストリーミングレスポンスを返すエンドポイント
Server-Sent Events (SSE) 形式でレスポンスを返します。
利用可能なツール:
- get_weather: 都市の天気を取得
- calculate: 数式を計算
"""
async def generate():
async for chunk in generate_stream(request.message):
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
@app.get("/health")
async def health():
"""ヘルスチェック用エンドポイント"""
return {"status": "ok"}
書き換えが終わったら、サーバーを立ち上げ直して、以下のような質問を投げてみましょう。
uv run uvicorn main:app
curl -X POST http://localhost:8000/chat/stream \
-H "Content-Type: application/json" \
-d '{"message": "北海道の天気を教えて。その天気に合った観光地を沢山教えて。"}' \
--no-buffer
すると、北海道の天気を取得し、その天気に合った観光地を教えてくれるagentが動くはずです。

今回は簡易的に固定の天気を渡していますが、実際の天気予報を取得するtoolを作れば、それだけでかなり実用的なagentになります。今回作ったものは以下のような構成になっています。
まとめ
今回は簡単ですがlangchainを使ったagentを構築してみました。皆さんも良いagent lifeを。
おまけ
フロントエンドは AI SDK UIと無理やり組み合わせて実装すると、かなりいい感じの表示ができます。
(以下は、指定した場所の天気を考慮して最適な観光地を提案するエージェント)
参考サイト

