0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure AI Searchで始める「次世代検索システム」の作り方 - Part 3: RAG実装編

Posted at

はじめに

Part 1で基礎概念を学び、Part 2で検索システムの実装方法を習得しました。このPart 3では、いよいよ実践的なRAG(検索拡張生成)アプリケーションの構築に挑戦します!

Part 3で学ぶこと:

  • インデクサーとスキルセットを使った自動化
  • 統合ベクトル化パイプラインの構築
  • Azure OpenAIとの連携によるRAG実装
  • 本番運用のベストプラクティス
  • モニタリングとトラブルシューティング

それでは、ChatGPTのような対話型AIアプリケーションを作っていきましょう!

1. インデクサーとスキルセットの基礎

1.1 インデクサーとは?

おさらい:
インデクサーは「データ収集ロボット」でしたね。手動でデータをアップロードする代わりに、自動的にデータソースから取得してインデックスに登録してくれます。

インデクサーが解決する課題:

手動アップロードの場合:

# 新しいPDFが追加されるたびに...
for pdf_file in new_pdfs:
    text = extract_text(pdf_file)  # 手動で抽出
    vector = generate_embedding(text)  # 手動でベクトル化
    upload_to_index(text, vector)  # 手動でアップロード
# 大変!

インデクサーを使う場合:

1. データソースとインデクサーを設定
2. スケジュールを設定(例: 毎日深夜に実行)
3. あとは自動でやってくれる!

1.2 データソースの作成

まず、データがどこにあるかを教えます。例として、Azure Blob Storageを使います。

データソースの定義:

{
  "name": "my-blob-datasource",
  "type": "azureblob",
  "credentials": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=xxx;EndpointSuffix=core.windows.net"
  },
  "container": {
    "name": "documents",
    "query": null
  }
}

REST APIで作成:

curl -X POST "https://my-search-service-001.search.windows.net/datasources?api-version=2024-07-01" \
  -H "Content-Type: application/json" \
  -H "api-key: YOUR_ADMIN_KEY" \
  -d @datasource.json

Pythonでの実装:

import requests

def create_datasource():
    """データソースを作成"""
    
    url = f"{endpoint}/datasources?api-version=2024-07-01"
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    
    datasource = {
        "name": "my-blob-datasource",
        "type": "azureblob",
        "credentials": {
            "connectionString": "YOUR_CONNECTION_STRING"
        },
        "container": {
            "name": "documents"
        }
    }
    
    response = requests.post(url, headers=headers, json=datasource)
    
    if response.status_code == 201:
        print("データソースが作成されました!")
    else:
        print(f"エラー: {response.status_code}")
        print(response.text)

create_datasource()

サポートされるデータソース:

  • Azure Blob Storage(PDF、Word、Excel、画像など)
  • Azure SQL Database
  • Azure Cosmos DB
  • Azure Table Storage
  • Azure Data Lake Storage Gen2

2. スキルセットによるAI加工

2.1 スキルセットとは?

おさらい:
スキルセットは、データを検索インデックスに入れる前に「AI加工」する仕組みでしたね。

典型的なスキルパイプライン:

PDF文書
  ↓ (1) Text Split Skill: 文書を適切なサイズに分割
チャンク(小さな断片)
  ↓ (2) Azure OpenAI Embedding Skill: ベクトル化
ベクトル埋め込み
  ↓ (3) インデックスに登録
検索可能なインデックス

2.2 統合ベクトル化スキルセット

2024-07-01でGAになった最新機能!

これまで別々に実装していた「チャンク化」と「ベクトル化」を、スキルセット内で自動実行できます。

完全なスキルセット定義:

