5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Azure OpenAI開発】簡易版エンタープライズサーチを自作して理解を深める (1) Cognitive Searchのプロビジョニング

Last updated at Posted at 2023-05-20

0. 全体像

今回作っていく環境は以下の通りです。

image.png

Microsoftのエンタープライズサーチのアーキテクチャの理解を深めるため、
自分で作ってみようというのがモチベーションです。

今回は、①質問を入力, ③クエリを基にファイル検索の機能を実装したので、その備忘録です。

※今回作る環境とAzure-Samplesとの差異

  • セマンティック検索は使用しません。
  • PDFファイルを文字数で分割し、txtファイルとしてBLOBに保存する構成とします。

1. Congnitive Searchを学ぶ

①BLOBストレージに手動でファイルをアップロード

  • ストレージアカウントを作成
  • コンテナーを作成
  • 作成したコンテナーを選択し、検索対象とするファイルをアップロード
  • アクセスキーから接続文字列をコピーしておく

補足
今回は、クラウドサービス提供における情報セキュリティ対策ガイドライン令和3年版 情報通信白書の一部をアップロード。

②Azure Congnitive Searchの作成・設定

Azure Congnitive Searchを作成

  • Azureポータルからフリープランで作成

a. データのインポートを実施

  • データソース: Azure BLOBストレージを選択
  • 抽出されるデータ: コンテンツとメタデータに
  • 接続文字列: 先ほどコピーしたもの
  • コンテナ名: 先ほど作成したコンテナーの名前

image.png

補足
様々なデータソースを選択可能。

image.png

b. コグニティブスキルの追加

  • 今回は省略

c. インデックスをカスタマイズ

  • 検索時に利用できる属性をカスタマイズする
  • 今回はファイルの中身とファイル名で検索・フィルターできるように設定

image.png

補足

  • その他サポートされるインデックスフィールドの種類
    • ファイルタイプによって異なる


③外部からCognitive Searchを呼び出す

今回はPythonのrequestsモジュールを使って検索を試します。

a. PythonからAPI呼び出しするためのセットアップ

  • Cognitive SearchのAPIキーをコピー
    • 左側メニューのキーをクリックし、管理者キーまたはクエリキーをコピー

image.png

  • Cognitive Searchのリソース画面からエンドポイントを確認

    • 概要からURLをコピー
    • 概要→インデックスからインデックス名をコピー
  • 環境変数を設定

Windowsの場合
$Env:API_KEY="your api key"
$Env:ENDPOINT="{URL}/indexes/{インデックス名}/docs/search?api-version=2020-06-30"

b. Pythonから検索実行 (クエリを指定しない)

  • 検索を実行するソースコード (クエリを指定しない場合)
import requests
import os

api_key = os.getenv("API_KEY")
endpoint = os.getenv("ENDPOINT")

headers = {
    "api-key": api_key,
    "Content-Type": "application/json"
}

query = {}

response = requests.post(endpoint, headers=headers, json=query)
results = response.json()["value"]

for result in results:
    print(f"score: {result['@search.score']}")
    print(f"content: {result['content']}")
    print("!!!"*20)

全ての文書が検索により取得され、scoreは1.0となっています。よさそうです。

image.png

image.png

c. Pythonから検索実行 (クエリを指定する)

上記のソースコードのqueryにクエリ情報を埋め込みます。

  • 単純なクエリを指定して実行
user_input = "クラウドサービス"
# user_input = "マルウェア セキュリティ"

query = {
    "search": user_input,    # 検索ワード
    "searchMode": "any",     # 部分一致 (デフォルト)
    # "orderby": "xxx"       # デフォルトでスコアの降順になる
    "top": 3                 # 上位3件 
}

クラウドサービスという単語はどちらの文書にも含まれているため、
二つのファイルが検索結果として表示されます。

マルウェア セキュリティという単語は片方の文書にしか含まれていないため、
検索結果には一つの文書しか表示されない結果となりました。

d. インデックスを利用して絞り込み

  • filterにインデックスのフィールドを利用する
    • 今回はインデックス作成時に、content, metadata_storage_pathを指定している
    • 以下ではmetadata_storage_nameを利用してファイル名で絞り込みする
import requests
import os

api_key = os.getenv("API_KEY")
endpoint = os.getenv("ENDPOINT")

headers = {
    "api-key": api_key,
    "Content-Type": "application/json"
}


user_input = "クラウドサービス"
file_name = "ガイドライン"

query = {
    "search": user_input,    # 検索ワード
    "searchMode": "any",     # 部分一致 (デフォルト)
    "top": 3,                # 上位3件
    "filter": f"search.ismatch('{file_name}', 'metadata_storage_name')"  # ファイル名にfile_nameが含まれているか
}

response = requests.post(endpoint, headers=headers, json=query)
results = response.json()["value"]


for result in results:
    print(f"score: {result['@search.score']}")
    print(f"content: {result['content']}")
    print("!!!"*20)

クラウドサービスという単語は全ての文書に含まれていますが、
ファイル名にガイドラインが含まれているのは片方の文書だけです。
そのため一つの文書のみを検索結果として得ることができます。

その他・補足

  • APIバージョンの安定版は2020-06-30

  • 複雑なクエリは以下サイトが参考になりそう。


2. ファイルの分割アップロード機能を作る

今回Azure OpenAIで使うモデルはgpt-35-turboです。
Microsoftが公開しているエンタープライズサーチのアーキテクチャでは、
GPT3.5のmax_tokensの制限を回避するために、ファイルをBLOBストレージにアップロードする際に分割しておく手法が採用されています。
この機能を実装します。

※以降、バックエンド(Azure Functions)の処理のみ記載します

  • ストレージの接続文字列とコンテナ名を、環境変数に設定する
