0. 全体像
今回作っていく環境は以下の通りです。
Microsoftのエンタープライズサーチのアーキテクチャの理解を深めるため、
自分で作ってみようというのがモチベーションです。
今回は、①質問を入力, ③クエリを基にファイル検索の機能を実装したので、その備忘録です。
※今回作る環境とAzure-Samplesとの差異
- セマンティック検索は使用しません。
- PDFファイルを文字数で分割し、txtファイルとしてBLOBに保存する構成とします。
1. Congnitive Searchを学ぶ
①BLOBストレージに手動でファイルをアップロード
- ストレージアカウントを作成
- コンテナーを作成
- 作成したコンテナーを選択し、検索対象とするファイルをアップロード
- アクセスキーから接続文字列をコピーしておく
補足
今回は、クラウドサービス提供における情報セキュリティ対策ガイドライン
と令和3年版 情報通信白書
の一部をアップロード。
②Azure Congnitive Searchの作成・設定
Azure Congnitive Searchを作成
- Azureポータルからフリープランで作成
a. データのインポートを実施
- データソース: Azure BLOBストレージを選択
- 抽出されるデータ: コンテンツとメタデータに
- 接続文字列: 先ほどコピーしたもの
- コンテナ名: 先ほど作成したコンテナーの名前
補足
様々なデータソースを選択可能。
b. コグニティブスキルの追加
- 今回は省略
c. インデックスをカスタマイズ
- 検索時に利用できる属性をカスタマイズする
- 今回はファイルの中身とファイル名で検索・フィルターできるように設定
補足
- その他サポートされるインデックスフィールドの種類
- ファイルタイプによって異なる
③外部からCognitive Searchを呼び出す
今回はPythonのrequestsモジュールを使って検索を試します。
a. PythonからAPI呼び出しするためのセットアップ
- Cognitive SearchのAPIキーをコピー
- 左側メニューのキーをクリックし、管理者キーまたはクエリキーをコピー
-
Cognitive Searchのリソース画面からエンドポイントを確認
-
概要
からURLをコピー -
概要→インデックス
からインデックス名をコピー
-
-
環境変数を設定
$Env:API_KEY="your api key"
$Env:ENDPOINT="{URL}/indexes/{インデックス名}/docs/search?api-version=2020-06-30"
b. Pythonから検索実行 (クエリを指定しない)
- 検索を実行するソースコード (クエリを指定しない場合)
import requests
import os
api_key = os.getenv("API_KEY")
endpoint = os.getenv("ENDPOINT")
headers = {
"api-key": api_key,
"Content-Type": "application/json"
}
query = {}
response = requests.post(endpoint, headers=headers, json=query)
results = response.json()["value"]
for result in results:
print(f"score: {result['@search.score']}")
print(f"content: {result['content']}")
print("!!!"*20)
全ての文書が検索により取得され、scoreは1.0となっています。よさそうです。
c. Pythonから検索実行 (クエリを指定する)
上記のソースコードのqueryにクエリ情報を埋め込みます。
- 単純なクエリを指定して実行
user_input = "クラウドサービス"
# user_input = "マルウェア セキュリティ"
query = {
"search": user_input, # 検索ワード
"searchMode": "any", # 部分一致 (デフォルト)
# "orderby": "xxx" # デフォルトでスコアの降順になる
"top": 3 # 上位3件
}
クラウドサービス
という単語はどちらの文書にも含まれているため、
二つのファイルが検索結果として表示されます。
マルウェア セキュリティ
という単語は片方の文書にしか含まれていないため、
検索結果には一つの文書しか表示されない結果となりました。
d. インデックスを利用して絞り込み
- filterにインデックスのフィールドを利用する
- 今回はインデックス作成時に、
content
,metadata_storage_path
を指定している - 以下では
metadata_storage_name
を利用してファイル名で絞り込みする
- 今回はインデックス作成時に、
import requests
import os
api_key = os.getenv("API_KEY")
endpoint = os.getenv("ENDPOINT")
headers = {
"api-key": api_key,
"Content-Type": "application/json"
}
user_input = "クラウドサービス"
file_name = "ガイドライン"
query = {
"search": user_input, # 検索ワード
"searchMode": "any", # 部分一致 (デフォルト)
"top": 3, # 上位3件
"filter": f"search.ismatch('{file_name}', 'metadata_storage_name')" # ファイル名にfile_nameが含まれているか
}
response = requests.post(endpoint, headers=headers, json=query)
results = response.json()["value"]
for result in results:
print(f"score: {result['@search.score']}")
print(f"content: {result['content']}")
print("!!!"*20)
クラウドサービス
という単語は全ての文書に含まれていますが、
ファイル名にガイドライン
が含まれているのは片方の文書だけです。
そのため一つの文書のみを検索結果として得ることができます。
その他・補足
-
APIバージョンの安定版は
2020-06-30
-
複雑なクエリは以下サイトが参考になりそう。
2. ファイルの分割アップロード機能を作る
今回Azure OpenAIで使うモデルはgpt-35-turboです。
Microsoftが公開しているエンタープライズサーチのアーキテクチャでは、
GPT3.5のmax_tokensの制限を回避するために、ファイルをBLOBストレージにアップロードする際に分割しておく手法が採用されています。
この機能を実装します。
※以降、バックエンド(Azure Functions)の処理のみ記載します
- ストレージの接続文字列とコンテナ名を、環境変数に設定する
$Env:AZURE_STORAGE_CONNECTION_STRING="接続文字列"
$Env:STORAGE_CONTAINER_NAME="コンテナ名"
- 必要なモジュールをインストールする
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues
azure-functions
flask
azure-storage-blob
pypdf
- エンドポイント作成
-
/api/upload_blob
のPOSTリクエストが来た場合、テキスト分割&ファイルをBLOBストレージにアップロード -
/api/search
のPOSTリクエストが来た場合、Cognitive Searchに対してクエリを送信
-
from flask import Flask, request, jsonify
import os
# モジュールのインポート
from .modules.enterprisesearch import EnterpriseSearch
# 初期化
enterprise_search = EnterpriseSearch()
app = Flask(__name__)
@app.route('/api/test', methods=['GET'])
def api_test():
return 'This is Test'
@app.route('/api/upload_blob', methods=['POST'])
def upload_blob():
# 保存先ディレクトリがなければ生成
if os.path.exists(UPLOAD_FOLDER):
pass
else:
os.mkdir(UPLOAD_FOLDER)
# ファイルを保存
file = request.files['file']
if allowed_file(file.filename):
save_path = os.path.join(UPLOAD_FOLDER, file.filename)
file.save(save_path)
# ファイルをBLOBストレージにアップロードする
response, _ = enterprise_search.upload_blob(save_path)
response = {'message': response}
return jsonify(response)
@app.route('/api/search', methods=['POST'])
def search():
question = request.json['question']
response = enterprise_search.search_and_answer(question)
response = {'message': response}
return jsonify(response)
if __name__ == '__main__':
app.run()
from pypdf import PdfReader
import os
from azure.storage.blob import BlobServiceClient
import requests
char_limit = 1000 # 分割する文字数の制限
class EnterpriseSearch():
def __init__(self):
# Blob Storageの接続情報
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
self.container_name = os.getenv("STORAGE_CONTAINER_NAME")
# Blob Service Clientの作成
self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
def upload_blob(self, file_path):
try:
# 分割後のファイル名のプレフィックス
len_extension = len(file_path.split(".")[-1])
file_base_name = os.path.basename(file_path)[:(-len_extension-1)]
# ファイルの読み込みと分割処理
with open(file_path, "rb") as pdf_file:
# PDFを読み込み、各ページのテキストを取得する
pdf_reader = PdfReader(pdf_file)
page_texts = []
for page in pdf_reader.pages:
page_text = page.extract_text()
page_texts.append(page_text)
# 文字列を分割し、txtファイルとしてBLOBアップロード
for index, page_text in enumerate(page_texts):
text_fragments = self._split_text_by_limit(page_text, char_limit)
for fragment_index, fragment in enumerate(text_fragments):
blob_file_name = f"{file_base_name}_{index+1}_{fragment_index+1}.txt"
container_client = self.blob_service_client.get_container_client(self.container_name)
container_client.upload_blob(blob_file_name, data=fragment, overwrite=True)
return "!!! Uploaded !!!", 200
except Exception as e:
print(e)
return "!!! Server Error !!!", 500
def _split_text_by_limit(self, text, char_limit):
text_fragments = []
current_fragment = ""
current_char_count = 0
prev_text = ""
for char in text:
if prev_text != "": # 前回遡りが発生している場合
current_fragment = prev_text[::-1]
prev_text = ""
current_fragment += char
current_char_count += 1
if current_char_count >= char_limit:
# 指定文字数を超えた場合、後ろに遡って最初の句読点や空白文字を探す
while current_fragment[-1] not in [".", "。", "!", "!", "?", "?", "\n"]:
prev_text += current_fragment[-1] # 遡った文字を保持。次回に反映
current_fragment = current_fragment[:-1]
current_char_count -= 1
text_fragments.append(current_fragment)
current_fragment = ""
current_char_count = 0
if current_fragment:
text_fragments.append(current_fragment)
return text_fragments
def search_and_answer(self, question):
### Todo: questionからquery, file_nameを生成 ###
query = question
file_name = "*"
api_key = os.getenv("SEARCH_API_KEY")
endpoint = os.getenv("SEARCH_ENDPOINT")
headers = {
"api-key": api_key,
"Content-Type": "application/json"
}
query = {
"search": query, # 検索ワード
"searchMode": "any", # 部分一致 (デフォルト)
"top": 3, # 上位3件
"filter": f"search.ismatch('{file_name}', 'metadata_storage_name')" # ファイル名にfile_nameが含まれているか
}
response = requests.post(endpoint, headers=headers, json=query)
results = response.json()["value"]
results = results[0]["metadata_storage_name"]
return results
if __name__ == "__main__":
test = EnterpriseSearch()
# file_path = "./data/p13-14_情報セキュリティ対策ガイドライン.pdf"
# test.upload_blob(file_path)
results = test.search_and_answer("クラウド")
for result in results:
print(result)
実行イメージ
次回の内容
現状、ユーザの入力した単語が含まれているファイルをCognitive Searchから取得する機能となっています。
また、単純に検索で得られたファイル名を返しているだけです。
Azure OpenAIを用いて、ユーザの入力からCongnitive Searchのクエリを生成する機能を作っていこうかと思います。
やっとバックエンド×大規模言語モデルでアレコレできる、、
プロンプト作成はguidanceでやろうかなあ
(guidanceについては以下参照)