{
  "name": "my-skillset",
  "description": "PDFをチャンク化してベクトル化するスキルセット",
  "skills": [
    {
      "@odata.type": "#Microsoft.Skills.Text.SplitSkill",
      "name": "text-splitter",
      "description": "テキストを適切なサイズに分割",
      "context": "/document",
      "textSplitMode": "pages",
      "maximumPageLength": 2000,
      "pageOverlapLength": 500,
      "inputs": [
        {
          "name": "text",
          "source": "/document/content"
        }
      ],
      "outputs": [
        {
          "name": "textItems",
          "targetName": "chunks"
        }
      ]
    },
    {
      "@odata.type": "#Microsoft.Skills.Text.AzureOpenAIEmbeddingSkill",
      "name": "embedding-generator",
      "description": "チャンクをベクトル化",
      "context": "/document/chunks/*",
      "resourceUri": "https://your-openai.openai.azure.com/",
      "apiKey": "YOUR_OPENAI_KEY",
      "deploymentId": "text-embedding-ada-002",
      "modelName": "text-embedding-ada-002",
      "dimensions": 1536,
      "inputs": [
        {
          "name": "text",
          "source": "/document/chunks/*"
        }
      ],
      "outputs": [
        {
          "name": "embedding",
          "targetName": "vector"
        }
      ]
    }
  ],
  "indexProjections": {
    "selectors": [
      {
        "targetIndexName": "blog-index",
        "parentKeyFieldName": "parent_id",
        "sourceContext": "/document/chunks/*",
        "mappings": [
          {
            "name": "chunk_id",
            "source": "/document/chunks/*/chunk_id"
          },
          {
            "name": "content",
            "source": "/document/chunks/*"
          },
          {
            "name": "contentVector",
            "source": "/document/chunks/*/vector"
          },
          {
            "name": "title",
            "source": "/document/metadata_storage_name"
          }
        ]
      }
    ]
  }
}

重要なパラメータの説明:

パラメータ 説明 推奨値
maximumPageLength 1チャンクの最大文字数 2000-4000(LLMのコンテキストサイズに応じて)
pageOverlapLength チャンク間の重複文字数 500-1000(文脈の連続性を保つため)
dimensions 埋め込みベクトルの次元数 1536(text-embedding-ada-002の場合)

チャンクサイズの選び方:

小さすぎる(500文字):
  ✗ 文脈が失われる
  ✗ 検索結果が多すぎる
  
適切(2000文字):
  ✓ 十分な文脈を保持
  ✓ LLMのコンテキストに収まる
  
大きすぎる(8000文字):
  ✗ 関連性の低い情報も含まれる
  ✗ LLMのコンテキストを圧迫

2.3 スキルセットの作成

def create_skillset():
    """スキルセットを作成"""
    
    url = f"{endpoint}/skillsets?api-version=2024-07-01"
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    
    skillset = {
        "name": "my-skillset",
        "description": "統合ベクトル化パイプライン",
        "skills": [
            {
                "@odata.type": "#Microsoft.Skills.Text.SplitSkill",
                "context": "/document",
                "textSplitMode": "pages",
                "maximumPageLength": 2000,
                "pageOverlapLength": 500,
                "inputs": [
                    {
                        "name": "text",
                        "source": "/document/content"
                    }
                ],
                "outputs": [
                    {
                        "name": "textItems",
                        "targetName": "chunks"
                    }
                ]
            },
            {
                "@odata.type": "#Microsoft.Skills.Text.AzureOpenAIEmbeddingSkill",
                "context": "/document/chunks/*",
                "resourceUri": "https://your-openai.openai.azure.com/",
                "apiKey": "YOUR_OPENAI_KEY",
                "deploymentId": "text-embedding-ada-002",
                "modelName": "text-embedding-ada-002",
                "dimensions": 1536,
                "inputs": [
                    {
                        "name": "text",
                        "source": "/document/chunks/*"
                    }
                ],
                "outputs": [
                    {
                        "name": "embedding",
                        "targetName": "vector"
                    }
                ]
            }
        ]
    }
    
    response = requests.post(url, headers=headers, json=skillset)
    
    if response.status_code == 201:
        print("スキルセットが作成されました!")
    else:
        print(f"エラー: {response.status_code}")
        print(response.text)

create_skillset()

3. インデクサーの作成と実行

3.1 インデクサーの定義

データソース、スキルセット、インデックスを繋げます。

