1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangChain×FastAPI×Pandas - LLMの質問応答をCSVで入出力する -

1
Posted at

はじめに

本記事では、LLM(大規模言語モデル)にcsvファイルの内容を読み込ませて、質問に答えさせる方法を解説します。

なぜcsvで読み込ませたいのか

業務データや公開データなど、csv形式で管理されている情報が多そうなので、これらのデータをLLMに読み込ませて、自然言語で質問・分析できれば、業務効率化や新たな知見の発見につながるんじゃない?と思い、この記事を書くことにしました。

今回使った技術・環境

  • Python 3.13.5
  • FastAPI 0.127.0
  • Pandas 2.3.3
  • LangChain 1.1.0

FastAPIとは

FastAPIは、Python製の軽量Webフレームワークです。APIの構築が非常に簡単で、非同期処理や型ヒントにも対応しています。今回はできるだけ簡単にAPIの構築をしたかったのでFastAPIを選びました。

Pandasとは

Pandasは、Pythonのデータ分析ライブラリです。csvファイルの読み込みやデータ加工が得意で、データサイエンス分野で広く使われています。PandasからAPIを呼び出すようにしました。

実装手順

  1. FastAPIでAPIサーバーを作成

  2. csvファイルをアップロードしてpandasで読み込み

  3. LangChainの天気AIエージェントにデータを渡して質問回答

全体コード

1. 天気AIエージェントの実装(LangChain)

過去記事参照

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_ollama import ChatOllama
from langchain_core.messages import SystemMessage
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
import requests
import os

@tool
def get_weather(city: str) -> str:
    """指定した都市の現在の天気と気温を取得します(OpenWeather API使用)。"""
    api_key = os.getenv("OPENWEATHER_API_KEY")
    if not api_key:
      return "OPENWEATHER_API_KEY が設定されていません。"
    
    city_map = {
        "東京": "Tokyo,JP",
        "大阪": "Osaka,JP",
        "名古屋": "Nagoya,JP",
        "札幌": "Sapporo,JP",
        "福岡": "Fukuoka,JP",
    }
    q = city_map.get(city, city)

    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {
        "q": q,
        "appid": api_key,
        "units": "metric",
        "lang": "ja",
    }
    resp = requests.get(url, params=params, timeout=10)
    if resp.status_code != 200:
        return f"OpenWeather APIエラー: {resp.status_code} {resp.text[:200]}"

    data = resp.json()
    temp = data["main"]["temp"]
    desc = data["weather"][0]["description"]
    humidity = data["main"]["humidity"]
    return f"{city}の現在の天気: {desc}、気温: {temp}℃、湿度: {humidity}%"

tools = [get_weather]

# モデル
model = ChatOllama(model="ministral-3:3b", temperature=0)
model_with_tools = model.bind_tools(tools)

checkpointer = InMemorySaver()

def agent_node(state: MessagesState):
    system_msg = SystemMessage(content="""日本語で簡潔に答えてください。
- 現在の天気・気温: get_weather ツールを使用してください。""")
    
    messages = [system_msg] + state["messages"]
    response = model_with_tools.invoke(messages)
    
    if response.tool_calls:
        return {"messages": [response]}
    return {"messages": [response], "__end__": True}

# グラフ(同じ)
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges(
    "agent",
    lambda state: "tools" if state["messages"][-1].tool_calls else END,
    {"tools": "tools", END: END}
)
builder.add_edge("tools", "agent")
app = builder.compile(checkpointer=checkpointer)

2. APIサーバーの構築(FastAPI)

from fastapi import FastAPI
from pydantic import BaseModel
from langchainapp.langchain_sample04 import app

api = FastAPI()

class Query(BaseModel):
    message: str

@api.post("/chat")
def chat(query: Query):
    result = app.invoke(
        {"messages": [{"role": "user", "content": query.message}]},
        {"configurable": {"thread_id": "session1"}}
    )
    return {"response": result["messages"][-1].content}

3. CSVデータの一括質問処理(Pandas)

import pandas as pd
import requests

# CSVファイルを読み込む
df = pd.read_csv('input.csv')

def call_chat_api(prompt):
    url = "http://localhost:8000/chat"
    payload = {"message": prompt}
    print(f"送信内容: {payload}")  # 追加
    try:
        response = requests.post(url, json=payload, timeout=30)
        response.raise_for_status()
        return response.json().get("response", "").strip()
    except Exception as e:
        print(f"API呼び出しエラー: {e}")
        return None

# 各行の'text'カラムをAPIに送り、生成結果を'output'カラムに追加
df["output"] = df["text"].apply(lambda x: call_chat_api(x))

