0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangChain × FastAPI でstreaming処理を簡単に実装する

Last updated at Posted at 2025-12-10

こんにちは。新卒でエンジニアをやっているkoachanです。
今日はLangChainとFastAPIを使って、レスポンスをstreamingで返してくれるagentを簡単に作りたいと思います!(解説は少なめです🙇)

環境構築

パッケージ管理には uv を使います。以下のコマンドを実行してください。

mkdir langchain-fastapi-streaming
cd langchain-fastapi-streaming
uv init

必要なパッケージを追加します。

uv add fastapi uvicorn langchain langchain-aws

実装

次にmain.pyを編集します。
今回は Bedrock の haiku-4.5を使用しており、レスポンスは基本的に list 形式で返ってきます。念のためコード側では str 形式も扱えるように実装していますが、動作確認はできていません。モデルに合わせて適宜調整してください。

main.py
import json
from typing import AsyncIterator, Dict, Any

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_aws import ChatBedrock
from langchain.agents import create_agent

app = FastAPI()

# AWS Bedrock LLMの設定
llm = ChatBedrock(
    model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0",
    region_name="ap-northeast-1",
    streaming=True,
)

# エージェントの作成(ツールなし)
agent = create_agent(
    model=llm,
    tools=[],
)


class ChatRequest(BaseModel):
    message: str


async def generate_stream(message: str) -> AsyncIterator[Dict[str, Any]]:
    """エージェントからのレスポンスをストリーミングで返すジェネレーター"""
    print(f"\n{'='*50}")
    
    try:
        async for token, metadata in agent.astream(
            {"messages": [{"role": "user", "content": message}]},
            stream_mode="messages",
        ):
            node = metadata.get("langgraph_node", "")
            
            if node == "model":
                # テキスト生成のストリーミング
                if isinstance(token.content, str) and token.content:
                    print(token.content, end="", flush=True)
                    yield {
                        "type": "token",
                        "content": token.content,
                    }
                elif isinstance(token.content, list):
                    for block in token.content:
                        if isinstance(block, dict) and block.get("type") == "text":
                            text = block.get("text", "")
                            if text:
                                print(text, end="", flush=True)
                                yield {
                                    "type": "token",
                                    "content": text,
                                }
        
        print(f"\n{'='*50}\n")
        yield {"type": "done", "status": "success"}
    
    except Exception as e:
        print(f"\n❌ エラー: {e}")
        yield {"type": "error", "message": str(e), "status": "error"}

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """ストリーミングレスポンスを返すエンドポイント"""
    async def generate():
        async for chunk in generate_stream(request.message):
            yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )


@app.get("/health")
async def health():
    """ヘルスチェック用エンドポイント"""
    return {"status": "ok"}

では早速動かしてみましょう!!
Bedrockを使用しているので、AWSアカウントにログインし、profileを設定するのを忘れないようにしてください。また、Haiku-4.5の有効化も忘れずに。

aws sso login --profile your-profile
export AWS_PROFILE=your-profile

それでは、サーバーを起動します。

uv run uvicorn main:app

次に別のターミナルからなんでもいいので質問を投げてみましょう。

curl -X POST http://localhost:8000/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"message": "日本の観光地を教えて"}' \
  --no-buffer

以下のようにストリーミングで処理されたかと思います!
画面収録

次に、簡易的なtoolを持たせてみましょう。main.py を以下のように書き換えます。

main.py
import json
from typing import AsyncIterator, Dict, Any

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_aws import ChatBedrock
from langchain_core.tools import tool
from langchain.agents import create_agent

app = FastAPI()

# AWS Bedrock LLMの設定
llm = ChatBedrock(
    model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0",
    region_name="ap-northeast-1",
    streaming=True,
)

@tool
def get_weather(city: str) -> str:
    """指定された都市の天気を取得します。"""
    weather_data = {
        "東京": "晴れ、気温22℃",
        "大阪": "曇り、気温20℃",
        "福岡": "雨、気温18℃",
        "北海道": "雪、気温-2℃",
    }
    return weather_data.get(city, f"{city}の天気情報は見つかりませんでした。")


