はじめに
この記事では、Databricks Vector SearchとFoundation Model APIを使って、猫カフェの情報を案内するRAG(Retrieval-Augmented Generation)を構築する方法をご紹介します。本プロジェクトでは、猫の情報、店舗情報、料金・ルール情報という3つの異なる情報源から適切な情報を検索し、自然な日本語で回答するチャットボットを実装しました。
システム構成
使用技術スタック
- Databricks Vector Search: セマンティック検索エンジン
- Delta Lake: データストレージ(Change Data Feed有効化)
- Foundation Model API: LLM推論(Llama 3.3 70B Instruct)
- Embedding Model: databricks-gte-large-en
- 開発言語: Python (PySpark), SQL
アーキテクチャ概要
[元データ(CSV)]
↓
[Icebergテーブル] ※今後Snowflakeと連携するのによさそうなIcebergで挑戦
↓
[RAG用Deltaテーブル(CDF有効化)] ※ここはIcebergだとダメだった
↓
[Vector Searchインデックス(自動Embedding)]
↓
[ユーザークエリ] → [Vector Search] → [LLM] → [回答生成]
準備: データとテーブルの作成
1. Unity Catalogの設定
まず、プロジェクト用のCatalog、Schema、Volumeを作成します。
-- カタログを作成
CREATE CATALOG IF NOT EXISTS cat_cafe_chatbot;
-- スキーマとボリュームを作成
CREATE SCHEMA IF NOT EXISTS cat_cafe_chatbot.sch_chatbot;
CREATE VOLUME IF NOT EXISTS cat_cafe_chatbot.sch_chatbot.vol_chatbot;
2. データファイルの準備
サンプルデータとして以下の3つのCSVファイルを作成しました。
猫情報.csv
猫名前,品種,店舗名,性格,特徴
タマ,スコティッシュフォールド,渋谷店,人懐っこい,膝に乗るのが好き、折れ耳が特徴
ミケ,三毛猫,渋谷店,元気で遊び好き,おもちゃ大好き、カラフルな毛並み
シロ,白猫,新宿店,ツンデレ,気分屋、真っ白な毛が美しい
...
店舗情報.csv
店舗名,営業時間,住所,特徴,最寄り駅
渋谷店,10:00-20:00,東京都渋谷区道玄坂1-2-3,広々とした空間、猫用キャットタワーが豊富,渋谷駅徒歩5分
新宿店,11:00-22:00,東京都新宿区新宿3-4-5,夜遅くまで営業、駅近で仕事帰りに便利,新宿駅徒歩3分
...
料金ルール.csv
項目,内容,備考
基本料金30分,500円,全店舗共通
基本料金1時間,900円,全店舗共通
ルール1,猫を無理に触らない,猫の意思を尊重してください
...
3. データのVolumeへのコピー
Workspace上のファイルをVolumeにコピーします。
catalog_name = "cat_cafe_chatbot"
schema_name = "sch_chatbot"
volume_name = "vol_chatbot"
volume_path = f"{catalog_name}/{schema_name}/{volume_name}"
workspace_path = "/Workspace/Users/<your-email>/project-cat-cafe-chatbot"
workspace_files = {
f"{workspace_path}/data/猫情報.csv": "猫情報.csv",
f"{workspace_path}/data/店舗情報.csv": "店舗情報.csv",
f"{workspace_path}/data/料金ルール.csv": "料金ルール.csv"
}
for source, filename in workspace_files.items():
destination = f"/Volumes/{volume_path}/{filename}"
dbutils.fs.cp(source, destination)
print(f"✅ コピー完了: {filename}")
4. Icebergテーブルの作成
CSVファイルからIcebergテーブルを作成します。
-- 猫情報テーブル
CREATE OR REPLACE TABLE cat_cafe_chatbot.sch_chatbot.neko_iceberg
USING iceberg
AS SELECT * FROM read_files(
'/Volumes/cat_cafe_chatbot/sch_chatbot/vol_chatbot/ねこ情報.csv',
format => 'csv',
header => true
)
-- 店舗情報テーブル
CREATE OR REPLACE TABLE cat_cafe_chatbot.sch_chatbot.shop_iceberg
USING iceberg
AS SELECT * FROM read_files(
'/Volumes/cat_cafe_chatbot/sch_chatbot/vol_chatbot/店舗情報.csv',
format => 'csv',
header => true
)
-- 料金ルールテーブル
CREATE OR REPLACE TABLE cat_cafe_chatbot.sch_chatbot.price_iceberg
USING iceberg
AS SELECT * FROM read_files(
'/Volumes/cat_cafe_chatbot/sch_chatbot/vol_chatbot/料金ルール.csv',
format => 'csv',
header => true
)
RAG用テーブルの作成
重要: IcebergテーブルとDeltaテーブルの違い
Databricks Vector SearchのDelta Sync Indexは、Change Data Feed(CDF)をサポートするDeltaテーブルでのみ動作します。Icebergテーブルは現時点でCDFに対応していないため、Vector Search用には必ずDeltaテーブルを使用する必要があります。
1. Embeddingスキーマの作成
spark.sql("CREATE SCHEMA IF NOT EXISTS cat_cafe_chatbot.sch_embedding")
2. データの自然言語テキスト化
構造化データをLLMが理解しやすい自然言語テキストに変換します。
import pandas as pd
# 猫情報をテキスト化
df = spark.sql("SELECT * FROM cat_cafe_chatbot.sch_chatbot.neko_iceberg")
pdf = df.toPandas()
texts = []
for _, row in pdf.iterrows():
name = row.get('猫名前', '')
feature = row.get('特徴', '')
race = row.get('品種', '')
shop = row.get('店舗名', '')
text = f"猫の名前は{name}、特徴は{feature}、品種は{race}で{shop}で会うことができます。"
texts.append(text)
# 出力例:
# "猫の名前はタマ、特徴は膝に乗るのが好き、折れ耳が特徴、品種はスコティッシュフォールドで渋谷店で会うことができます。"
3. RAG用Deltaテーブルの保存(CDF有効化)
# テキストをDataFrameに変換
df_texts = pd.DataFrame({
'id': range(1, len(texts) + 1),
'description': texts
})
spark_df_texts = spark.createDataFrame(df_texts)
# Deltaテーブルとして保存
spark_df_texts.write.format("delta").mode("overwrite").saveAsTable(
"cat_cafe_chatbot.sch_embedding.neko_description_rag"
)
# Change Data Feedを有効化
spark.sql("""
ALTER TABLE cat_cafe_chatbot.sch_embedding.neko_description_rag
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
同様に店舗情報、料金情報もテキスト化してテーブルを作成します。
# 店舗情報のテキスト化例
shop_texts = []
for _, row in shop_pdf.iterrows():
name = row.get('店舗名', '')
hours = row.get('営業時間', '')
address = row.get('住所', '')
feature = row.get('特徴', '')
station = row.get('最寄り駅', '')
text = f"{name}は{hours}営業で、住所は{address}、{feature}が特徴です。{station}からアクセスできます。"
shop_texts.append(text)
# 料金情報のテキスト化例
price_texts = []
for _, row in price_pdf.iterrows():
item = row.get('項目', '')
content = row.get('内容', '')
note = row.get('備考', '')
text = f"{item}は{content}です。{note}"
price_texts.append(text)
Vector Searchの構築
1. Vector Searchエンドポイントの作成
Databricks UIから以下の手順でエンドポイントを作成します。
- Compute → Vector Search → Create Endpoint
- エンドポイント名を入力(例:
cat-cafe-vector-endpoint) - "Create"をクリック
- ステータスが"Online"になるまで待機(数分かかります)
2. Vector Searchインデックスの作成
UIまたはコードでインデックスを作成します。ここではコード例を示します。
from databricks.vector_search.client import VectorSearchClient
# クライアント初期化
client = VectorSearchClient()
ENDPOINT_NAME = "cat-cafe-vector-endpoint"
# 猫情報インデックス作成
client.create_delta_sync_index(
endpoint_name=ENDPOINT_NAME,
index_name="cat_cafe_chatbot.sch_embedding.neko_index",
source_table_name="cat_cafe_chatbot.sch_embedding.neko_description_rag",
pipeline_type="TRIGGERED",
primary_key="id",
embedding_source_column="description",
embedding_model_endpoint_name="databricks-gte-large-en"
)
# 店舗情報インデックス作成
client.create_delta_sync_index(
endpoint_name=ENDPOINT_NAME,
index_name="cat_cafe_chatbot.sch_embedding.shop_index",
source_table_name="cat_cafe_chatbot.sch_embedding.shop_description_rag",
pipeline_type="TRIGGERED",
primary_key="id",
embedding_source_column="description",
embedding_model_endpoint_name="databricks-gte-large-en"
)
# 料金情報インデックス作成
client.create_delta_sync_index(
endpoint_name=ENDPOINT_NAME,
index_name="cat_cafe_chatbot.sch_embedding.price_index",
source_table_name="cat_cafe_chatbot.sch_embedding.price_description_rag",
pipeline_type="TRIGGERED",
primary_key="id",
embedding_source_column="description",
embedding_model_endpoint_name="databricks-gte-large-en"
)
3. インデックスの同期確認
# インデックスステータスの確認
index = client.get_index(
endpoint_name=ENDPOINT_NAME,
index_name="cat_cafe_chatbot.sch_embedding.neko_index"
)
status = index.describe()
print(status)
ステータスがONLINEになっていることを確認してください。
RAG関数の実装
1. 単一インデックス版RAG関数
まず、1つのインデックスから検索する基本的なRAG関数を実装します。
from databricks.vector_search.client import VectorSearchClient
import mlflow.deployments
# クライアント初期化
client = VectorSearchClient()
deploy_client = mlflow.deployments.get_deploy_client("databricks")
ENDPOINT_NAME = "cat-cafe-vector-endpoint"
INDEX_NAME = "cat_cafe_chatbot.sch_embedding.neko_index"
def chat_with_rag(question: str, num_results: int = 3) -> str:
"""
Vector Searchで関連情報を検索し、LLMで回答を生成
"""
# Vector Searchで検索
index = client.get_index(
endpoint_name=ENDPOINT_NAME,
index_name=INDEX_NAME
)
results = index.similarity_search(
query_text=question,
columns=["id", "description"],
num_results=num_results
)
# 検索結果をコンテキストとして整形
context = "\n".join([
doc['description']
for doc in results.get('result', {}).get('data_array', [])
])
# プロンプト作成
prompt = f"""以下の情報を参考に質問に答えてください。
情報:
{context}
質問: {question}
回答:"""
# LLMで回答生成
response = deploy_client.predict(
endpoint="databricks-meta-llama-3-3-70b-instruct",
inputs={
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500,
"temperature": 0.7
}
)
return response['choices'][0]['message']['content']
# テスト実行
answer = chat_with_rag("白い猫はいますか?")
print(answer)
# 出力例: "はい、シロという名前の白猫が新宿店にいます。真っ白な毛が美しく、ツンデレな性格が特徴です。"
2. 複数インデックス版RAG関数
複数の情報源から統合的に検索する高度なRAG関数を実装します。
def chat_with_rag_multi(question: str, num_results: int = 3) -> str:
"""
複数のVector Searchインデックスから検索し、統合した情報でLLMが回答
"""
indexes = {
"猫情報": "cat_cafe_chatbot.sch_embedding.neko_index",
"店舗情報": "cat_cafe_chatbot.sch_embedding.shop_index",
"料金情報": "cat_cafe_chatbot.sch_embedding.price_index"
}
all_contexts = []
# 各インデックスから検索
for name, index_name in indexes.items():
try:
index = client.get_index(
endpoint_name=ENDPOINT_NAME,
index_name=index_name
)
results = index.similarity_search(
query_text=question,
columns=["id", "description"],
num_results=num_results
)
docs = results.get('result', {}).get('data_array', [])
if docs:
context = f"【{name}】\n" + "\n".join([
doc['description'] for doc in docs
])
all_contexts.append(context)
except Exception as e:
print(f"⚠️ {name}の検索でエラー: {e}")
continue
# 統合コンテキスト作成
combined_context = "\n\n".join(all_contexts)
# プロンプト作成
prompt = f"""以下の猫カフェに関する情報を参考に、質問に丁寧に答えてください。
{combined_context}
質問: {question}
回答:"""
# LLM回答生成
response = deploy_client.predict(
endpoint="databricks-meta-llama-3-3-70b-instruct",
inputs={
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500,
"temperature": 0.7
}
)
return response['choices'][0]['message']['content']
# テスト実行
answer = chat_with_rag_multi("渋谷店の特徴といる猫を教えて")
print(answer)
3. 実行結果例
# 例1: 猫の検索
Q: "白い猫がいる店舗を教えてください"
A: "白い猫のシロは新宿店にいます。新宿店は11:00-22:00営業で、
新宿駅徒歩3分の便利な立地です。夜遅くまで営業しているので
仕事帰りにも立ち寄りやすい店舗です。"
# 例2: 料金の問い合わせ
Q: "1時間いくらですか?"
A: "1時間の基本料金は900円です(全店舗共通)。
また、時間無制限のフリータイムプランは2000円でご利用いただけます。"
# 例3: 複合的な質問
Q: "人懐っこい猫に会いたいのですが、どの店舗に行けばいいですか?"
A: "人懐っこいタマ(スコティッシュフォールド)が渋谷店にいます。
渋谷店は10:00-20:00営業で、渋谷駅から徒歩5分です。
広々とした空間と豊富なキャットタワーが特徴の店舗です。"
トラブルシューティング
1. Change Data Feed未有効化エラー
エラー: "Change data feed is not enabled on the source table"
解決方法:
ALTER TABLE cat_cafe_chatbot.sch_embedding.neko_description_rag
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
2. インデックスが404エラー
原因: インデックスの初回同期が未完了
解決方法:
# ステータス確認
index = client.get_index(endpoint_name=ENDPOINT_NAME, index_name=INDEX_NAME)
print(index.describe())
# 手動同期トリガー
index.sync()
3. Embeddingモデルエラー
確認事項:
-
databricks-gte-large-enが利用可能か確認 - リージョンによってはモデル名が異なる場合がある
パフォーマンス最適化
1. インデックスオブジェクトの再利用
毎回get_index()を呼ぶのではなく、インデックスオブジェクトをキャッシュします。
class RAGChatbot:
def __init__(self, endpoint_name: str):
self.client = VectorSearchClient()
self.deploy_client = mlflow.deployments.get_deploy_client("databricks")
self.endpoint_name = endpoint_name
# インデックスを事前にロード
self.indexes = {
"neko": self.client.get_index(
endpoint_name=endpoint_name,
index_name="cat_cafe_chatbot.sch_embedding.neko_index"
),
"shop": self.client.get_index(
endpoint_name=endpoint_name,
index_name="cat_cafe_chatbot.sch_embedding.shop_index"
),
"price": self.client.get_index(
endpoint_name=endpoint_name,
index_name="cat_cafe_chatbot.sch_embedding.price_index"
)
}
def chat(self, question: str) -> str:
# インデックスオブジェクトを直接使用
results = self.indexes["neko"].similarity_search(
query_text=question,
columns=["id", "description"],
num_results=3
)
# ... (以降の処理)
2. 並列検索の実装
複数インデックスの検索を並列化します。
from concurrent.futures import ThreadPoolExecutor, as_completed
def search_index(index, question, num_results):
return index.similarity_search(
query_text=question,
columns=["id", "description"],
num_results=num_results
)
# 並列検索
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(search_index, idx, question, 3): name
for name, idx in self.indexes.items()
}
for future in as_completed(futures):
name = futures[future]
try:
results = future.result()
# 結果を処理
except Exception as e:
print(f"Error in {name}: {e}")
今後の拡張案
1. 会話履歴管理
class ConversationalRAG:
def __init__(self):
self.history = []
def chat(self, question: str) -> str:
# 履歴を含めたプロンプト生成
history_text = "\n".join([
f"Q: {q}\nA: {a}" for q, a in self.history
])
prompt = f"""過去の会話:
{history_text}
新しい質問: {question}"""
# 回答生成
answer = self.generate_answer(prompt)
# 履歴に追加
self.history.append((question, answer))
return answer
2. Hybrid Search(キーワード+ベクトル)
Vector Searchにフィルタ条件を追加して、より精度の高い検索を実現します。
# 店舗名でフィルタリング
results = index.similarity_search(
query_text="白い猫",
columns=["id", "description"],
filters={"shop_name": "新宿店"},
num_results=5
)
3. UIの作成(Gradioなど)
import gradio as gr
def chatbot_interface(message, history):
response = chat_with_rag_multi(message)
return response
demo = gr.ChatInterface(
fn=chatbot_interface,
title="猫カフェ案内チャットボット",
description="猫カフェの情報をお気軽にお尋ねください!"
)
demo.launch()
まとめ
本記事では、Databricks Vector SearchとFoundation Model APIを使った実用的なRAGチャットボットの構築方法を解説しました。
重要ポイント
- Deltaテーブル必須: Vector SearchのDelta Sync IndexはChange Data Feed対応のDeltaテーブルが必要
- 自然言語化: 構造化データは自然言語テキストに変換してから保存
- 複数インデックス: 異なる情報源ごとにインデックスを分けることで柔軟な検索が可能
- プロンプトエンジニアリング: LLMへのプロンプトの質が回答の質に直結
開発のポイント
- インデックス作成後は必ず同期ステータスを確認
- テスト時は少数のデータから始める
- エラーハンドリングを適切に実装
- パフォーマンス最適化は必要に応じて段階的に
このRAGシステムは、猫カフェ以外のドメイン(レストラン案内、商品検索、社内FAQ等)にも応用可能です。ぜひ自分のユースケースに合わせてカスタマイズしてみてください。
次は Databricks Apps を使ってチャットボットアプリに進化させていきたいです。
参考リンク
- Databricks Vector Search Documentation
- Foundation Model APIs
- Delta Lake Change Data Feed
- Unity Catalog
検証環境: Databricks Runtime 14.x, Vector Search Endpoint
完成度: 約85%(コア機能完成、デプロイ機能は今後実装予定)