はじめに
Part 1で基礎概念を学び、Part 2で検索システムの実装方法を習得しました。このPart 3では、いよいよ実践的なRAG(検索拡張生成)アプリケーションの構築に挑戦します!
Part 3で学ぶこと:
- インデクサーとスキルセットを使った自動化
- 統合ベクトル化パイプラインの構築
- Azure OpenAIとの連携によるRAG実装
- 本番運用のベストプラクティス
- モニタリングとトラブルシューティング
それでは、ChatGPTのような対話型AIアプリケーションを作っていきましょう!
1. インデクサーとスキルセットの基礎
1.1 インデクサーとは?
おさらい:
インデクサーは「データ収集ロボット」でしたね。手動でデータをアップロードする代わりに、自動的にデータソースから取得してインデックスに登録してくれます。
インデクサーが解決する課題:
手動アップロードの場合:
# 新しいPDFが追加されるたびに...
for pdf_file in new_pdfs:
text = extract_text(pdf_file) # 手動で抽出
vector = generate_embedding(text) # 手動でベクトル化
upload_to_index(text, vector) # 手動でアップロード
# 大変!
インデクサーを使う場合:
1. データソースとインデクサーを設定
2. スケジュールを設定(例: 毎日深夜に実行)
3. あとは自動でやってくれる!
1.2 データソースの作成
まず、データがどこにあるかを教えます。例として、Azure Blob Storageを使います。
データソースの定義:
{
"name": "my-blob-datasource",
"type": "azureblob",
"credentials": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=xxx;EndpointSuffix=core.windows.net"
},
"container": {
"name": "documents",
"query": null
}
}
REST APIで作成:
curl -X POST "https://my-search-service-001.search.windows.net/datasources?api-version=2024-07-01" \
-H "Content-Type: application/json" \
-H "api-key: YOUR_ADMIN_KEY" \
-d @datasource.json
Pythonでの実装:
import requests
def create_datasource():
"""データソースを作成"""
url = f"{endpoint}/datasources?api-version=2024-07-01"
headers = {
"Content-Type": "application/json",
"api-key": api_key
}
datasource = {
"name": "my-blob-datasource",
"type": "azureblob",
"credentials": {
"connectionString": "YOUR_CONNECTION_STRING"
},
"container": {
"name": "documents"
}
}
response = requests.post(url, headers=headers, json=datasource)
if response.status_code == 201:
print("データソースが作成されました!")
else:
print(f"エラー: {response.status_code}")
print(response.text)
create_datasource()
サポートされるデータソース:
- Azure Blob Storage(PDF、Word、Excel、画像など)
- Azure SQL Database
- Azure Cosmos DB
- Azure Table Storage
- Azure Data Lake Storage Gen2
2. スキルセットによるAI加工
2.1 スキルセットとは?
おさらい:
スキルセットは、データを検索インデックスに入れる前に「AI加工」する仕組みでしたね。
典型的なスキルパイプライン:
PDF文書
↓ (1) Text Split Skill: 文書を適切なサイズに分割
チャンク(小さな断片)
↓ (2) Azure OpenAI Embedding Skill: ベクトル化
ベクトル埋め込み
↓ (3) インデックスに登録
検索可能なインデックス
2.2 統合ベクトル化スキルセット
2024-07-01でGAになった最新機能!
これまで別々に実装していた「チャンク化」と「ベクトル化」を、スキルセット内で自動実行できます。
完全なスキルセット定義:
{
"name": "my-skillset",
"description": "PDFをチャンク化してベクトル化するスキルセット",
"skills": [
{
"@odata.type": "#Microsoft.Skills.Text.SplitSkill",
"name": "text-splitter",
"description": "テキストを適切なサイズに分割",
"context": "/document",
"textSplitMode": "pages",
"maximumPageLength": 2000,
"pageOverlapLength": 500,
"inputs": [
{
"name": "text",
"source": "/document/content"
}
],
"outputs": [
{
"name": "textItems",
"targetName": "chunks"
}
]
},
{
"@odata.type": "#Microsoft.Skills.Text.AzureOpenAIEmbeddingSkill",
"name": "embedding-generator",
"description": "チャンクをベクトル化",
"context": "/document/chunks/*",
"resourceUri": "https://your-openai.openai.azure.com/",
"apiKey": "YOUR_OPENAI_KEY",
"deploymentId": "text-embedding-ada-002",
"modelName": "text-embedding-ada-002",
"dimensions": 1536,
"inputs": [
{
"name": "text",
"source": "/document/chunks/*"
}
],
"outputs": [
{
"name": "embedding",
"targetName": "vector"
}
]
}
],
"indexProjections": {
"selectors": [
{
"targetIndexName": "blog-index",
"parentKeyFieldName": "parent_id",
"sourceContext": "/document/chunks/*",
"mappings": [
{
"name": "chunk_id",
"source": "/document/chunks/*/chunk_id"
},
{
"name": "content",
"source": "/document/chunks/*"
},
{
"name": "contentVector",
"source": "/document/chunks/*/vector"
},
{
"name": "title",
"source": "/document/metadata_storage_name"
}
]
}
]
}
}
重要なパラメータの説明:
| パラメータ | 説明 | 推奨値 |
|---|---|---|
| maximumPageLength | 1チャンクの最大文字数 | 2000-4000(LLMのコンテキストサイズに応じて) |
| pageOverlapLength | チャンク間の重複文字数 | 500-1000(文脈の連続性を保つため) |
| dimensions | 埋め込みベクトルの次元数 | 1536(text-embedding-ada-002の場合) |
チャンクサイズの選び方:
小さすぎる(500文字):
✗ 文脈が失われる
✗ 検索結果が多すぎる
適切(2000文字):
✓ 十分な文脈を保持
✓ LLMのコンテキストに収まる
大きすぎる(8000文字):
✗ 関連性の低い情報も含まれる
✗ LLMのコンテキストを圧迫
2.3 スキルセットの作成
def create_skillset():
"""スキルセットを作成"""
url = f"{endpoint}/skillsets?api-version=2024-07-01"
headers = {
"Content-Type": "application/json",
"api-key": api_key
}
skillset = {
"name": "my-skillset",
"description": "統合ベクトル化パイプライン",
"skills": [
{
"@odata.type": "#Microsoft.Skills.Text.SplitSkill",
"context": "/document",
"textSplitMode": "pages",
"maximumPageLength": 2000,
"pageOverlapLength": 500,
"inputs": [
{
"name": "text",
"source": "/document/content"
}
],
"outputs": [
{
"name": "textItems",
"targetName": "chunks"
}
]
},
{
"@odata.type": "#Microsoft.Skills.Text.AzureOpenAIEmbeddingSkill",
"context": "/document/chunks/*",
"resourceUri": "https://your-openai.openai.azure.com/",
"apiKey": "YOUR_OPENAI_KEY",
"deploymentId": "text-embedding-ada-002",
"modelName": "text-embedding-ada-002",
"dimensions": 1536,
"inputs": [
{
"name": "text",
"source": "/document/chunks/*"
}
],
"outputs": [
{
"name": "embedding",
"targetName": "vector"
}
]
}
]
}
response = requests.post(url, headers=headers, json=skillset)
if response.status_code == 201:
print("スキルセットが作成されました!")
else:
print(f"エラー: {response.status_code}")
print(response.text)
create_skillset()
3. インデクサーの作成と実行
3.1 インデクサーの定義
データソース、スキルセット、インデックスを繋げます。
{
"name": "my-indexer",
"dataSourceName": "my-blob-datasource",
"targetIndexName": "blog-index",
"skillsetName": "my-skillset",
"schedule": {
"interval": "PT24H",
"startTime": "2024-11-01T00:00:00Z"
},
"parameters": {
"batchSize": 10,
"maxFailedItems": 0,
"maxFailedItemsPerBatch": 0,
"configuration": {
"dataToExtract": "contentAndMetadata",
"parsingMode": "default",
"imageAction": "generateNormalizedImages"
}
},
"fieldMappings": [
{
"sourceFieldName": "metadata_storage_name",
"targetFieldName": "title"
},
{
"sourceFieldName": "metadata_storage_last_modified",
"targetFieldName": "publishedDate"
}
],
"outputFieldMappings": [
{
"sourceFieldName": "/document/chunks/*/vector",
"targetFieldName": "contentVector"
}
]
}
スケジュール設定:
| インターバル | 説明 | 用途 |
|---|---|---|
| PT5M | 5分ごと | リアルタイム更新が必要な場合 |
| PT1H | 1時間ごと | 頻繁に更新されるデータ |
| PT24H | 1日1回 | 通常のドキュメント管理 |
3.2 インデクサーの作成と実行
def create_and_run_indexer():
"""インデクサーを作成して実行"""
# インデクサーを作成
url = f"{endpoint}/indexers?api-version=2024-07-01"
headers = {
"Content-Type": "application/json",
"api-key": api_key
}
indexer = {
"name": "my-indexer",
"dataSourceName": "my-blob-datasource",
"targetIndexName": "blog-index",
"skillsetName": "my-skillset",
"schedule": {
"interval": "PT24H"
},
"parameters": {
"batchSize": 10,
"configuration": {
"dataToExtract": "contentAndMetadata",
"parsingMode": "default"
}
}
}
response = requests.post(url, headers=headers, json=indexer)
if response.status_code == 201:
print("インデクサーが作成されました!")
# インデクサーを実行
run_url = f"{endpoint}/indexers/my-indexer/run?api-version=2024-07-01"
run_response = requests.post(run_url, headers=headers)
if run_response.status_code == 202:
print("インデクサーの実行を開始しました!")
else:
print(f"実行エラー: {run_response.status_code}")
else:
print(f"作成エラー: {response.status_code}")
print(response.text)
create_and_run_indexer()
3.3 インデクサーのステータス確認
def check_indexer_status():
"""インデクサーの実行状態を確認"""
url = f"{endpoint}/indexers/my-indexer/status?api-version=2024-07-01"
headers = {"api-key": api_key}
response = requests.get(url, headers=headers)
if response.status_code == 200:
status = response.json()
print(f"ステータス: {status['status']}")
print(f"最終実行結果: {status['lastResult']['status']}")
print(f"処理ドキュメント数: {status['lastResult']['itemsProcessed']}")
print(f"失敗数: {status['lastResult']['itemsFailed']}")
# エラーがあれば表示
if status['lastResult'].get('errors'):
print("\nエラー:")
for error in status['lastResult']['errors']:
print(f" - {error['errorMessage']}")
else:
print(f"エラー: {response.status_code}")
# 定期的にチェック
import time
for i in range(10):
check_indexer_status()
time.sleep(10) # 10秒待機
4. RAGアプリケーションの実装
4.1 RAGパターンの全体フロー
ユーザー: "2024年の売上目標は?"
↓
(1) クエリベクトル化
↓
(2) Azure AI Search でハイブリッド検索
- キーワード検索: "2024年" "売上" "目標"
- ベクトル検索: 意味的に類似した文書
↓
(3) 関連ドキュメントを取得
- 「2024年度事業計画書」のチャンク
- 「売上目標に関する資料」のチャンク
↓
(4) コンテキストを作成
↓
(5) Azure OpenAI にプロンプトと共に送信
↓
回答: "2024年度の売上目標は前年比120%の30億円です..."
4.2 RAG実装コード
import openai
from typing import List, Dict
# Azure OpenAI の設定
openai.api_type = "azure"
openai.api_base = "https://your-openai.openai.azure.com/"
openai.api_version = "2024-02-15-preview"
openai.api_key = "YOUR_OPENAI_KEY"
def retrieve_relevant_documents(query: str, top_k: int = 5) -> List[Dict]:
"""
関連ドキュメントを検索
"""
# クエリをベクトル化
embedding_response = openai.Embedding.create(
engine="text-embedding-ada-002",
input=query
)
query_vector = embedding_response['data'][0]['embedding']
# ハイブリッド検索(セマンティックランキング付き)
url = f"{endpoint}/indexes/blog-index/docs/search?api-version=2024-07-01"
headers = {
"Content-Type": "application/json",
"api-key": api_key
}
payload = {
"search": query,
"vectorQueries": [
{
"kind": "vector",
"vector": query_vector,
"fields": "contentVector",
"k": 50
}
],
"queryType": "semantic",
"semanticConfiguration": "my-semantic-config",
"select": "id,title,content,category",
"top": top_k
}
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
result = response.json()
return result['value']
else:
raise Exception(f"検索エラー: {response.status_code}")
def generate_answer(query: str, documents: List[Dict]) -> str:
"""
検索結果を使って回答を生成
"""
# コンテキストを作成
context = "\n\n".join([
f"【文書{i+1}: {doc['title']}】\n{doc['content']}"
for i, doc in enumerate(documents)
])
# プロンプトを構築
system_message = """あなたは親切なアシスタントです。
以下の文書を参考にして、ユーザーの質問に正確に答えてください。
重要なルール:
1. 提供された文書の情報のみを使って回答してください
2. 文書に情報がない場合は「提供された文書には該当する情報がありません」と答えてください
3. 回答の根拠となった文書を明示してください
4. 日本語で丁寧に回答してください
"""
user_message = f"""参考文書:
{context}
質問: {query}
上記の文書を参考にして回答してください。"""
# Azure OpenAI で回答を生成
completion = openai.ChatCompletion.create(
engine="gpt-4", # または "gpt-35-turbo"
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": user_message}
],
temperature=0.3, # 創造性を抑えて正確性重視
max_tokens=1000
)
return completion.choices[0].message['content']
def rag_query(query: str) -> Dict:
"""
RAGパターンでクエリを処理
"""
print(f"質問: {query}\n")
# 1. 関連ドキュメントを検索
print("関連ドキュメントを検索中...")
documents = retrieve_relevant_documents(query, top_k=3)
print(f"関連ドキュメント {len(documents)}件を発見\n")
for i, doc in enumerate(documents):
print(f"{i+1}. {doc['title']} (スコア: {doc['@search.score']:.4f})")
# 2. 回答を生成
print("\n回答を生成中...")
answer = generate_answer(query, documents)
print(f"\n=== 回答 ===")
print(answer)
return {
"query": query,
"answer": answer,
"sources": documents
}
# 実行例
result = rag_query("Azure AI Searchのベクトル検索機能について教えてください")
4.3 ストリーミング対応(リアルタイム表示)
ChatGPTのように、回答を少しずつ表示する実装:
def generate_answer_stream(query: str, documents: List[Dict]):
"""
ストリーミングで回答を生成(リアルタイム表示)
"""
context = "\n\n".join([
f"【文書{i+1}: {doc['title']}】\n{doc['content']}"
for i, doc in enumerate(documents)
])
system_message = """あなたは親切なアシスタントです。
提供された文書を参考にして、ユーザーの質問に正確に答えてください。"""
user_message = f"""参考文書:
{context}
質問: {query}"""
# ストリーミングレスポンス
response = openai.ChatCompletion.create(
engine="gpt-4",
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": user_message}
],
temperature=0.3,
max_tokens=1000,
stream=True # ストリーミングを有効化
)
print("\n=== 回答 ===")
full_answer = ""
for chunk in response:
if chunk.choices[0].delta.get('content'):
content = chunk.choices[0].delta['content']
print(content, end='', flush=True)
full_answer += content
print("\n")
return full_answer
def rag_query_stream(query: str):
"""ストリーミング対応RAG"""
print(f"質問: {query}\n")
# 関連ドキュメントを検索
documents = retrieve_relevant_documents(query, top_k=3)
print(f"関連ドキュメント {len(documents)}件\n")
# 回答をストリーミング生成
answer = generate_answer_stream(query, documents)
return {
"query": query,
"answer": answer,
"sources": documents
}
# 実行例
rag_query_stream("Azure AI Searchの料金体系について教えてください")
4.4 会話履歴を考慮したRAG
複数回の質問に対応:
class ConversationalRAG:
"""会話履歴を保持するRAGシステム"""
def __init__(self):
self.conversation_history = []
def query(self, user_message: str) -> Dict:
"""会話履歴を考慮したクエリ処理"""
# 1. 会話履歴から検索クエリを生成
search_query = self._generate_search_query(user_message)
# 2. 関連ドキュメントを検索
documents = retrieve_relevant_documents(search_query, top_k=3)
# 3. 会話履歴とドキュメントを含めて回答生成
answer = self._generate_contextual_answer(
user_message,
documents
)
# 4. 会話履歴に追加
self.conversation_history.append({
"role": "user",
"content": user_message
})
self.conversation_history.append({
"role": "assistant",
"content": answer
})
return {
"query": user_message,
"answer": answer,
"sources": documents
}
def _generate_search_query(self, user_message: str) -> str:
"""会話履歴から検索クエリを生成"""
if not self.conversation_history:
return user_message
# 会話履歴を考慮してクエリを改善
messages = [
{"role": "system", "content": "会話の文脈から適切な検索クエリを生成してください。"},
*self.conversation_history[-4:], # 直近2往復分
{"role": "user", "content": f"次の質問に対する検索クエリを生成: {user_message}"}
]
response = openai.ChatCompletion.create(
engine="gpt-4",
messages=messages,
temperature=0,
max_tokens=100
)
return response.choices[0].message['content']
def _generate_contextual_answer(
self,
user_message: str,
documents: List[Dict]
) -> str:
"""会話履歴を考慮して回答生成"""
context = "\n\n".join([
f"【文書{i+1}】\n{doc['content']}"
for i, doc in enumerate(documents)
])
messages = [
{"role": "system", "content": """あなたは親切なアシスタントです。
提供された文書と会話履歴を参考にして回答してください。"""},
*self.conversation_history[-6:], # 直近3往復分
{"role": "user", "content": f"""参考文書:
{context}
質問: {user_message}"""}
]
response = openai.ChatCompletion.create(
engine="gpt-4",
messages=messages,
temperature=0.3,
max_tokens=1000
)
return response.choices[0].message['content']
# 使用例
rag = ConversationalRAG()
print("=== 1回目の質問 ===")
result1 = rag.query("Azure AI Searchとは何ですか?")
print(result1['answer'])
print("\n=== 2回目の質問(前の文脈を考慮) ===")
result2 = rag.query("その料金はいくらですか?")
print(result2['answer'])
print("\n=== 3回目の質問 ===")
result3 = rag.query("無料プランはありますか?")
print(result3['answer'])
5. 本番運用のベストプラクティス
5.1 エラーハンドリング
本番環境では、堅牢なエラー処理が必須です。
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def retrieve_with_retry(query: str, top_k: int = 5) -> List[Dict]:
"""リトライ機能付き検索"""
try:
# クエリをベクトル化
embedding_response = openai.Embedding.create(
engine="text-embedding-ada-002",
input=query
)
query_vector = embedding_response['data'][0]['embedding']
# 検索実行
url = f"{endpoint}/indexes/blog-index/docs/search?api-version=2024-07-01"
headers = {
"Content-Type": "application/json",
"api-key": api_key
}
payload = {
"search": query,
"vectorQueries": [{
"kind": "vector",
"vector": query_vector,
"fields": "contentVector",
"k": 50
}],
"top": top_k
}
response = requests.post(url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
logger.info(f"検索成功: {len(result['value'])}件のドキュメント取得")
return result['value']
except openai.error.RateLimitError as e:
logger.warning(f"レート制限エラー: {e}")
raise
except requests.exceptions.Timeout:
logger.error("検索タイムアウト")
raise
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP エラー: {e}")
raise
except Exception as e:
logger.error(f"予期しないエラー: {e}")
raise
def safe_rag_query(query: str) -> Dict:
"""安全なRAGクエリ実行"""
try:
documents = retrieve_with_retry(query)
if not documents:
return {
"query": query,
"answer": "申し訳ございません。関連する情報が見つかりませんでした。",
"sources": [],
"error": None
}
answer = generate_answer(query, documents)
return {
"query": query,
"answer": answer,
"sources": documents,
"error": None
}
except Exception as e:
logger.error(f"RAGクエリ失敗: {e}")
return {
"query": query,
"answer": "申し訳ございません。現在サービスに問題が発生しています。しばらく時間をおいてから再度お試しください。",
"sources": [],
"error": str(e)
}
5.2 パフォーマンス最適化
from functools import lru_cache
import hashlib
# 埋め込みキャッシュ
@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> List[float]:
"""埋め込みをキャッシュ"""
response = openai.Embedding.create(
engine="text-embedding-ada-002",
input=text
)
return response['data'][0]['embedding']
def optimized_retrieve(query: str, top_k: int = 5) -> List[Dict]:
"""最適化された検索"""
# キャッシュされた埋め込みを使用
query_vector = cached_embedding(query)
# バッチサイズを調整してパフォーマンス向上
payload = {
"search": query,
"vectorQueries": [{
"kind": "vector",
"vector": query_vector,
"fields": "contentVector",
"k": min(top_k * 5, 50) # 適切な候補数
}],
"select": "id,title,content", # 必要なフィールドのみ
"top": top_k
}
# ... 検索実行
5.3 コスト最適化
class CostOptimizedRAG:
"""コストを最適化したRAG"""
def __init__(self):
self.embedding_cache = {}
self.search_cache = {}
def query(self, user_message: str, use_gpt4: bool = False) -> Dict:
"""コストを考慮したクエリ処理"""
# 1. 検索結果をキャッシュ
cache_key = hashlib.md5(user_message.encode()).hexdigest()
if cache_key in self.search_cache:
logger.info("キャッシュヒット: 検索をスキップ")
documents = self.search_cache[cache_key]
else:
documents = retrieve_relevant_documents(user_message)
self.search_cache[cache_key] = documents
# 2. モデルの選択(コストと品質のバランス)
engine = "gpt-4" if use_gpt4 else "gpt-35-turbo"
# 3. コンテキストを最小化
# 長いドキュメントは要約して使用
summarized_docs = [
{
"title": doc["title"],
"content": doc["content"][:500] # 最初の500文字のみ
}
for doc in documents[:3] # 上位3件のみ
]
answer = generate_answer(user_message, summarized_docs)
return {
"query": user_message,
"answer": answer,
"sources": documents,
"model_used": engine
}
6. モニタリングとトラブルシューティング
6.1 Azure Monitor との統合
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
# Azure Monitor の設定
configure_azure_monitor(
connection_string="YOUR_APPLICATION_INSIGHTS_CONNECTION_STRING"
)
tracer = trace.get_tracer(__name__)
def monitored_rag_query(query: str) -> Dict:
"""モニタリング付きRAGクエリ"""
with tracer.start_as_current_span("rag_query") as span:
span.set_attribute("query", query)
try:
# 検索フェーズ
with tracer.start_as_current_span("search"):
documents = retrieve_relevant_documents(query)
span.set_attribute("documents_found", len(documents))
# 生成フェーズ
with tracer.start_as_current_span("generate"):
answer = generate_answer(query, documents)
span.set_attribute("answer_length", len(answer))
span.set_attribute("status", "success")
return {
"query": query,
"answer": answer,
"sources": documents
}
except Exception as e:
span.set_attribute("status", "error")
span.set_attribute("error_message", str(e))
raise
6.2 パフォーマンスメトリクスの収集
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class QueryMetrics:
"""クエリのメトリクス"""
query: str
search_time: float
generation_time: float
total_time: float
documents_retrieved: int
tokens_used: Optional[int] = None
success: bool = True
error_message: Optional[str] = None
def measure_rag_performance(query: str) -> QueryMetrics:
"""RAGのパフォーマンスを測定"""
start_time = time.time()
try:
# 検索時間を測定
search_start = time.time()
documents = retrieve_relevant_documents(query)
search_time = time.time() - search_start
# 生成時間を測定
gen_start = time.time()
answer = generate_answer(query, documents)
gen_time = time.time() - gen_start
total_time = time.time() - start_time
return QueryMetrics(
query=query,
search_time=search_time,
generation_time=gen_time,
total_time=total_time,
documents_retrieved=len(documents),
success=True
)
except Exception as e:
return QueryMetrics(
query=query,
search_time=0,
generation_time=0,
total_time=time.time() - start_time,
documents_retrieved=0,
success=False,
error_message=str(e)
)
# 使用例
metrics = measure_rag_performance("Azure AI Searchについて教えてください")
print(f"検索時間: {metrics.search_time:.2f}秒")
print(f"生成時間: {metrics.generation_time:.2f}秒")
print(f"合計時間: {metrics.total_time:.2f}秒")
6.3 よくある問題と対処法
問題1: 検索結果が不正確
# 解決策: セマンティックランキングを有効化
payload = {
"search": query,
"vectorQueries": [...],
"queryType": "semantic", # ← これを追加
"semanticConfiguration": "my-semantic-config"
}
問題2: レスポンスが遅い
# 解決策: 並列処理とキャッシング
from concurrent.futures import ThreadPoolExecutor
def parallel_search(queries: List[str]) -> List[List[Dict]]:
"""複数クエリを並列実行"""
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(retrieve_relevant_documents, queries))
return results
問題3: コストが高い
# 解決策1: GPT-3.5 Turboを使用
engine = "gpt-35-turbo" # GPT-4より安価
# 解決策2: トークン数を削減
max_tokens = 500 # 必要最小限に設定
# 解決策3: キャッシングを活用
# (前述のコスト最適化を参照)
Part 3 のまとめ
お疲れさまでした!これでAzure AI Searchを使った本格的なRAGアプリケーションが構築できるようになりました。
押さえておきたいポイント:
✅ インデクサーとスキルセットで自動化
- 統合ベクトル化パイプラインで効率化
- スケジュール実行で常に最新状態を維持
✅ RAGパターンの実装
- ハイブリッド検索で最適な情報取得
- Azure OpenAIで自然な回答生成
- 会話履歴を考慮した対話型システム
✅ 本番運用の準備
- エラーハンドリングとリトライ機構
- パフォーマンスとコストの最適化
- モニタリングとトラブルシューティング
さらに学びたい方へ:
次のステップとして、以下のトピックも検討してみてください:
- マルチモーダル検索(画像 + テキスト)
- エージェント検索(Agentic Retrieval)
- カスタムスキルの開発
- セキュリティとコンプライアンス強化
Azure AI Searchは、継続的に新機能が追加されています。最新の情報は公式ドキュメントをチェックしましょう!
全体のまとめ(Part 1-3)
3つのパートを通して、Azure AI Searchの基礎から応用まで学びました:
Part 1: 基礎概念と用語の理解
Part 2: インデックス作成とベクトル検索の実装
Part 3: RAGアプリケーションと本番運用
これで、あなたも次世代の検索システムを構築できます!
ぜひ実際のプロジェクトで活用してみてください。分からないことがあれば、コミュニティやドキュメントを活用しながら、一歩ずつ進んでいきましょう。
Happy Searching! 🚀