@tool
def calculate(expression: str) -> str:
    """数式を計算します。例: '2 + 3 * 4'"""
    try:
        allowed = set("0123456789+-*/.(). ")
        if not all(c in allowed for c in expression):
            return "エラー: 無効な文字が含まれています"
        result = eval(expression)
        return f"{expression} = {result}"
    except Exception as e:
        return f"計算エラー: {e}"


# エージェントの作成
agent = create_agent(
    model=llm,
    tools=[get_weather, calculate],
)


class ChatRequest(BaseModel):
    message: str


async def generate_stream(message: str) -> AsyncIterator[Dict[str, Any]]:
    """
    エージェントからのレスポンスをストリーミングで返すジェネレーター
    
    Yields:
        ストリーミングチャンク(type: token/tool_start/tool_result/done/error)
    """
    print(f"\n{'='*50}")
    
    try:
        async for token, metadata in agent.astream(
            {"messages": [{"role": "user", "content": message}]},
            stream_mode="messages",
        ):
            node = metadata.get("langgraph_node", "")
            
            # 1. ツール実行結果(toolsノード)の処理
            if node == "tools":
                print(f"🔧 ツール結果: {token.content}")
                yield {
                    "type": "tool_result",
                    "content": token.content,
                    "tool_name": getattr(token, "name", "unknown"),
                    "node": node,
                }
                continue
            
            # 2. LLM生成(modelノード)の処理
            if node == "model":
                # A. テキスト生成のストリーミング
                if isinstance(token.content, str) and token.content:
                    print(token.content, end="", flush=True)
                    yield {
                        "type": "token",
                        "content": token.content,
                        "node": node,
                    }
                elif isinstance(token.content, list):
                    for block in token.content:
                        if isinstance(block, dict) and block.get("type") == "text":
                            text = block.get("text", "")
                            if text:
                                print(text, end="", flush=True)
                                yield {
                                    "type": "token",
                                    "content": text,
                                    "node": node,
                                }
                
                # B. ツール呼び出しのストリーミング
                if hasattr(token, "tool_call_chunks") and token.tool_call_chunks:
                    for chunk in token.tool_call_chunks:
                        if chunk.get("name"):
                            print(f"\n🔧 ツール呼び出し: {chunk['name']}")
                            yield {
                                "type": "tool_start",
                                "name": chunk["name"],
                                "node": node,
                            }
        
        print(f"\n{'='*50}\n")
        yield {"type": "done", "status": "success"}
    
    except Exception as e:
        print(f"\n❌ エラー: {e}")
        yield {"type": "error", "message": str(e), "status": "error"}


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """
    エージェントのストリーミングレスポンスを返すエンドポイント
    
    Server-Sent Events (SSE) 形式でレスポンスを返します。
    
    利用可能なツール:
    - get_weather: 都市の天気を取得
    - calculate: 数式を計算
    """
    async def generate():
        async for chunk in generate_stream(request.message):
            yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )


@app.get("/health")
async def health():
    """ヘルスチェック用エンドポイント"""
    return {"status": "ok"}

書き換えが終わったら、サーバーを立ち上げ直して、以下のような質問を投げてみましょう。

uv run uvicorn main:app
curl -X POST http://localhost:8000/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"message": "北海道の天気を教えて。その天気に合った観光地を沢山教えて。"}' \
  --no-buffer

すると、北海道の天気を取得し、その天気に合った観光地を教えてくれるagentが動くはずです。
画面収録

今回は簡易的に固定の天気を渡していますが、実際の天気予報を取得するtoolを作れば、それだけでかなり実用的なagentになります。今回作ったものは以下のような構成になっています。

まとめ

今回は簡単ですがlangchainを使ったagentを構築してみました。皆さんも良いagent lifeを。

おまけ

フロントエンドは AI SDK UIと無理やり組み合わせて実装すると、かなりいい感じの表示ができます。
(以下は、指定した場所の天気を考慮して最適な観光地を提案するエージェント)

画面収録 2025-12-10 15.10.09.gif

参考サイト

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?