FAISSとAzure OpenAIを組み合わせたRAGシステム
この記事では、FAISSによるベクトル検索とAzure OpenAIを組み合わせた、RAGシステムのサンプルコードを紹介します。
SIGNATEというコンペサイトで 「(金融庁共催)第3回金融データ活用チャレンジ」 というコンペが開催されていました。RAGシステムの構築が課題で、実際のユーズケースを想定した作りにしています。
本記事が、RAGシステムの構築において少しでもお役に立てば幸いです。
※コンペの詳細はリンク先でご確認ください。
システム全体の方針
以下の事項を基本理念においてシステムを構築しています。
-
SaaSを利用する
Azure OpenAI などの外部SaaSを積極的に利用し、柔軟に拡張できるようにします。 -
修正&追加が比較的容易な構成にする
ストア修正や設定ファイル化など、後から手を加えやすい設計を心がけます。 -
各処理を独立化させて、展開やモジュール利用をしやすいようにする
例:PDF処理部分を独立したモジュールにまとめる、ベクトルストアの構築と別プログラムにする、など。
システム構成の大まかな内容
-
PDF処理層
PDFファイルからテキストやメタデータを抽出し、後段のベクトルストア構築に渡す前処理を担います。 -
ベクトルストア構築層
抽出されたテキストをEmbeddingしてFAISSなどのベクトルDBに格納し、検索可能な状態を作ります。 -
回答層
ユーザの質問に対して、ベクトル検索結果を用いて回答を生成します。キーワード検索などを組み合わせてリコール(検索漏れ)の補完も行います。
PDFからテキストを抽出してJSON化するサンプルコード
以下では、Azure Document Intelligence によるPDF解析と、
一連の流れを、あくまでサンプルとして示します。
実行に必要な環境やライブラリは各自でセットアップしてください(PyPDF2
, azure-ai-documentintelligence
, openai
など)。
- PDFページ数が多い場合は分割して
prebuilt-layout
モデルで解析し、図や表を含むマークダウンを取得します。 - 図表はテーブル構造として、Markdownになるようオプション指定します。
import os
import json
import PyPDF2
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import (
AnalyzeOutputOption,
DocumentContentFormat
)
def split_pdf_if_needed(original_pdf_path, max_pages=100):
"""PDFのページが多い場合は前後に分割して保存し、分割後のパスを返す。"""
base_name = os.path.splitext(os.path.basename(original_pdf_path))[0]
temp_dir = "temp_splitted_pdfs"
os.makedirs(temp_dir, exist_ok=True)
reader = PyPDF2.PdfReader(original_pdf_path)
total_pages = len(reader.pages)
if total_pages <= max_pages:
return [original_pdf_path] # 分割不要
part1_path = os.path.join(temp_dir, f"{base_name}_part1.pdf")
part2_path = os.path.join(temp_dir, f"{base_name}_part2.pdf")
writer1 = PyPDF2.PdfWriter()
for i in range(max_pages):
writer1.add_page(reader.pages[i])
with open(part1_path, "wb") as f1:
writer1.write(f1)
writer2 = PyPDF2.PdfWriter()
for i in range(max_pages, total_pages):
writer2.add_page(reader.pages[i])
with open(part2_path, "wb") as f2:
writer2.write(f2)
return [part1_path, part2_path]
def analyze_pdf_to_markdown(pdf_file_path, client):
"""
1つのPDFをAzure Document Intelligenceへ投げ、
マークダウン形式の結果を取得して返す。
"""
with open(pdf_file_path, "rb") as f:
poller = client.begin_analyze_document(
model_id="prebuilt-layout",
body=f,
output_content_format=DocumentContentFormat.MARKDOWN,
output=[AnalyzeOutputOption.FIGURES] # 図の抽出を有効化
)
result = poller.result()
return result.content
def sample_pdf_processing():
# Azure Document Intelligence 設定
endpoint = "your_endpoint"
key = "your_key"
doc_client = DocumentIntelligenceClient(endpoint, AzureKeyCredential(key))
input_pdf_dir = "your_dir_pdf"
output_md_dir = "your_dir"
os.makedirs(output_md_dir, exist_ok=True)
pdf_files = [f for f in os.listdir(input_pdf_dir) if f.lower().endswith(".pdf")]
for pdf_file in pdf_files:
pdf_path = os.path.join(input_pdf_dir, pdf_file)
splitted_files = split_pdf_if_needed(pdf_path)
combined_md = ""
for part_path in splitted_files:
md_content = analyze_pdf_to_markdown(part_path, doc_client)
if combined_md:
combined_md += "\n\n---\n\n"
combined_md += md_content
base_name = os.path.splitext(pdf_file)[0]
md_out_path = os.path.join(output_md_dir, f"{base_name}.md")
with open(md_out_path, "w", encoding="utf-8") as outf:
outf.write(combined_md)
print(f"[Info] Saved: {md_out_path}")
if __name__ == "__main__":
sample_pdf_processing()
この後に、LLMでチャンキングできるように別プログラムで分割処理をします。
※1PDFファイルが1マークダウンファイルとして出力されるので、1ページごとに分割して、LLMのコンテキスト長を超えないようにします。
ここがポイント
- 抽出した図表はJSON テーブルで出力
図表の抽出において、テキスト情報はDocument Intelligenceが強力です。
自作だときれいな抽出が出来ませんでした。 - Markdown を置換して最終出力
Markdown形式のよりもJSONコードブロックとして、出力した方が精度が高いです。
※図表の部分のみがチャンキングされた場合は後述のハイブリッド検索が効いてくるので問題なくチャンクの特定が出来ます。 - 「ゴミを入れれば、ゴミが出てくる」という言葉があるように、うまく抽出できなければ、そもそも回答できないので、きれいなデータを用意するという点はとても重要です。
ベクトルストア構築の流れ
続いてはベクトルストアの構築です。
-
LLMによるチャンク分割
通常は文字数などで単純分割しがちですが、本コードでは マークダウン構造を考慮 して分割を行うために LLM へ指示します。これにより、表やコードブロックなど、文脈を壊さないチャンクにできます。 -
キーワード抽出
各チャンクに対して LLM で「重要キーワード」を抽出し、メタデータとして保持。後の検索やランキングに活かせます。 -
FAISS への格納
Embedding したベクトルを FAISS に格納し、.index
ファイルとして保存。FAISS はベクトルのみ扱うため、チャンク本文やキーワードなどのメタ情報は JSON ファイルに書き出します。
ベクトルストア構築部分
以下が、サンプルコードです。ディレクトリ内の Markdown ファイルを対象として、チャンク化 → キーワード抽出 → Embedding → FAISSインデックス&JSON 保存 まで行います。
実際の運用を想定するならAzure AI Search などのSaaSがいいです。今回はお金の都合上一旦、ローカルで作っています。(ベクトルDBを作り直すたびにお金がかかるので)
import os
import json
import faiss
import numpy as np
import re
from openai import AzureOpenAI
def create_openai_client(api_key: str, endpoint: str):
"""Azure OpenAI クライアント生成"""
client = AzureOpenAI(
api_key=api_key,
api_version="your_version",
azure_endpoint=your_endpoint
)
return client
# Embeddingモデル名(AzureOpenAIのデプロイ名にあわせて適宜変更)
EMBED_MODEL = "your_model"
# 生成AIのモデル名(適宜変更)
GPT_MODEL = "your_model"
# ベクトルDB保存先
VECTORSTORE_DIR = "your_vectorstore"
# ファイル名の先頭にある数字→企業名マップ (例)
# ここは重要ではないのであってもなくてもいいです。今回は与えられたものは手修正しないようにしています。
COMPANY_MAP = {
"1": "企業名"
# 必要に応じて追加
}
def main():
# 1) クライアント生成
client = create_openai_client("your_key","your_endpoint")
# 2) ベクトルDB 未構築の場合は構築
converted_dir = "your_dir" # 対象となるMarkdownが格納されているディレクトリ
os.makedirs(VECTORSTORE_DIR, exist_ok=True)
for f in os.listdir(converted_dir):
if f.lower().endswith(".md"):
base_name = os.path.splitext(f)[0]
# すでにこのファイル用のチャンクindexが存在するかどうかを判定
chunk_exists = any(
file_name.startswith(f"{base_name}_chunk")
for file_name in os.listdir(VECTORSTORE_DIR)
)
if chunk_exists:
print(f"[Info] '{f}' のインデックスが既に存在するためスキップします。")
continue
path = os.path.join(converted_dir, f)
with open(path, "r", encoding="utf-8") as f_:
text = f_.read()
# インデックスが存在しなければ以下で作成
build_vectorstore_by_chunk(client, f, text)
print("[Info] ベクトルDB構築が完了しました。")
def build_vectorstore_by_chunk(client, filename, text):
"""
1) マークダウン解析によるチャンク分割
2) チャンクごとに埋め込み→FAISS に保存
"""
base_name = os.path.splitext(filename)[0]
# 企業名を推定(ファイル名の先頭の数字をキーに)
prefix = base_name.split("_")[0]
company_name = COMPANY_MAP.get(prefix, "不明企業")
# 1) LLMを使って "マークダウン構造を考慮" したチャンク分割
chunks = llm_chunk_text_markdown(client, text)
print(f"[Info] '{filename}' は {len(chunks)} チャンクに分割されました。")
# 2) 各チャンクに対して、Embedding & メタ情報を FAISS / JSON に保存
for i, chunk_str in enumerate(chunks):
# キーワード抽出
keywords = llm_extract_keywords(client, chunk_str)
# チャンクをベクトル化
chunk_vector = embed_texts(client, [chunk_str])
if not chunk_vector:
print(f"[Warning] 埋め込みに失敗: chunk {i}")
continue
# FAISS にベクトル登録 (1チャンク = 1ベクトル)
dim = len(chunk_vector[0])
index = faiss.IndexFlatIP(dim)
vector_np = np.array(chunk_vector).astype('float32') # shape (1, dim)
index.add(vector_np)
# インデックスの保存先
chunk_index_path = os.path.join(
VECTORSTORE_DIR,
f"{base_name}_chunk{i}.index"
)
faiss.write_index(index, chunk_index_path)
# JSON保存 (メタ情報)
record = {
"text_chunk": chunk_str,
"keywords": keywords,
"chunk_vector": chunk_vector[0], # ベクトル
"metadata": {
"source_name": f"{company_name} 統合報告書",
"company_name": company_name,
"filename": filename,
"chunk_index": i
}
}
meta_filename = f"{base_name}_chunk{i}.json"
chunk_meta_path = os.path.join(VECTORSTORE_DIR, meta_filename)
with open(chunk_meta_path, "w", encoding="utf-8") as f:
json.dump(record, f, ensure_ascii=False, indent=2)
print(f" -> saved: {chunk_index_path}, {chunk_meta_path}")
def llm_chunk_text_markdown(client, markdown_text):
"""
LLMに、マークダウン構造を考慮して文章をチャンク分割するよう指示。
JSON配列形式でチャンクを返すのがポイント。
"""
system_prompt = (
"あなたは文章を分割できるアシスタントです。以下の文章はマークダウン形式です。\n"
"表、コードブロック、ヘッダ(#, ##, ###など)、HTMLコメントなど多様な要素が含まれています。\n"
"これらを含めたマークダウン構造を理解したうえで、必要に応じてセクション(チャンク)を分割してください。\n"
"ただし必ず複数チャンクに分割する必要はなく、1チャンクにまとまることもありえます。\n"
"表やコードブロックなどは原文を壊さないようにまとめて同じチャンクに含めてください。\n"
"出力はJSON配列のみで返し、各要素は文字列としてマークダウンをできるだけ保持してください。\n"
"余計な説明やコードブロックやコメントは付けずに、JSON配列だけを返してください。"
)
system_msg = {"role": "system", "content": system_prompt}
user_msg = {"role": "user", "content": markdown_text}
try:
response = client.chat.completions.create(
model=GPT_MODEL,
messages=[system_msg, user_msg],
max_tokens=2500,
temperature=0.0
)
raw_text = response.choices[0].message.content.strip()
# コードブロックを取り除いて純粋なJSONを抽出
json_str = extract_json_content(raw_text)
chunks = json.loads(json_str)
# チャンクが文字列リストかをチェック
if isinstance(chunks, list) and all(isinstance(c, str) for c in chunks):
return chunks
else:
print("[Warning] LLM が想定外の形式を返したため、全文を1チャンクにします。")
return [markdown_text]
except Exception as e:
print(f"[Warning] llm_chunk_text_markdown failed: {e}")
return [markdown_text]
def extract_json_content(raw_text):
"""
LLM応答が ```json ... ``` のようにコードブロック付きで返ってきた場合、
その中から JSON 部分だけを取り出して返す。
"""
pattern = r"```(?:json)?\s*(.*?)\s*```"
match = re.search(pattern, raw_text, flags=re.DOTALL)
if match:
return match.group(1).strip()
return raw_text.strip()
def llm_extract_keywords(client, chunk_text):
"""
LLMにテキストを渡し、重要なキーワードを抽出してJSON配列で返す。
"""
system_prompt = (
"あなたは文章から重要なキーワードを抽出できる有能なアシスタントです。\n"
"以下の文章の主題やテーマをよく表す単語・短いフレーズを抽出し、JSON配列で返してください。\n"
"例: [\"キーワード1\", \"キーワード2\", \"キーワード3\"]\n"
"余計な説明やコードブロックは加えずに、キーワードの配列だけを返してください。"
)
system_msg = {"role": "system", "content": system_prompt}
user_msg = {"role": "user", "content": chunk_text}
try:
response = client.chat.completions.create(
model=GPT_MODEL,
messages=[system_msg, user_msg],
max_tokens=300,
temperature=0.0
)
raw_text = response.choices[0].message.content.strip()
# コードブロックを取り除いて、純粋なJSON部分だけを取り出す
json_text = extract_json_content(raw_text)
keywords = json.loads(json_text)
if isinstance(keywords, list) and all(isinstance(k, str) for k in keywords):
return keywords
else:
print("[Warning] キーワード抽出結果が想定外の形式でした。空リストを返します。")
return []
except Exception as e:
print(f"[Warning] llm_extract_keywords failed: {e}")
return []
def embed_texts(client, texts):
"""
テキストのリストを分割しながら Embedding API を呼び出す。
"""
batch_size = 10
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i : i+batch_size]
response = client.embeddings.create(
input=batch,
model=EMBED_MODEL
)
for item in response.data:
emb = item.embedding
all_embeddings.append(emb)
return all_embeddings
if __name__ == "__main__":
main()
ここがポイント
- チャンク分割 (llm_chunk_text_markdown)
LLM へ「マークダウンの構造を意識しつつ、必要に応じて複数チャンクに分割して」と依頼。
返ってきた JSON 配列をパースして、各要素を1チャンクとして扱います。
Dynamic Chunking(動的的チャンキング)で実装しているので、チャンクサイズはLLM任せになります。(図表とその前後の文言を1チャンク内に収めたかったので、一番手軽に実装できるものを選択しました。理想はテキスト部分と図表の部分でそれぞれチャンキングして、意味的に似ていれば、「テキスト+図表」のチャンク、似ていなければ「独立したチャンク」にする方法が良いかなと思います。)
加えて、構造を意識しつつチャンキングするため、ドキュメントからテキスト情報を抽出するモデルは重要です。(ここのプロンプトで少し弱い指示にしているのは、完璧なマークダウンファイルにならない事を前提としている為です。完璧なマークダウンファイルを用意して、以下のようなプロンプトにすれば精度は上がると思います。)
system_prompt = (
"あなたは文章を分割できるアシスタントです。以下の文章はマークダウン形式です。\n"
"表、コードブロック、見出し(#, ##, ###など)、HTMLコメントなど多様な要素が含まれています。\n"
"提供されるマークダウンファイルには一切の誤りがなく、段落や改行も正しく整形されています。\n"
"マークダウンの構造を正しく理解したうえで、必要に応じてセクション(チャンク)を分割してください。\n"
"\n"
"チャンキングの際は、以下を考慮してください:\n"
"1. 見出しや段落など、文書構造上の区切りを尊重してセクションを分割してください。\n"
"2. 大きな見出し配下の文章が長い場合は、さらに小さなチャンクに分けても構いません。\n"
"3. 表やコードブロックなどは途中で分割せず、原文を壊さないように一つのチャンクにまとめてください。\n"
"4. 前後関係が切れて意味が通じなくならないよう、文脈をしっかり保つ必要があります。\n"
"5. ただし必ずしも複数のチャンクに分割する必要はなく、内容によっては1チャンクになる場合もあります。\n"
"\n"
"最終的な出力はJSON配列の形式のみで返してください。\n"
"各配列要素には、マークダウン形式のテキストをそのまま文字列として格納してください。\n"
"余計な説明やコードブロック、コメントなどは付けずに、純粋なJSON配列だけを出力してください。\n"
)
*ここでいう完璧とは、1ページ内の段落、改行や空白などの情報だけでなく、前ページの関係性や図表の位置なども完璧にくみ取れている状態を指します。
- キーワード抽出 (llm_extract_keywords)
それぞれのチャンクについて、LLM から JSON 配列形式で重要キーワードを取得。
将来的にキーワード検索やランキングの際に利用します。
このキーワード抽出によって、図表のみがチャンクになった場合でも、検索で引っかかるようになります。
ここで、抽出したキーワードに対して類似するキーワードを追加してもいいと思います。
回答層
ベクトルストアに格納されたチャンクを用い、ユーザからの質問に対して最終的な回答を生成するフェーズです。ここでは以下のような流れで実装しています。
-
質問の前処理
- 企業名の検出: 質問文中に登場する企業名(同義語含む)を判定。(検索速度向上)
- 多重質問の分割 (任意): LLM で「質問が複数要素に分かれているか」を判定し、必要であれば段階的に分解する。
-
ハイブリッド検索 + RRF
- 質問文を Embedding して FAISS で検索 (ベクトル検索)。
- キーワード検索(LLMから抽出したキーワード)をあわせて実行。
- それぞれの検索結果を RRF (Reciprocal Rank Fusion) によって統合し、最終的に上位チャンクを決定。
-
回答生成 (LLM)
- 決定した上位チャンクを「コンテキスト」として LLM に渡し、ユーザの質問への回答を生成。
- 出力される回答は、コンペ的には不十分なので、後々別プログラムで回答を整形する。
ベクトルストアを構築して回答を得る
以下が、回答層を担うサンプルコードです。かなり長くなるので流れだけにしています。
各関数で使用するプロンプトは残していますので、参考になれば幸いです。
# ----------------------------------------
# メモリ上にベクトルストアを保持するための構造
# ----------------------------------------
class VectorStoreItem:
"""
- index_obj: faiss.Index
- records: [{ "text_chunk": str, "metadata": dict, "keywords": list }, ...]
- company_name: str
- base_filename: str (拡張子除く)
"""
def __init__(self, index_obj, records, company_name, base_filename):
self.index_obj = index_obj
self.records = records
self.company_name = company_name
self.base_filename = base_filename
VECTORSTORE_ITEMS = []
CHUNK_TO_JSON = {} # {(chunk_text, filename): "xxx.json"}
def load_vectorstore():
"""
ベクトルストアの .index と .json を読み込み、メモリに保持。
"""
for fname in os.listdir(VECTORSTORE_DIR):
if fname.endswith(".index"):
base_name = os.path.splitext(fname)[0]
index_file = os.path.join(VECTORSTORE_DIR, fname)
meta_file = os.path.join(VECTORSTORE_DIR, base_name + ".json")
if not os.path.isfile(index_file) or not os.path.isfile(meta_file):
continue
index_obj = faiss.read_index(index_file)
with open(meta_file, "r", encoding="utf-8") as f:
meta_data = json.load(f)
if "records" not in meta_data:
meta_data["records"] = [{
"text_chunk": meta_data.get("text_chunk", ""),
"metadata": meta_data.get("metadata", {}),
"keywords": meta_data.get("keywords", []),
}]
records = meta_data["records"]
company_name = ""
if records and "metadata" in records[0]:
company_name = records[0]["metadata"].get("company_name", "")
VECTORSTORE_ITEMS.append(VectorStoreItem(index_obj, records, company_name, base_name))
# CHUNK_TO_JSON に対応付け
for r in records:
chunk_text = r.get("text_chunk", "")
file_name_in_rec = r.get("metadata", {}).get("filename", "")
CHUNK_TO_JSON[(chunk_text, file_name_in_rec)] = base_name + ".json"
print(f"[Info] Loaded {len(VECTORSTORE_ITEMS)} vectorstore items.")
# -------------------------------------
# 質問分解 (多重質問対応)
# -------------------------------------
def llm_should_split_question(client, question):
"""
質問が複数要素を含むかどうかを LLMに判定させる(はい/いいえ)。
"""
system_prompt = (
"あなたは有能なアシスタントです。以下の質問に対して、与えられた【質問】が複数の要素(段階的な解答を必要とする質問)を含む場合は「はい」とだけ答えてください。"
" 単一の質問の場合は「いいえ」とだけ答えてください。\n"
)
# 実装省略
def llm_split_question(client, question):
"""
複数要素の質問のうち、最初に回答すべき要素のみを抽出して返す。
"""
system_prompt = (
"あなたは有能なアシスタントです。与えられた質問が複数の要素を含む場合、まず最初の要素のみを抽出した質問に分解してください。\n"
"例:\n"
"元の質問:\n"
" 『[企業名]の海外事業において、営業利益が2番目に高い地域に含まれる国として記載がある国名を全て教えてください。』\n"
"分解後の質問:\n"
" 『[企業名]の海外事業において、営業利益が2番目に高い地域を教えてください。』\n"
f"質問: {question}\n"
"分解後の質問のみを回答してください。"
)
# 実装省略
def llm_generate_new_question_from_intermediate_answer(client, original_question, intermediate_answer, sub_question):
"""
中間回答を用いて、新しい質問を生成する。
"""
system_prompt = (
"あなたは有能なアシスタントです。以下の情報を元に、元の質問の意図を損なわずに新しい質問を生成してください。\n"
"また、企業名や年度の情報があれば入れてください。\n"
"元の質問に計算方法の指示(四捨五入)があれば、新しい質問にも反映してください。"
f"元の質問: {original_question}\n"
f"サブクエスチョン: {sub_question}\n"
f"中間回答: {intermediate_answer}\n"
"新しい質問のみを返してください。"
)
# 実装省略
# -------------------------------------
# パイプライン全体
# -------------------------------------
def execute_query_pipeline(client, question, detected_company):
"""
質問に対して、以下の処理を実行する:
1. LLMで類似クエリ生成
2. 複数のクエリ(オリジナルおよび類似クエリ)それぞれでハイブリッド検索を実行
3. RRFでリランキングし、上位結果からコンテキストを作成
4. 作成したコンテキストと質問からLLMに回答生成を依頼
戻り値は (answer, final_top_records)
"""
# --- B) 類似クエリ生成 ---
similar_queries = llm_generate_similar_queries(client, question, num=3)
# オリジナルと類似クエリを全て使用
all_queries = [question] + similar_queries
# キーワード抽出: 類似クエリから抽出する
query_keywords = []
for similar_query in all_queries:
keywords = llm_extract_query_keywords(client, similar_query)
query_keywords.extend(keywords)
combined_keywords = list(set(query_keywords))
# --- C) 各クエリでハイブリッド検索 ---
query_results_list = []
for q in all_queries:
_, top_records_sub = hybrid_search(
client=client,
user_query=q,
top_k=5,
filter_company=detected_company,
combined_keywords=combined_keywords
)
query_results_list.append(top_records_sub)
# --- D) RRFでリランキング ---
final_top_records = reciprocal_rank_fusion(query_results_list, top_k=3, alpha=60.0)
if final_top_records:
context_str = "\n\n".join([r[0] for r in final_top_records])
else:
context_str = ""
# --- E) LLMで最終回答生成 ---
system_msg = {
"role": "system",
"content": (
"【コンテキスト】と【質問】が提供され、あなたタスクはコンテキストの内容から次の事項を踏まえて質問に答えることです。\n"
"- 質問に会社名が含まれている場合は、そのコンテキストはその会社の情報であるとみなしてください。\n"
"- 割合計算は単純なポイント計算を行ってください。(例:24%-20%=4%)\n"
"- わからなければ「分かりません」とだけ答えてください。\n"
)
}
user_msg = {"role": "user", "content": f"【質問】:{question}\n\n【コンテキスト】:\n{context_str}"}
try:
response = client.chat.completions.create(
model=GPT_MODEL_ANSWER,
messages=[system_msg, user_msg]
)
answer = response.choices[0].message.content.strip()
except Exception as e:
answer = f"LLM call failed: {e}"
return answer, final_top_records
# -------------------------------------
# 類似クエリ生成、RRF, ハイブリッド検索等
# -------------------------------------
def llm_generate_similar_queries(...):
# 実装省略
system_prompt = (
"あなたは有能なアシスタントです。ユーザの質問と似た複数のクエリを3〜5個生成してください。\n"
"企業名がある場合はその企業名を使用してください。\n"
"生成されたクエリは、元の質問と関連性があるようにしてください。\n"
"短い文で、言葉の言い換えになるようなクエリをJSON配列で返してください。"
)
pass
def reciprocal_rank_fusion(...):
# 実装省略
"""
query_results_list: 複数クエリの検索結果リスト
( [(chunk_text, file_name, score), ...], # クエリ1の結果
[(chunk_text, file_name, score), ...], # クエリ2の結果
... )
RRFスコア = Σ(1 / (alpha + rank))
"""
pass
def llm_extract_query_keywords(...):
# 実装省略
system_prompt = (
"あなたのタスクは、与えられた文章から重要なキーワードを抽出することです。\n"
"文章の内容に応じたキーワードを可能な限り抽出し、必ず指定された関数形式で結果を返してください。\n"
"返答は余計な文章を含まず、抽出されたキーワードのみを JSON 配列として返してください。"
)
pass
def embed_texts(...):
"""
テキストのリストを分割しつつEmbedding APIを呼び出す
"""
# Embedding API呼び出し省略
pass
def hybrid_search(client, user_query, top_k=3, filter_company=None, combined_keywords=None):
"""
1) ユーザクエリからキーワード抽出(combined_keywords が指定されていなければ抽出)
2) ユーザクエリをEmbedding化してFaiss検索
3) キーワード検索(抽出または外部から渡された combined_keywords を利用)
4) 両者を統合し、上位top_kを返却
"""
if combined_keywords is None:
query_keywords = llm_extract_query_keywords(client, user_query)
combined_keywords = list(set(query_keywords))
print(f"[Info] Extracted keywords: {query_keywords}")
print(f"[Info] Combined keywords: {combined_keywords}")
query_emb = embed_texts(client, [user_query])[0]
vector_results = vector_search(query_emb, top_k=10, filter_company=filter_company)
keyword_results = keyword_search(combined_keywords, top_k=10, filter_company=filter_company)
return context_str, top_n
def vector_search(...):
# ベクトル検索
pass
def keyword_search(...):
# キーワード検索
pass
# -------------------------------------
# 回答根拠 (rationales.csv) 出力関連
# -------------------------------------
def initialize_rationales_csv(filepath):
# CSV header作成
pass
def write_rationales_csv(filepath, question_id, top_records):
# 上位チャンクを出力
pass
# -------------------------------------
# 企業名判定
# -------------------------------------
def detect_company_name(question, synonyms_dict):
"""
質問文から企業名(別名含む)を探し、見つかったら正規化された社名を返す
"""
q_lower = question.lower()
for canonical_name, aliases in synonyms_dict.items():
check_list = [canonical_name.lower()] + [a.lower() for a in aliases]
for candidate in check_list:
if candidate in q_lower:
return canonical_name
return None
def create_openai_client(api_key: str, endpoint: str):
"""Azure OpenAI クライアント生成"""
client = AzureOpenAI(
api_key=api_key,
api_version="your_version",
azure_endpoint=your_endpoint
)
return client
# -------------------------------------
# メイン処理
# -------------------------------------
def main():
# 1) クライアント生成
client = create_openai_client(
api_key="api_key",
api_version="your_version",
azure_endpoint="your_endpoint"
)
load_vectorstore()
# 2) 質問CSVを読み込み
query_csv = "query.csv" # 質問集
predictions_csv = "predictions.csv" # 回答
initialize_rationales_csv(RATIONALES_CSV)
predictions = []
if os.path.isfile(query_csv):
with open(query_csv, "r", encoding="utf-8") as qf:
reader = csv.reader(qf)
for i, row in enumerate(reader, start=1):
if len(row) < 2:
continue
q_index, question = row[0], row[1]
# 企業名を判定
detected_company = detect_company_name(question, COMPANY_SYNONYMS)
if not detected_company:
# 企業名がわからない場合、キーワード検索で最も合致するチャンクから企業名を推定
pass
# 質問分解
if llm_should_split_question(client, question):
sub_question = llm_split_question(client, question)
intermediate_answer, _ = execute_query_pipeline(client, sub_question, detected_company)
new_question = llm_generate_new_question_from_intermediate_answer(
client, question, intermediate_answer, sub_question
)
final_question = new_question
else:
final_question = question
# 通常パイプラインを実行
answer, final_top_records = execute_query_pipeline(client, final_question, detected_company)
print(f"\n[質問{q_index}] {question}")
if detected_company:
print(f" -> 検出企業名: {detected_company}")
print(f"[最終質問] {final_question}")
print(f"[回答{q_index}] {answer}")
predictions.append((q_index, answer))
write_rationales_csv(RATIONALES_CSV, q_index, final_top_records)
# 回答CSV出力
with open(predictions_csv, "w", encoding="utf-8", newline='') as pf:
for (num, ans) in predictions:
pf.write(f"{num},{ans}\n")
print(f"[Output] predictions.csv 生成完了")
print(f"[Output] rationales.csv 生成完了")
else:
print(f"[Info] {query_csv} not found.")
if __name__ == "__main__":
main()
コード解説
- メモリへのベクトルストア読込 (load_vectorstore)
- 前段階で.index (FAISS) と .json (メタ情報) が生成されている想定。
- 読み込んだデータは VECTORSTORE_ITEMS リストに格納しておき、クエリごとに検索できるようにします。
- 質問ごとの処理
- query.csvをループしながら回答を出力します。
- 企業名のシノニムを用いて detect_company_name で企業名を推定。見つからない場合はキーワード検索で最適な企業を補完するロジックも用意しています。
- 企業名で検索範囲を絞るのは速度改善です。(企業間の比較は出来なくなるので、したければフィルタを外すだけ)
- ポイント(割合)の計算ですが、上昇率や差分は意図しない計算が実行されることがあるので、プロンプトで指定しています。
- 質問分解 (多重質問対応)
- llm_should_split_question によって複数要素の質問かどうかを判定。
- 必要に応じて llm_split_question で質問を一部に絞り、一度検索&回答してから llm_generate_new_question_from_intermediate_answer を呼び出し、段階的に処理します。
- ハイブリッド検索 (hybrid_search) と RRF 統合 (reciprocal_rank_fusion)
- 質問を Embedding してベクトル検索を行いつつ、キーワード検索(LLMが抽出したキーワード)でも並行して検索。
- それらを統合する際、RRF (Reciprocal Rank Fusion) を用いてスコアを合成します。
- LLMによる回答生成
- 上位チャンクを「コンテキスト」として LLM に与え、本質問に対する回答を生成。
- 回答の根拠としてどのチャンクを使用したかは、rationales.csv に書き出して可視化します。
- 結果の出力
- predictions.csv に回答をまとめて書き出します。
ここがポイント
-
複雑な質問への対応
1つの質問に複数の問い合わせが含まれる場合、LLMによる質問分解や中間回答の連結を活用します。
例えば「A社の**商品ブランドは何年何月に発売されましたか?」のような質問は、「A社の**商品ブランドは?」という質問になりその答えを踏まえて、新しい質問を生成します。
例)
質問:A社の**商品ブランドは何年何月に発売されましたか?
分解質問:A社の**商品ブランドは?
分解答え:Bです。
新しい質問:A社のBは何年何月に発売されましたか?
答え:**年**月です
となります。
GraphRAGにしてしまえばこの辺は気にしなくても、目的のチャンクを特定できると思いますが、文字を列挙するような問題についての回答精度が落ちるので、問題を分類する処理が必要です。 -
RRF などのスコア統合
Cohereなどのリランカーモデルを使用すれば再現性が高くなると思います。 -
回答の制御
LLM が返す回答形式を厳密に指定したい場合、Function Calling や JSON 出力などの手法が考えられます。
「回答の根拠を引用せよ」「数字は小数第2位で四捨五入」といったプロンプト制御も有効です。
しかし、実際の運用では、根拠の部分を含めて回答させた方が良いのでそんなに気にしなくてもいいと思います。
まとめ
RAG システムを運用する上では、モデルバージョンの更新 や プロンプト改善、ハイパーパラメータ調整 などを継続的に行い、回答精度を高めていくことが鍵となります。
本記事が少しでもその参考になれば幸いです。ご質問や提案などあれば、ぜひコメントやIssueなどで共有いただければと思います。