# 結果を新しいCSVに保存
df.to_csv("output.csv", index=False)

# データの最初の5行を表示
print(df.head())

動作確認

実際にAPIを起動し、csvファイルをアップロードして質問できることを確認します。

事前準備

  1. input.csv
id,text
1,今日の東京の天気は?
2,服装はどうすればいい?
  1. 仮想環境を有効化
$ source .venv/bin/activate
  1. FastAPIサーバーの起動
$ uvicorn langchainapp.main:api --reload

実行結果

$ python pandas_app_sample01.py
送信内容: {'message': '今日の東京の天気は?'}
送信内容: {'message': '服装はどうすればいい?'}
   id         text                                             output
0   1   今日の東京の天気は?                        東京は曇りがちで、気温は約9.4℃、湿度は54%です。
1   2  服装はどうすればいい?  **寒さ対策**で、長袖の上着やレッグウォーマー、手袋を着用してください。\n**湿度の高さ...
  • output.csv
id,text,output
1,今日の東京の天気は?,東京は曇りがちで、気温は約9.4℃、湿度は54%です。
2,服装はどうすればいい?,"**寒さ対策**で、長袖の上着やレッグウォーマー、手袋を着用してください。
**湿度の高さ**を考慮し、防水性のある服を選ぶと良いでしょう。"

output.csvに自動で出力されました!

コードのポイント

  • 天気AIエージェントの実装(LangChain)
    過去記事で紹介済みのため割愛

  • APIサーバーの構築(FastAPI)

    1. エンドポイントで質問を受け付ける

    • /chat エンドポイントでユーザーからの質問をPOSTで受け付けます。
    @api.post("/chat")
    def chat(query: Query):
        ...
    

    2. Pydanticでリクエストバリデーション

    • PydanticのBaseModelを使うことで、リクエストの型チェックやバリデーションを行います。簡単で嬉しい。
    class Query(BaseModel):
        message: str
    

    3. LangChainエージェントの呼び出し

    • app.invoke(...) でLangChainエージェントを呼び出し、応答を取得します。app.invokeをそのまま組み込めるの凄い。
    result = app.invoke(
        {"messages": [{"role": "user", "content": query.message}]},
        {"configurable": {"thread_id": "session1"}}
    )
    

    4. スレッドIDによるセッション管理

    • thread_idを指定することで、ユーザーごとの会話履歴を分離できます。
    {"configurable": {"thread_id": "session1"}}
    
  • CSVデータの一括質問処理(Pandas)

    1. pandasでcsvを読み込む

    • pd.read_csv('input.csv') でcsvファイルをDataFrameとして読み込みます。
    df = pd.read_csv('input.csv')
    

    2. 各行の'text'カラムをAPIに送信

    • 戻り値response.json().get("response", "").strip()applyで各行の'text'カラムをAPIに送り、結果を'output'カラムに格納します。
    def call_chat_api(prompt):
        url = "http://localhost:8000/chat"
        payload = {"message": prompt}
        try:
            response = requests.post(url, json=payload, timeout=30)
            response.raise_for_status()
            return response.json().get("response", "").strip()
        except Exception as e:
            print(f"API呼び出しエラー: {e}")
            return None
    
    df["output"] = df["text"].apply(lambda x: call_chat_api(x))
    

    3. エラー時もNoneを返して処理が止まらないように工夫

    • 例外発生時はNoneを返し、全体の処理が中断しないようにしています。
    except Exception as e:
        print(f"API呼び出しエラー: {e}")
        return None
    

    4. 結果を新しいcsvとして保存し可視化

    • 生成結果をoutput.csvとして保存し、データの再利用や可視化が容易です。
    • index=falseは行番号(index)をCSVファイルに出力しない」ためのオプション
    df.to_csv("output.csv", index=False)
    print(df.head())
    

おわりに(まとめ・今後の展望)

この記事でやったこと

  • LLMにcsvファイルの内容を質問できる仕組みを構築
  • FastAPIでAPIサーバーを作成し、外部から質問を受け付けるエンドポイントを実装
  • LangChainで天気AIエージェントを用意し、API経由で推論を実行
  • pandasでcsvデータを一括処理し、各行の質問をAPIに投げて自動で回答を取得
  • 取得した回答をoutput.csvとして保存し、データの再利用や可視化も実現

今回はcsvを読み込ませる単純なプログラムを書きました。今後の展望として、CSVベースのRAG構築や、AIによるデータ分析レポートの自動生成などの応用をしていきたい考えています。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?