{
  "name": "my-indexer",
  "dataSourceName": "my-blob-datasource",
  "targetIndexName": "blog-index",
  "skillsetName": "my-skillset",
  "schedule": {
    "interval": "PT24H",
    "startTime": "2024-11-01T00:00:00Z"
  },
  "parameters": {
    "batchSize": 10,
    "maxFailedItems": 0,
    "maxFailedItemsPerBatch": 0,
    "configuration": {
      "dataToExtract": "contentAndMetadata",
      "parsingMode": "default",
      "imageAction": "generateNormalizedImages"
    }
  },
  "fieldMappings": [
    {
      "sourceFieldName": "metadata_storage_name",
      "targetFieldName": "title"
    },
    {
      "sourceFieldName": "metadata_storage_last_modified",
      "targetFieldName": "publishedDate"
    }
  ],
  "outputFieldMappings": [
    {
      "sourceFieldName": "/document/chunks/*/vector",
      "targetFieldName": "contentVector"
    }
  ]
}

スケジュール設定:

インターバル 説明 用途
PT5M 5分ごと リアルタイム更新が必要な場合
PT1H 1時間ごと 頻繁に更新されるデータ
PT24H 1日1回 通常のドキュメント管理

3.2 インデクサーの作成と実行

def create_and_run_indexer():
    """インデクサーを作成して実行"""
    
    # インデクサーを作成
    url = f"{endpoint}/indexers?api-version=2024-07-01"
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    
    indexer = {
        "name": "my-indexer",
        "dataSourceName": "my-blob-datasource",
        "targetIndexName": "blog-index",
        "skillsetName": "my-skillset",
        "schedule": {
            "interval": "PT24H"
        },
        "parameters": {
            "batchSize": 10,
            "configuration": {
                "dataToExtract": "contentAndMetadata",
                "parsingMode": "default"
            }
        }
    }
    
    response = requests.post(url, headers=headers, json=indexer)
    
    if response.status_code == 201:
        print("インデクサーが作成されました!")
        
        # インデクサーを実行
        run_url = f"{endpoint}/indexers/my-indexer/run?api-version=2024-07-01"
        run_response = requests.post(run_url, headers=headers)
        
        if run_response.status_code == 202:
            print("インデクサーの実行を開始しました!")
        else:
            print(f"実行エラー: {run_response.status_code}")
    else:
        print(f"作成エラー: {response.status_code}")
        print(response.text)

create_and_run_indexer()

3.3 インデクサーのステータス確認

def check_indexer_status():
    """インデクサーの実行状態を確認"""
    
    url = f"{endpoint}/indexers/my-indexer/status?api-version=2024-07-01"
    headers = {"api-key": api_key}
    
    response = requests.get(url, headers=headers)
    
    if response.status_code == 200:
        status = response.json()
        
        print(f"ステータス: {status['status']}")
        print(f"最終実行結果: {status['lastResult']['status']}")
        print(f"処理ドキュメント数: {status['lastResult']['itemsProcessed']}")
        print(f"失敗数: {status['lastResult']['itemsFailed']}")
        
        # エラーがあれば表示
        if status['lastResult'].get('errors'):
            print("\nエラー:")
            for error in status['lastResult']['errors']:
                print(f"  - {error['errorMessage']}")
    else:
        print(f"エラー: {response.status_code}")

# 定期的にチェック
import time

for i in range(10):
    check_indexer_status()
    time.sleep(10)  # 10秒待機

4. RAGアプリケーションの実装

4.1 RAGパターンの全体フロー

ユーザー: "2024年の売上目標は?"
  ↓
(1) クエリベクトル化
  ↓
(2) Azure AI Search でハイブリッド検索
  - キーワード検索: "2024年" "売上" "目標"
  - ベクトル検索: 意味的に類似した文書
  ↓
(3) 関連ドキュメントを取得
  - 「2024年度事業計画書」のチャンク
  - 「売上目標に関する資料」のチャンク
  ↓
(4) コンテキストを作成
  ↓
(5) Azure OpenAI にプロンプトと共に送信
  ↓
回答: "2024年度の売上目標は前年比120%の30億円です..."

4.2 RAG実装コード

import openai
from typing import List, Dict

# Azure OpenAI の設定
openai.api_type = "azure"
openai.api_base = "https://your-openai.openai.azure.com/"
openai.api_version = "2024-02-15-preview"
openai.api_key = "YOUR_OPENAI_KEY"