Windowsの場合
$Env:AZURE_STORAGE_CONNECTION_STRING="接続文字列"
$Env:STORAGE_CONTAINER_NAME="コンテナ名"
  • 必要なモジュールをインストールする
api/requirements.txt
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues

azure-functions
flask
azure-storage-blob
pypdf
  • エンドポイント作成
    • /api/upload_blobのPOSTリクエストが来た場合、テキスト分割&ファイルをBLOBストレージにアップロード
    • /api/searchのPOSTリクエストが来た場合、Cognitive Searchに対してクエリを送信
app.py
from flask import Flask, request, jsonify
import os

# モジュールのインポート
from .modules.enterprisesearch import EnterpriseSearch

# 初期化
enterprise_search = EnterpriseSearch()

app = Flask(__name__)

@app.route('/api/test', methods=['GET'])
def api_test():
    return 'This is Test'

@app.route('/api/upload_blob', methods=['POST'])
def upload_blob():
    # 保存先ディレクトリがなければ生成
    if os.path.exists(UPLOAD_FOLDER):
        pass
    else:
        os.mkdir(UPLOAD_FOLDER)

    # ファイルを保存
    file = request.files['file']
    if allowed_file(file.filename):
        save_path = os.path.join(UPLOAD_FOLDER, file.filename)
        file.save(save_path)

    # ファイルをBLOBストレージにアップロードする
    response, _ = enterprise_search.upload_blob(save_path)

    response = {'message': response}
    return jsonify(response)


@app.route('/api/search', methods=['POST'])
def search():
    question = request.json['question']

    response = enterprise_search.search_and_answer(question)
    response = {'message': response}

    return jsonify(response)

if __name__ == '__main__':
    app.run()
modules/enterprisesearch.py
from pypdf import PdfReader
import os
from azure.storage.blob import BlobServiceClient
import requests

char_limit = 1000  # 分割する文字数の制限

class EnterpriseSearch():
    def __init__(self):
        # Blob Storageの接続情報
        connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
        self.container_name = os.getenv("STORAGE_CONTAINER_NAME")

        # Blob Service Clientの作成
        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    def upload_blob(self, file_path):
        try:
            # 分割後のファイル名のプレフィックス
            len_extension = len(file_path.split(".")[-1])
            file_base_name = os.path.basename(file_path)[:(-len_extension-1)]


            # ファイルの読み込みと分割処理
            with open(file_path, "rb") as pdf_file:
                # PDFを読み込み、各ページのテキストを取得する
                pdf_reader = PdfReader(pdf_file)
                page_texts = []

                for page in pdf_reader.pages:
                    page_text = page.extract_text()
                    page_texts.append(page_text)

                # 文字列を分割し、txtファイルとしてBLOBアップロード
                for index, page_text in enumerate(page_texts):
                    text_fragments = self._split_text_by_limit(page_text, char_limit)

                    for fragment_index, fragment in enumerate(text_fragments):
                        blob_file_name = f"{file_base_name}_{index+1}_{fragment_index+1}.txt"
                        container_client = self.blob_service_client.get_container_client(self.container_name)
                        container_client.upload_blob(blob_file_name, data=fragment, overwrite=True)
            
            return "!!! Uploaded !!!", 200
        except Exception as e:
            print(e)
            return "!!! Server Error !!!", 500

    def _split_text_by_limit(self, text, char_limit):
            text_fragments = []
            current_fragment = ""
            current_char_count = 0
            prev_text = ""

            for char in text:
                if prev_text != "":  # 前回遡りが発生している場合
                    current_fragment = prev_text[::-1]
                    prev_text = ""

                current_fragment += char
                current_char_count += 1

                if  current_char_count >= char_limit:
                    # 指定文字数を超えた場合、後ろに遡って最初の句読点や空白文字を探す
                    while current_fragment[-1] not in [".", "", "!", "", "?", "", "\n"]:
                        prev_text += current_fragment[-1] # 遡った文字を保持。次回に反映
                        current_fragment = current_fragment[:-1]
                        current_char_count -= 1
                    
                    text_fragments.append(current_fragment)
                    current_fragment = ""
                    current_char_count = 0


            if current_fragment:
                text_fragments.append(current_fragment)

            return text_fragments
    
    def search_and_answer(self, question):
        ### Todo: questionからquery, file_nameを生成 ###
        query = question
        file_name = "*"

        api_key = os.getenv("SEARCH_API_KEY")
        endpoint = os.getenv("SEARCH_ENDPOINT")

        headers = {
            "api-key": api_key,
            "Content-Type": "application/json"
        }

        query = {
            "search": query,    # 検索ワード
            "searchMode": "any",     # 部分一致 (デフォルト)
            "top": 3,                # 上位3件
            "filter": f"search.ismatch('{file_name}', 'metadata_storage_name')"  # ファイル名にfile_nameが含まれているか
        }

        response = requests.post(endpoint, headers=headers, json=query)
        results = response.json()["value"]

        results = results[0]["metadata_storage_name"]

        return results


if __name__ == "__main__":
    test = EnterpriseSearch()

    # file_path = "./data/p13-14_情報セキュリティ対策ガイドライン.pdf"
    # test.upload_blob(file_path)

    results = test.search_and_answer("クラウド")

    for result in results:
        print(result)

実行イメージ

image.png


次回の内容

現状、ユーザの入力した単語が含まれているファイルをCognitive Searchから取得する機能となっています。
また、単純に検索で得られたファイル名を返しているだけです。
Azure OpenAIを用いて、ユーザの入力からCongnitive Searchのクエリを生成する機能を作っていこうかと思います。

やっとバックエンド×大規模言語モデルでアレコレできる、、
プロンプト作成はguidanceでやろうかなあ
(guidanceについては以下参照)

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?