はじめに
本記事では、LLM(大規模言語モデル)にcsvファイルの内容を読み込ませて、質問に答えさせる方法を解説します。
なぜcsvで読み込ませたいのか
業務データや公開データなど、csv形式で管理されている情報が多そうなので、これらのデータをLLMに読み込ませて、自然言語で質問・分析できれば、業務効率化や新たな知見の発見につながるんじゃない?と思い、この記事を書くことにしました。
今回使った技術・環境
- Python 3.13.5
- FastAPI 0.127.0
- Pandas 2.3.3
- LangChain 1.1.0
FastAPIとは
FastAPIは、Python製の軽量Webフレームワークです。APIの構築が非常に簡単で、非同期処理や型ヒントにも対応しています。今回はできるだけ簡単にAPIの構築をしたかったのでFastAPIを選びました。
Pandasとは
Pandasは、Pythonのデータ分析ライブラリです。csvファイルの読み込みやデータ加工が得意で、データサイエンス分野で広く使われています。PandasからAPIを呼び出すようにしました。
実装手順
-
FastAPIでAPIサーバーを作成
-
csvファイルをアップロードしてpandasで読み込み
-
LangChainの天気AIエージェントにデータを渡して質問回答
全体コード
1. 天気AIエージェントの実装(LangChain)
過去記事参照
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_ollama import ChatOllama
from langchain_core.messages import SystemMessage
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
import requests
import os
@tool
def get_weather(city: str) -> str:
"""指定した都市の現在の天気と気温を取得します(OpenWeather API使用)。"""
api_key = os.getenv("OPENWEATHER_API_KEY")
if not api_key:
return "OPENWEATHER_API_KEY が設定されていません。"
city_map = {
"東京": "Tokyo,JP",
"大阪": "Osaka,JP",
"名古屋": "Nagoya,JP",
"札幌": "Sapporo,JP",
"福岡": "Fukuoka,JP",
}
q = city_map.get(city, city)
url = "https://api.openweathermap.org/data/2.5/weather"
params = {
"q": q,
"appid": api_key,
"units": "metric",
"lang": "ja",
}
resp = requests.get(url, params=params, timeout=10)
if resp.status_code != 200:
return f"OpenWeather APIエラー: {resp.status_code} {resp.text[:200]}"
data = resp.json()
temp = data["main"]["temp"]
desc = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return f"{city}の現在の天気: {desc}、気温: {temp}℃、湿度: {humidity}%"
tools = [get_weather]
# モデル
model = ChatOllama(model="ministral-3:3b", temperature=0)
model_with_tools = model.bind_tools(tools)
checkpointer = InMemorySaver()
def agent_node(state: MessagesState):
system_msg = SystemMessage(content="""日本語で簡潔に答えてください。
- 現在の天気・気温: get_weather ツールを使用してください。""")
messages = [system_msg] + state["messages"]
response = model_with_tools.invoke(messages)
if response.tool_calls:
return {"messages": [response]}
return {"messages": [response], "__end__": True}
# グラフ(同じ)
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges(
"agent",
lambda state: "tools" if state["messages"][-1].tool_calls else END,
{"tools": "tools", END: END}
)
builder.add_edge("tools", "agent")
app = builder.compile(checkpointer=checkpointer)
2. APIサーバーの構築(FastAPI)
from fastapi import FastAPI
from pydantic import BaseModel
from langchainapp.langchain_sample04 import app
api = FastAPI()
class Query(BaseModel):
message: str
@api.post("/chat")
def chat(query: Query):
result = app.invoke(
{"messages": [{"role": "user", "content": query.message}]},
{"configurable": {"thread_id": "session1"}}
)
return {"response": result["messages"][-1].content}
3. CSVデータの一括質問処理(Pandas)
import pandas as pd
import requests
# CSVファイルを読み込む
df = pd.read_csv('input.csv')
def call_chat_api(prompt):
url = "http://localhost:8000/chat"
payload = {"message": prompt}
print(f"送信内容: {payload}") # 追加
try:
response = requests.post(url, json=payload, timeout=30)
response.raise_for_status()
return response.json().get("response", "").strip()
except Exception as e:
print(f"API呼び出しエラー: {e}")
return None
# 各行の'text'カラムをAPIに送り、生成結果を'output'カラムに追加
df["output"] = df["text"].apply(lambda x: call_chat_api(x))
# 結果を新しいCSVに保存
df.to_csv("output.csv", index=False)
# データの最初の5行を表示
print(df.head())
動作確認
実際にAPIを起動し、csvファイルをアップロードして質問できることを確認します。
事前準備
- input.csv
id,text
1,今日の東京の天気は?
2,服装はどうすればいい?
- 仮想環境を有効化
$ source .venv/bin/activate
- FastAPIサーバーの起動
$ uvicorn langchainapp.main:api --reload
実行結果
$ python pandas_app_sample01.py
送信内容: {'message': '今日の東京の天気は?'}
送信内容: {'message': '服装はどうすればいい?'}
id text output
0 1 今日の東京の天気は? 東京は曇りがちで、気温は約9.4℃、湿度は54%です。
1 2 服装はどうすればいい? **寒さ対策**で、長袖の上着やレッグウォーマー、手袋を着用してください。\n**湿度の高さ...
- output.csv
id,text,output
1,今日の東京の天気は?,東京は曇りがちで、気温は約9.4℃、湿度は54%です。
2,服装はどうすればいい?,"**寒さ対策**で、長袖の上着やレッグウォーマー、手袋を着用してください。
**湿度の高さ**を考慮し、防水性のある服を選ぶと良いでしょう。"
output.csvに自動で出力されました!
コードのポイント
-
天気AIエージェントの実装(LangChain)
過去記事で紹介済みのため割愛 -
APIサーバーの構築(FastAPI)
1. エンドポイントで質問を受け付ける
-
/chatエンドポイントでユーザーからの質問をPOSTで受け付けます。
@api.post("/chat") def chat(query: Query): ...2. Pydanticでリクエストバリデーション
- Pydanticの
BaseModelを使うことで、リクエストの型チェックやバリデーションを行います。簡単で嬉しい。
class Query(BaseModel): message: str3. LangChainエージェントの呼び出し
-
app.invoke(...)でLangChainエージェントを呼び出し、応答を取得します。app.invokeをそのまま組み込めるの凄い。
result = app.invoke( {"messages": [{"role": "user", "content": query.message}]}, {"configurable": {"thread_id": "session1"}} )4. スレッドIDによるセッション管理
-
thread_idを指定することで、ユーザーごとの会話履歴を分離できます。
{"configurable": {"thread_id": "session1"}} -
-
CSVデータの一括質問処理(Pandas)
1. pandasでcsvを読み込む
-
pd.read_csv('input.csv')でcsvファイルをDataFrameとして読み込みます。
df = pd.read_csv('input.csv')2. 各行の'text'カラムをAPIに送信
- 戻り値
response.json().get("response", "").strip()をapplyで各行の'text'カラムをAPIに送り、結果を'output'カラムに格納します。
def call_chat_api(prompt): url = "http://localhost:8000/chat" payload = {"message": prompt} try: response = requests.post(url, json=payload, timeout=30) response.raise_for_status() return response.json().get("response", "").strip() except Exception as e: print(f"API呼び出しエラー: {e}") return None df["output"] = df["text"].apply(lambda x: call_chat_api(x))3. エラー時もNoneを返して処理が止まらないように工夫
- 例外発生時は
Noneを返し、全体の処理が中断しないようにしています。
except Exception as e: print(f"API呼び出しエラー: {e}") return None4. 結果を新しいcsvとして保存し可視化
- 生成結果を
output.csvとして保存し、データの再利用や可視化が容易です。 -
index=falseは行番号(index)をCSVファイルに出力しない」ためのオプション
df.to_csv("output.csv", index=False) print(df.head()) -
おわりに(まとめ・今後の展望)
この記事でやったこと
- LLMにcsvファイルの内容を質問できる仕組みを構築
- FastAPIでAPIサーバーを作成し、外部から質問を受け付けるエンドポイントを実装
- LangChainで天気AIエージェントを用意し、API経由で推論を実行
- pandasでcsvデータを一括処理し、各行の質問をAPIに投げて自動で回答を取得
- 取得した回答をoutput.csvとして保存し、データの再利用や可視化も実現
今回はcsvを読み込ませる単純なプログラムを書きました。今後の展望として、CSVベースのRAG構築や、AIによるデータ分析レポートの自動生成などの応用をしていきたい考えています。