def retrieve_relevant_documents(query: str, top_k: int = 5) -> List[Dict]:
    """
    関連ドキュメントを検索
    """
    # クエリをベクトル化
    embedding_response = openai.Embedding.create(
        engine="text-embedding-ada-002",
        input=query
    )
    query_vector = embedding_response['data'][0]['embedding']
    
    # ハイブリッド検索(セマンティックランキング付き)
    url = f"{endpoint}/indexes/blog-index/docs/search?api-version=2024-07-01"
    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    
    payload = {
        "search": query,
        "vectorQueries": [
            {
                "kind": "vector",
                "vector": query_vector,
                "fields": "contentVector",
                "k": 50
            }
        ],
        "queryType": "semantic",
        "semanticConfiguration": "my-semantic-config",
        "select": "id,title,content,category",
        "top": top_k
    }
    
    response = requests.post(url, headers=headers, json=payload)
    
    if response.status_code == 200:
        result = response.json()
        return result['value']
    else:
        raise Exception(f"検索エラー: {response.status_code}")

def generate_answer(query: str, documents: List[Dict]) -> str:
    """
    検索結果を使って回答を生成
    """
    # コンテキストを作成
    context = "\n\n".join([
        f"【文書{i+1}: {doc['title']}\n{doc['content']}"
        for i, doc in enumerate(documents)
    ])
    
    # プロンプトを構築
    system_message = """あなたは親切なアシスタントです。
以下の文書を参考にして、ユーザーの質問に正確に答えてください。

重要なルール:
1. 提供された文書の情報のみを使って回答してください
2. 文書に情報がない場合は「提供された文書には該当する情報がありません」と答えてください
3. 回答の根拠となった文書を明示してください
4. 日本語で丁寧に回答してください
"""
    
    user_message = f"""参考文書:
{context}

質問: {query}

上記の文書を参考にして回答してください。"""
    
    # Azure OpenAI で回答を生成
    completion = openai.ChatCompletion.create(
        engine="gpt-4",  # または "gpt-35-turbo"
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ],
        temperature=0.3,  # 創造性を抑えて正確性重視
        max_tokens=1000
    )
    
    return completion.choices[0].message['content']

def rag_query(query: str) -> Dict:
    """
    RAGパターンでクエリを処理
    """
    print(f"質問: {query}\n")
    
    # 1. 関連ドキュメントを検索
    print("関連ドキュメントを検索中...")
    documents = retrieve_relevant_documents(query, top_k=3)
    
    print(f"関連ドキュメント {len(documents)}件を発見\n")
    for i, doc in enumerate(documents):
        print(f"{i+1}. {doc['title']} (スコア: {doc['@search.score']:.4f})")
    
    # 2. 回答を生成
    print("\n回答を生成中...")
    answer = generate_answer(query, documents)
    
    print(f"\n=== 回答 ===")
    print(answer)
    
    return {
        "query": query,
        "answer": answer,
        "sources": documents
    }

# 実行例
result = rag_query("Azure AI Searchのベクトル検索機能について教えてください")

4.3 ストリーミング対応(リアルタイム表示)

ChatGPTのように、回答を少しずつ表示する実装:

def generate_answer_stream(query: str, documents: List[Dict]):
    """
    ストリーミングで回答を生成(リアルタイム表示)
    """
    context = "\n\n".join([
        f"【文書{i+1}: {doc['title']}\n{doc['content']}"
        for i, doc in enumerate(documents)
    ])
    
    system_message = """あなたは親切なアシスタントです。
提供された文書を参考にして、ユーザーの質問に正確に答えてください。"""
    
    user_message = f"""参考文書:
{context}

質問: {query}"""
    
    # ストリーミングレスポンス
    response = openai.ChatCompletion.create(
        engine="gpt-4",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ],
        temperature=0.3,
        max_tokens=1000,
        stream=True  # ストリーミングを有効化
    )
    
    print("\n=== 回答 ===")
    full_answer = ""
    
    for chunk in response:
        if chunk.choices[0].delta.get('content'):
            content = chunk.choices[0].delta['content']
            print(content, end='', flush=True)
            full_answer += content
    
    print("\n")
    return full_answer

