最近のLLM(大規模言語モデル)の機能の発達により、LLMをプロジェクトに組み込んで、自然言語からSQLクエリを自動生成し、データベースの情報を対話形式で取得できるようになっています。
この記事では、LangChainを使って「質問に応じて自動でSQLを生成 → SQLを実行しデータベースから結果を取得 → 結果を自然言語で説明」してくれるデータベース会話システムを構築します。
このシステムに以下の機能が実現されています:
- SQLiteデータベース接続:ローカルファイルとして動作し、軽量・簡単に利用可能
- LLMによる質問分類:SQLが必要な質問かどうかを自動判定
- SQL生成+結果の自然言語説明:LangChainがSQLを自動生成し、結果を分かりやすく解説
- 通常チャット対応:雑談や挨拶にも自然に応答
- Redisでの履歴保存:セッションIDごとに過去の会話を記録・再利用
1.全体像
まず、この「データベース会話システム」の全体コードを見せます。
from dotenv import load_dotenv
from langchain.chains.sql_database.query import create_sql_query_chain
from langchain_community.utilities import SQLDatabase
from langchain.chat_models import init_chat_model
from langchain.prompts import PromptTemplate
from langchain_community.chat_message_histories import RedisChatMessageHistory
import sqlite3, pathlib
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableWithMessageHistory
load_dotenv()
# SQLiteは軽量なデータベース。ファイルとして存在し、サーバーのセットアップ不要
db_path = pathlib.Path("example.db")
need_init = not db_path.exists()
conn = sqlite3.connect(db_path)
if need_init:
cur = conn.cursor()
cur.execute("""
CREATE TABLE customers(
id INTEGER PRIMARY KEY,
name TEXT,
city TEXT
);""")
cur.execute("""
CREATE TABLE orders(
id INTEGER PRIMARY KEY,
customer_id INTEGER,
amount REAL,
created_at TEXT
);""")
cur.executemany("INSERT INTO customers(name, city) VALUES (?,?)",
[("Alice","Tokyo"),("Bob","Osaka"),("Carol","Tokyo")])
cur.executemany("INSERT INTO orders(customer_id, amount, created_at) VALUES (?,?,?)",
[(1,120.5,"2024-05-01"),(1,80.0,"2024-08-09"),
(2,50.0,"2024-07-10"),(3,200.0,"2024-09-01")])
conn.commit()
conn.close()
def generate_sql(chain, question):
return chain.invoke({"question": question})
# 安全対策:更新系SQLは禁止
forbidden_words = ["update", "delete", "insert"]
def check_sql(sql:str):
lowered = sql.lower()
if any(k in lowered for k in forbidden_words):
raise ValueError("データベースの内容を変更する操作は禁止です。")
if "limit" not in lowered:
sql += " LIMIT 10"
return sql
def clean_sql(generated_sql):
generated_sql_clean = generated_sql.strip()
if generated_sql_clean.startswith("SQLQuery:"):
generated_sql_clean = generated_sql_clean[len("SQLQuery:"):].strip()
elif generated_sql_clean.startswith("SQL:"):
generated_sql_clean = generated_sql_clean[len("SQL:"):].strip()
generated_sql_clean = check_sql(generated_sql_clean)
return generated_sql_clean
# Redisを使った履歴保存
def get_session_history_redis(session_id:str):
return RedisChatMessageHistory(session_id=session_id, url="redis://localhost:6379")
# 質問がデータベースに関するものかどうかを分類
def needs_database_query(llm, question: str) -> bool:
classifier_prompt = PromptTemplate.from_template(
"""あなたは分類アシスタントです。以下のユーザー質問がデータベースの問い合わせを必要とするか判断してください。
データベースには2つのテーブルがあります:
- customers:顧客情報(id, name, city)
- orders:注文情報(id, customer_id, amount, created_at)
顧客、注文、金額、統計などの質問なら "YES"
雑談や挨拶などデータベースが不要な内容なら "NO"
ユーザーの質問:{question}
回答(YESまたはNOのみ):"""
)
response = llm.invoke(classifier_prompt.format(question=question))
answer = response.content.strip().upper()
return "YES" in answer
def main():
db = SQLDatabase.from_uri(f"sqlite:///{db_path}", include_tables=["customers", "orders"])
llm = init_chat_model(
model="deepseek-chat",
model_provider="deepseek",
api_key="sk-xxxxx", # 個人のAPI_KEYを設定してください
base_url="https://api.deepseek.com/v1"
)
sql_chain = create_sql_query_chain(llm, db)
# SQL結果の説明テンプレート
answer_template = PromptTemplate.from_template(
"ユーザーの質問:{question}\nSQL文:{generated_sql}\n実行結果:{result_rows}\n"
"この結果を簡潔な日本語で説明し、重要な数値を指摘してください。\n"
"履歴内容:{history}"
)
sql_answer_chain = answer_template | llm
# 普通の対話用テンプレート
chat_prompt = ChatPromptTemplate([
("system", """あなたはデータベース問い合わせアシスタントです。
customersとordersに関する質問に回答してください。
過去の履歴に基づいた会話も可能です。
挨拶や雑談にも丁寧に対応してください。"""),
MessagesPlaceholder(variable_name="history"),
("user", "{question}")
])
chat_chain = chat_prompt | llm
# 2種類の会話チェーン
sql_conversational_chain = RunnableWithMessageHistory(
sql_answer_chain,
get_session_history_redis,
input_messages_key="question",
history_messages_key="history"
)
chat_conversational_chain = RunnableWithMessageHistory(
chat_chain,
get_session_history_redis,
input_messages_key="question",
history_messages_key="history"
)
sid = "bob"
while(True):
question = input("あなた:")
if question.lower() == "exit":
print("対話を終了します。")
break
cfg = {"configurable": {"session_id": sid}}
if needs_database_query(llm, question):
print("[システム:データベース問い合わせモード]\n")
try:
generated_sql = generate_sql(sql_chain, question)
generated_sql_clean = clean_sql(generated_sql)
print("実行予定のSQL文:", generated_sql_clean)
result_rows = db.run(generated_sql_clean)
final_result = sql_conversational_chain.invoke(
{"question":question, "generated_sql":generated_sql, "result_rows":result_rows}, config=cfg)
print(final_result.content)
except Exception as e:
print(f"[エラー:クエリ実行失敗] {str(e)}\n")
else:
print("[システム:通常会話モード]")
final_result = chat_conversational_chain.invoke(
{"question": question}, config=cfg
)
print(final_result.content)
main()
2.開発の流れ及び機能ごとの解説
ここでは、実際に本システムを構築する過程を、機能ごとに分けて詳しく解説します。
各ステップでは、なぜその処理が必要なのか、LangChain内でどのように働くのかを明確に説明していきたいと思います。
2.1 SQliteでデータベースへの接続
まずは最も基礎的な部分であるデータベースの準備と接続です。
本システムでは、軽量なローカルデータベースである SQLite を使用しています。
SQLiteは単独のサーバーを構築する必要がなく、単一のファイルとして存在するため、普段の練習や個人開発に非常に適しています。
db_path = pathlib.Path("example.db")
need_init = not db_path.exists()
# データベースファイル(example.db)に接続する。存在しない場合は自動的に新規作成される
conn = sqlite3.connect(db_path)
if need_init:
# SQL文を実行するためのカーソル(操作用オブジェクト)を取得する。
cur = conn.cursor()
cur.execute("""
CREATE TABLE customers(
id INTEGER PRIMARY KEY,
name TEXT,
city TEXT
);""")
cur.execute("""
CREATE TABLE orders(
id INTEGER PRIMARY KEY,
customer_id INTEGER,
amount REAL,
created_at TEXT
);""")
cur.executemany("INSERT INTO customers(name, city) VALUES (?,?)",
[("Alice","Tokyo"),("Bob","Osaka"),("Carol","Tokyo")])
cur.executemany("INSERT INTO orders(customer_id, amount, created_at) VALUES (?,?,?)",
[(1,120.5,"2024-05-01"),(1,80.0,"2024-08-09"),
(2,50.0,"2024-07-10"),(3,200.0,"2024-09-01")])
# これまでの変更内容をデータベースに確定・保存する
conn.commit()
# データベースとの接続を閉じ、リソースを解放する
conn.close()
上記のコードでは、2つのテーブル customers と orders を作成しています。
customers:顧客情報(id, name, city)
orders:注文情報(id, customer_id, amount, created_at)
この段階でデータベースを初期化しておくことで、後にLangChainのSQLDatabaseクラスを通じて直接操作が可能になります。
db = SQLDatabase.from_uri(f"sqlite:///{db_path}", include_tables=["customers", "orders"])
from_uri() を使うことで、SQLiteのURIスキームを指定し、LangChain側で自動的に接続・スキーマ解析が行われます。
include_tablesを設定することで、アクセスを許可するテーブルを限定できます。
この点はセキュリティ上の観点からは非常に重要です。
2.2 SQL文の作成・チェック及び実行
次は、ユーザーの質問からSQL文を生成し、実行する部分です。
下の2.4のところにも出たきますが、
sql_chain = create_sql_query_chain(llm, db)
というチェーンに何かを検索したいという旨の自然言語を入力すると、それに対応するSQLが返されます。例えばユーザが
東京に住む顧客を教えて
を入力すると、次のようなSQLを生成されます:
SELECT * FROM customers WHERE city='Tokyo'
ただし、AIが生成するSQLは常に安全とは限りません。もしupdate, deleteといったキーワードが含まれているにもかかわらず、そのSQL文を実行してしまうと、データベースに対して予想外の修正・削除を起こしてしまうかもしれません。
そのため、SQL実行前に安全性チェックを行うのが必須です。
forbidden_words = ["update", "delete", "insert"]
def check_sql(sql:str):
lowered = sql.lower()
if any(k in lowered for k in forbidden_words):
raise ValueError("データベースの内容を変更する操作は禁止です。")
if "limit" not in lowered:
sql += " LIMIT 10"
return sql
ここでは、次のような2つの安全対策を実装しています:
・UPDATE, DELETE, INSERT など、データを変更する操作は禁止
・結果の件数制限として LIMIT 10 を自動的に付加
これにより、LLMが誤って全件取得や削除を行うリスクを防げるようになりました。
また、自分が作っていた時、このままだと出力に「SQLQuery:」や「SQL:」という余計な内容も含まれてしまうので、clean_sqlという前処理関数対処します。
def clean_sql(generated_sql):
generated_sql_clean = generated_sql.strip()
if generated_sql_clean.startswith("SQLQuery:"):
generated_sql_clean = generated_sql_clean[len("SQLQuery:"):].strip()
elif generated_sql_clean.startswith("SQL:"):
generated_sql_clean = generated_sql_clean[len("SQL:"):].strip()
generated_sql_clean = check_sql(generated_sql_clean)
return generated_sql_clean
最後に、整形済みSQLをLangChainのdb.run()で実行します。
result_rows = db.run(generated_sql_clean)
2.3 Redisで履歴を保存
われわれが実現したいのは会話型システムであり、一門一答のような独立しているわけではありません。ユーザが連続的に質問を投げる場合、その前の会話のやり取り、つまり対話のコンテキストを常に把握する必要があるわけです。
今のコードのままだと、プログラムを止める時点で、今回の実行で行われた会話のやり取りの履歴が全部失われてしまいます。何かの方法を使って会話を永続化設定することで、たとえ再実行しても前回の履歴が残れるようなシステムを実現したいですね。
LangChainには、会話の状態を保持するためのMessageHistoryという仕組みが用意されています。今回はRedisを使ってそれを実現しました。毎回の実行にて同じsession_id(会話id)を設定すれば、今までの会話の履歴が全部覚えられます。
Redisを使うことで、次のような利点があります:
・メモリ上で高速にアクセス可能
・複数セッションの履歴を同時に保持
・アプリケーション再起動後も会話履歴を保持可能
def get_session_history_redis(session_id: str):
return RedisChatMessageHistory(session_id=session_id, url="redis://localhost:6379")
2.4 2種類の会話チェーンの作成
ユーザからは、雑談や挨拶のような、SQLを生成する必要がなく直接答えれば良い質問をされる場合があります。この場合、無理やりSQLに変換しようとしたら、意味不明なSQLが生成されてしまい、db.run()で実行した時必ずエラーが出てきます。そのため、ユーザの質問内容に対して、SQL文に変換する必要が本当にあるかを一回判断して、そこで分岐をかけ、異なる処理ロジックに入らせることが必要になってきます。
このシステムにおいては、SQL文に変換する必要性によって、SQL回答チェーンと通常会話チェーンという二つの処理ロジックを用意しました。
2.4.1 SQL回答チェーン
まず、SQL実行結果を自然言語で説明するためのテンプレートを作成します。
answer_template = PromptTemplate.from_template(
"ユーザーの質問:{question}\nSQL文:{generated_sql}\n実行結果:{result_rows}\n"
"この結果を簡潔な日本語で説明し、重要な数値を指摘してください。\n"
"履歴内容:{history}"
)
sql_answer_chain = answer_template | llm
ここでは、SQL文と結果の両方をプロンプトに含めることで、モデルが「結果に基づいた自然な説明文」を生成できるようにしています。
さらに、履歴連携のために RunnableWithMessageHistory を利用します。
sql_conversational_chain = RunnableWithMessageHistory(
sql_answer_chain,
get_session_history_redis,
input_messages_key="question",
history_messages_key="history"
)
2.4.2 通常会話チェーン
一方、雑談や挨拶のようにSQLを必要としない質問には、シンプルな対話プロンプトを使用します。
chat_prompt = ChatPromptTemplate([
("system", """あなたはデータベース問い合わせアシスタントです。
customersとordersに関する質問に回答してください。
過去の履歴に基づいた会話も可能です。
挨拶や雑談にも丁寧に対応してください。"""),
MessagesPlaceholder(variable_name="history"),
("user", "{question}")
])
chat_chain = chat_prompt | llm
chat_conversational_chain = RunnableWithMessageHistory(
chat_chain,
get_session_history_redis,
input_messages_key="question",
history_messages_key="history"
)
システムは、**needs_database_query()**という関数を通じて
「SQL文に必要性」を判定し、分岐に入ります。
if needs_database_query(llm, question):
# SQL対話チェーンを使用
else:
# 通常会話チェーンを使用
3.デモ
システムを起動し、まずは挨拶:

明らかな挨拶ですので、通常会話モードに切り替えられます。
次に東京の顧客の注文額の合計を尋ねてみます:

システムはデータベース問い合わせモードへと切り替わり、SQLを生成し、回答を返してくれます。
そしていったんプログラムを止めて、もう一度実行します。


おめでたいことに、前回の履歴をはっきり覚えてくれていますね
4.まとめ
以上で説明した通り、LangChainのRunnableWithMessageHistoryやSQLDatabaseなどを活用することで、「自然言語 → SQL生成 → 結果解釈 → 会話履歴保持」という完全な質問・回答サイクルを実現し、わずか数百行で高機能なデータベース会話アシスタントを構築することができました。
最後までお読みいただき、ありがとうございました!