def rag_query_stream(query: str):
    """ストリーミング対応RAG"""
    
    print(f"質問: {query}\n")
    
    # 関連ドキュメントを検索
    documents = retrieve_relevant_documents(query, top_k=3)
    
    print(f"関連ドキュメント {len(documents)}\n")
    
    # 回答をストリーミング生成
    answer = generate_answer_stream(query, documents)
    
    return {
        "query": query,
        "answer": answer,
        "sources": documents
    }

# 実行例
rag_query_stream("Azure AI Searchの料金体系について教えてください")

4.4 会話履歴を考慮したRAG

複数回の質問に対応:

class ConversationalRAG:
    """会話履歴を保持するRAGシステム"""
    
    def __init__(self):
        self.conversation_history = []
    
    def query(self, user_message: str) -> Dict:
        """会話履歴を考慮したクエリ処理"""
        
        # 1. 会話履歴から検索クエリを生成
        search_query = self._generate_search_query(user_message)
        
        # 2. 関連ドキュメントを検索
        documents = retrieve_relevant_documents(search_query, top_k=3)
        
        # 3. 会話履歴とドキュメントを含めて回答生成
        answer = self._generate_contextual_answer(
            user_message, 
            documents
        )
        
        # 4. 会話履歴に追加
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        self.conversation_history.append({
            "role": "assistant",
            "content": answer
        })
        
        return {
            "query": user_message,
            "answer": answer,
            "sources": documents
        }
    
    def _generate_search_query(self, user_message: str) -> str:
        """会話履歴から検索クエリを生成"""
        
        if not self.conversation_history:
            return user_message
        
        # 会話履歴を考慮してクエリを改善
        messages = [
            {"role": "system", "content": "会話の文脈から適切な検索クエリを生成してください。"},
            *self.conversation_history[-4:],  # 直近2往復分
            {"role": "user", "content": f"次の質問に対する検索クエリを生成: {user_message}"}
        ]
        
        response = openai.ChatCompletion.create(
            engine="gpt-4",
            messages=messages,
            temperature=0,
            max_tokens=100
        )
        
        return response.choices[0].message['content']
    
    def _generate_contextual_answer(
        self, 
        user_message: str, 
        documents: List[Dict]
    ) -> str:
        """会話履歴を考慮して回答生成"""
        
        context = "\n\n".join([
            f"【文書{i+1}\n{doc['content']}"
            for i, doc in enumerate(documents)
        ])
        
        messages = [
            {"role": "system", "content": """あなたは親切なアシスタントです。
提供された文書と会話履歴を参考にして回答してください。"""},
            *self.conversation_history[-6:],  # 直近3往復分
            {"role": "user", "content": f"""参考文書:
{context}

質問: {user_message}"""}
        ]
        
        response = openai.ChatCompletion.create(
            engine="gpt-4",
            messages=messages,
            temperature=0.3,
            max_tokens=1000
        )
        
        return response.choices[0].message['content']

# 使用例
rag = ConversationalRAG()

print("=== 1回目の質問 ===")
result1 = rag.query("Azure AI Searchとは何ですか?")
print(result1['answer'])

print("\n=== 2回目の質問(前の文脈を考慮) ===")
result2 = rag.query("その料金はいくらですか?")
print(result2['answer'])

print("\n=== 3回目の質問 ===")
result3 = rag.query("無料プランはありますか?")
print(result3['answer'])

5. 本番運用のベストプラクティス

5.1 エラーハンドリング

本番環境では、堅牢なエラー処理が必須です。

import logging
from tenacity import retry, stop_after_attempt, wait_exponential

# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def retrieve_with_retry(query: str, top_k: int = 5) -> List[Dict]:
    """リトライ機能付き検索"""
    
    try:
        # クエリをベクトル化
        embedding_response = openai.Embedding.create(
            engine="text-embedding-ada-002",
            input=query
        )
        query_vector = embedding_response['data'][0]['embedding']
        
        # 検索実行
        url = f"{endpoint}/indexes/blog-index/docs/search?api-version=2024-07-01"
        headers = {
            "Content-Type": "application/json",
            "api-key": api_key
        }
        
        payload = {
            "search": query,
            "vectorQueries": [{
                "kind": "vector",
                "vector": query_vector,
                "fields": "contentVector",
                "k": 50
            }],
            "top": top_k
        }
        
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        
        result = response.json()
        logger.info(f"検索成功: {len(result['value'])}件のドキュメント取得")
        
        return result['value']
        
    except openai.error.RateLimitError as e:
        logger.warning(f"レート制限エラー: {e}")
        raise
        
    except requests.exceptions.Timeout:
        logger.error("検索タイムアウト")
        raise
        
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP エラー: {e}")
        raise
        
    except Exception as e:
        logger.error(f"予期しないエラー: {e}")
        raise

def safe_rag_query(query: str) -> Dict:
    """安全なRAGクエリ実行"""
    
    try:
        documents = retrieve_with_retry(query)
        
        if not documents:
            return {
                "query": query,
                "answer": "申し訳ございません。関連する情報が見つかりませんでした。",
                "sources": [],
                "error": None
            }
        
        answer = generate_answer(query, documents)
        
        return {
            "query": query,
            "answer": answer,
            "sources": documents,
            "error": None
        }
        
    except Exception as e:
        logger.error(f"RAGクエリ失敗: {e}")
        
        return {
            "query": query,
            "answer": "申し訳ございません。現在サービスに問題が発生しています。しばらく時間をおいてから再度お試しください。",
            "sources": [],
            "error": str(e)
        }

5.2 パフォーマンス最適化

from functools import lru_cache
import hashlib

# 埋め込みキャッシュ
@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> List[float]:
    """埋め込みをキャッシュ"""
    
    response = openai.Embedding.create(
        engine="text-embedding-ada-002",
        input=text
    )
    return response['data'][0]['embedding']

def optimized_retrieve(query: str, top_k: int = 5) -> List[Dict]:
    """最適化された検索"""
    
    # キャッシュされた埋め込みを使用
    query_vector = cached_embedding(query)
    
    # バッチサイズを調整してパフォーマンス向上
    payload = {
        "search": query,
        "vectorQueries": [{
            "kind": "vector",
            "vector": query_vector,
            "fields": "contentVector",
            "k": min(top_k * 5, 50)  # 適切な候補数
        }],
        "select": "id,title,content",  # 必要なフィールドのみ
        "top": top_k
    }
    
    # ... 検索実行

5.3 コスト最適化

class CostOptimizedRAG:
    """コストを最適化したRAG"""
    
    def __init__(self):
        self.embedding_cache = {}
        self.search_cache = {}
    
    def query(self, user_message: str, use_gpt4: bool = False) -> Dict:
        """コストを考慮したクエリ処理"""
        
        # 1. 検索結果をキャッシュ
        cache_key = hashlib.md5(user_message.encode()).hexdigest()
        
        if cache_key in self.search_cache:
            logger.info("キャッシュヒット: 検索をスキップ")
            documents = self.search_cache[cache_key]
        else:
            documents = retrieve_relevant_documents(user_message)
            self.search_cache[cache_key] = documents
        
        # 2. モデルの選択(コストと品質のバランス)
        engine = "gpt-4" if use_gpt4 else "gpt-35-turbo"
        
        # 3. コンテキストを最小化
        # 長いドキュメントは要約して使用
        summarized_docs = [
            {
                "title": doc["title"],
                "content": doc["content"][:500]  # 最初の500文字のみ
            }
            for doc in documents[:3]  # 上位3件のみ
        ]
        
        answer = generate_answer(user_message, summarized_docs)
        
        return {
            "query": user_message,
            "answer": answer,
            "sources": documents,
            "model_used": engine
        }

6. モニタリングとトラブルシューティング

6.1 Azure Monitor との統合

from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace

# Azure Monitor の設定
configure_azure_monitor(
    connection_string="YOUR_APPLICATION_INSIGHTS_CONNECTION_STRING"
)

tracer = trace.get_tracer(__name__)

def monitored_rag_query(query: str) -> Dict:
    """モニタリング付きRAGクエリ"""
    
    with tracer.start_as_current_span("rag_query") as span:
        span.set_attribute("query", query)
        
        try:
            # 検索フェーズ
            with tracer.start_as_current_span("search"):
                documents = retrieve_relevant_documents(query)
                span.set_attribute("documents_found", len(documents))
            
            # 生成フェーズ
            with tracer.start_as_current_span("generate"):
                answer = generate_answer(query, documents)
                span.set_attribute("answer_length", len(answer))
            
            span.set_attribute("status", "success")
            
            return {
                "query": query,
                "answer": answer,
                "sources": documents
            }
            
        except Exception as e:
            span.set_attribute("status", "error")
            span.set_attribute("error_message", str(e))
            raise

6.2 パフォーマンスメトリクスの収集

import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class QueryMetrics:
    """クエリのメトリクス"""
    query: str
    search_time: float
    generation_time: float
    total_time: float
    documents_retrieved: int
    tokens_used: Optional[int] = None
    success: bool = True
    error_message: Optional[str] = None

def measure_rag_performance(query: str) -> QueryMetrics:
    """RAGのパフォーマンスを測定"""
    
    start_time = time.time()
    
    try:
        # 検索時間を測定
        search_start = time.time()
        documents = retrieve_relevant_documents(query)
        search_time = time.time() - search_start
        
        # 生成時間を測定
        gen_start = time.time()
        answer = generate_answer(query, documents)
        gen_time = time.time() - gen_start
        
        total_time = time.time() - start_time
        
        return QueryMetrics(
            query=query,
            search_time=search_time,
            generation_time=gen_time,
            total_time=total_time,
            documents_retrieved=len(documents),
            success=True
        )
        
    except Exception as e:
        return QueryMetrics(
            query=query,
            search_time=0,
            generation_time=0,
            total_time=time.time() - start_time,
            documents_retrieved=0,
            success=False,
            error_message=str(e)
        )

# 使用例
metrics = measure_rag_performance("Azure AI Searchについて教えてください")
print(f"検索時間: {metrics.search_time:.2f}")
print(f"生成時間: {metrics.generation_time:.2f}")
print(f"合計時間: {metrics.total_time:.2f}")

6.3 よくある問題と対処法

問題1: 検索結果が不正確

# 解決策: セマンティックランキングを有効化
payload = {
    "search": query,
    "vectorQueries": [...],
    "queryType": "semantic",  # ← これを追加
    "semanticConfiguration": "my-semantic-config"
}

問題2: レスポンスが遅い

# 解決策: 並列処理とキャッシング
from concurrent.futures import ThreadPoolExecutor

def parallel_search(queries: List[str]) -> List[List[Dict]]:
    """複数クエリを並列実行"""
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(retrieve_relevant_documents, queries))
    
    return results

問題3: コストが高い

# 解決策1: GPT-3.5 Turboを使用
engine = "gpt-35-turbo"  # GPT-4より安価

# 解決策2: トークン数を削減
max_tokens = 500  # 必要最小限に設定

# 解決策3: キャッシングを活用
# (前述のコスト最適化を参照)

Part 3 のまとめ

お疲れさまでした!これでAzure AI Searchを使った本格的なRAGアプリケーションが構築できるようになりました。

押さえておきたいポイント:

インデクサーとスキルセットで自動化

  • 統合ベクトル化パイプラインで効率化
  • スケジュール実行で常に最新状態を維持

RAGパターンの実装

  • ハイブリッド検索で最適な情報取得
  • Azure OpenAIで自然な回答生成
  • 会話履歴を考慮した対話型システム

本番運用の準備

  • エラーハンドリングとリトライ機構
  • パフォーマンスとコストの最適化
  • モニタリングとトラブルシューティング

さらに学びたい方へ:

次のステップとして、以下のトピックも検討してみてください:

  • マルチモーダル検索(画像 + テキスト)
  • エージェント検索(Agentic Retrieval)
  • カスタムスキルの開発
  • セキュリティとコンプライアンス強化

Azure AI Searchは、継続的に新機能が追加されています。最新の情報は公式ドキュメントをチェックしましょう!


全体のまとめ(Part 1-3)

3つのパートを通して、Azure AI Searchの基礎から応用まで学びました:

Part 1: 基礎概念と用語の理解
Part 2: インデックス作成とベクトル検索の実装
Part 3: RAGアプリケーションと本番運用

これで、あなたも次世代の検索システムを構築できます!

ぜひ実際のプロジェクトで活用してみてください。分からないことがあれば、コミュニティやドキュメントを活用しながら、一歩ずつ進んでいきましょう。

Happy Searching! 🚀


参考リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?