2
0

AWS で ハイブリッド検索+リランクを実装(AmazonOpenSearch(HybridSearch) + Kendra(Rerank))

Last updated at Posted at 2024-09-01
  • AWS の OpenSearch で インデックス登録、ハイブリッド検索、リランク を使用する為の備忘録
  • ハイブリッド検索には Amazon OpenSearch Service (マネージド型クラスター)を使用
  • リランクには Amazon Kendra Intelligent Ranking を使用
    • 図では OpenSearch から Kendra にリランクを飛ばしていますが、
      その部分は実装出来なかった(プラグインの導入部で打ち切り)のでユーザ側からリランク処理を飛ばした
      image.png

データ登録パイプラインの作成

  • パイプラインを登録しておく事であらかじめ定義した処理を自動化出来るため、
    データ登録の際に自身でデータをベクトル化しておく必要が無くなる
  1. ユーザがデータ登録を実行
  2. [パイプライン処理] 自動的に指定された列のデータをベクトル化
  3. データをDBへ登録
    image.png

作成時条件

対象 備考
Amazon OpenSearch Service バージョン:2.13
Python バージョン:3.10.11
  • opensearch-py-ml ライブラリは1年ほど pypl の更新がされていない(1.1.0のまま)ため、
    git から現在の最新のコミット id を指定してライブラリを取得した
  • 最新版のライブラリでは OpenSearch でモデルをデプロイするための関数が追加されている
requirements.txt
python-dotenv==1.0.1
opensearch-py==2.7.1
git+https://github.com/opensearch-project/opensearch-py-ml#39284d6e99cd9b449bcfe99511c68a63d897ca78
boto3==1.35.5

前提条件

  • Amazon Bedrock で「Titan Embeddings G1 - Text」を使用可能な状態にしておく
    image.png

Amazon OpenSearch Service の作成

  1. [Amazon OpenSearch Service] > [マネージド型クラスター] > [ダッシュボード] > [ドメインの作成] を選択
    image.png
  2. 下記のように設定して作成
    • マスターユーザ、マスターパスワードは適当に入力する
    • インスタンスタイプは最低の安いものとした
      • 今回は OpenSearch のインスタンス内でモデルを動かす予定が無いため
        (リランク、Embeddings どちらも外部のものを呼び出す)
      • OpenSearch のインスタンス内で「リランクモデル」「Embeddingsモデル」など動かす予定がある場合はもう少し良いインスタンスにする
        • 以前作業していた時、GPU が無いインスタンスだとモデルデプロイの時にエラーで上手くいかなかった事がある(GPU 用モデルを GPU 無し環境にデプロイしようとして発生したエラーだった気がする)

image.png
作成後の状態
image.png

IAM 関連

OpenSearch から Bedrock へアクセスするためのロール作成(INGESTION_PIPELINE_ROLE)

  • Bedrock へのアクセス権を持つロールを作成して ARN をメモする
    image.png
  • ロール作成後に信頼ポリシーを編集する
    image.png

IAM:PassRolePolicy を作成

  • 後続の OpenSearch 用のマスターユーザに指定するポリシーを作成する
    image.png

OpenSearch 用のマスターユーザを作成

  • OpenSearch 用のマスターユーザを作成する

  • 先ほど作成したポリシーを指定する
    image.png

  • 作成したユーザの「ARN」をメモする

  • 「アクセスキーの作成」をして、生成された「アクセスキー」「シークレットキー」をメモする
    image.png

OpenSearch のマスタユーザを設定

  • 先ほど作成した IAM ユーザの ARN をマスターユーザとして登録する
    image.png

Kendra Intelligent Ranking 用ポリシーの作成

  • Kendra Intelligent Ranking ポリシーを作成する
    image.png
  • 先ほど作成した IAM ユーザにポリシーを追加する
    image.png

embeddings model のコネクタの作成とデプロイ

  • 以下のようなコードで作成する
  • 成功すると下記メッセージが出る
    image.png
参考コード
.env
# AWS OPEN SEARCH のエンドポイント
# cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
AWS_OPENSEARCH_ENDPOINT="search-aoss-test-XXXXXX.ap-northeast-1.es.amazonaws.com"
# IAMユーザ(OpenSearch用のマスターユーザ)
AWS_ACCESS_KEY_ID="メモしたアクセスキー"
AWS_SECRET_ACCESS_KEY="メモしたシークレットキー"
AWS_DEFAULT_REGION ="ap-northeast-1"
# BEDROCK
BEDROCK_EMBEDDINGS_MODEL_ID="amazon.titan-embed-text-v1"
# デプロイするモデル名
EMBEDDINGS_MODEL_NAME="titan-embedded-model"
# Bedrock へのアクセス権を持つロール
INGESTION_PIPELINE_ROLE="arn:aws:iam::XXXXXXXXX:role/ingestion-pipeline-role"
utils/body.py
MODEL_CONNECTOR_BODY = {
            "name": "Amazon Bedrock Connector: embedding",
            "description": "The connector to bedrock Titan embedding model",
            "version": 1,
            "protocol": "aws_sigv4",
            "parameters": {
                "region": os.getenv("AWS_DEFAULT_REGION"),
                "service_name": "bedrock",
                "model": os.getenv("BEDROCK_EMBEDDINGS_MODEL_ID")
            },
            "credential": {
                "roleArn": os.getenv("INGESTION_PIPELINE_ROLE")
            },
            "actions": [
                {
                "action_type": "predict",
                "method": "POST",
                "url": "https://bedrock-runtime.{0}.amazonaws.com/model/{1}/invoke".format(os.getenv("AWS_DEFAULT_REGION"), os.getenv("BEDROCK_EMBEDDINGS_MODEL_ID")),
                "headers": {
                    "content-type": "application/json",
                    "x-amz-content-sha256": "required"
                },
                "request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
                "pre_process_function": "\n    StringBuilder builder = new StringBuilder();\n    builder.append(\"\\\"\");\n    String first = params.text_docs[0];\n    builder.append(first);\n    builder.append(\"\\\"\");\n    def parameters = \"{\" +\"\\\"inputText\\\":\" + builder + \"}\";\n    return  \"{\" +\"\\\"parameters\\\":\" + parameters + \"}\";",
                "post_process_function": "\n      def name = \"sentence_embedding\";\n      def dataType = \"FLOAT32\";\n      if (params.embedding == null || params.embedding.length == 0) {\n        return params.message;\n      }\n      def shape = [params.embedding.length];\n      def json = \"{\" +\n                 \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n                 \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n                 \"\\\"shape\\\":\" + shape + \",\" +\n                 \"\\\"data\\\":\" + params.embedding +\n                 \"}\";\n      return json;\n    "
                }
            ]
        }
utils/opensearch.py
import os
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from opensearch_py_ml.ml_commons.ml_commons_client import MLCommonClient
import boto3
from dotenv import load_dotenv
load_dotenv()

class OpenSearchAws():
    client = None
    ml_client = None
    region = os.getenv("AWS_DEFAULT_REGION")
    service = "es"
    index_name = None

    def __init__(self, index_name) -> None:
        # opensearch client を作成
        self.client = self.createClient()
        # opensearch ml cient の作成
        self.ml_client = MLCommonClient(self.client)
        self.index_name = index_name

    # opensearch client の作成
    def createClient(self):
        host = os.getenv("AWS_OPENSEARCH_ENDPOINT")
        region = self.region
        service = self.service
        credentials = boto3.Session(
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
        ).get_credentials()
        auth = AWSV4SignerAuth(credentials, region, service)

        client = OpenSearch(
            hosts = [{'host': host, 'port': 443}],
            http_auth = auth,
            use_ssl = True,
            verify_certs = True,
            connection_class = RequestsHttpConnection,
            timeout=500,
            pool_maxsize = 20
        )
        return client

    # 外部 embeddings model のコネクタの作成とデプロイ
    def registerConnectorAndDeploy(self, body):
        # embeddings モデル向けコネクタを作成
        response = self.ml_client.connector.create_standalone_connector(body)
        titan_connector_id = response["connector_id"]

        # モデルデプロイ情報の設定
        model_info_body = {
            "name": os.getenv("EMBEDDINGS_MODEL_NAME"),
            "function_name": "remote",
            "connector_id": titan_connector_id,
        }
        # モデルのデプロイ
        titan_model_id  = self.ml_client._send_model_info(model_meta_json=model_info_body)
        response = self.ml_client.deploy_model(titan_model_id)
        return titan_model_id
app.py
import json
from utils.opensearch import OpenSearchAws
from utils.requestbody import MODEL_CONNECTOR_BODY
from dotenv import load_dotenv
load_dotenv()

if __name__ == "__main__":
    print("test start")
    open_search = OpenSearchAws(index_name="test")
    titan_model_id = open_search.registerConnectorAndDeploy(body=MODEL_CONNECTOR_BODY)

ingest pipeline の作成

  • ingest pipeline を作成
  • 成功すると下記メッセージが出る
    image.png
参考コード
utils/body.py
INGEST_PIPELINE_BODY = """{{
    "description": "embedding ingest pipeline",
    "processors": [
        {{
            "text_embedding": {{
                "model_id": "{}",
                "field_map": {{"body": "body_embedding"}}
            }}
        }}
    ]
}}"""
utils/opensearch.py
# 略
    # ingest pipeline を登録
    def registerIngestPipeline(self, id, body):
        response = self.client.ingest.put_pipeline(
            id=id,
            body=body,
        )
        self.getSearchPipeline(id)

    # ingest pipeline を検索
    def getIngestPipeline(self, id):
        result = self.client.ingest.get_pipeline(id=id)
        print(f"get_ingest_pipeline: {result}")
app.py
# 略
from utils.requestbody import MODEL_CONNECTOR_BODY, INGEST_PIPELINE_BODY

if __name__ == "__main__":
    print("test start")
    titan_model_id = open_search.registerConnectorAndDeploy(body=MODEL_CONNECTOR_BODY)
    open_search.registerIngestPipeline(id="text-embedding-ingest-pipeline", body=INGEST_PIPELINE_BODY.format(TITAN_MODEL_ID=titan_model_id))

Amazon Kendra Ranking の作成

  • Kendra Ranking を作成して動作の確認を行う
参考コード
utils/kendra.py
import boto3
import os
from botocore.exceptions import ClientError
import pprint
import time
from dotenv import load_dotenv
load_dotenv()

class KendraAws():
    kendraRankingClient = None
    region = os.getenv("AWS_DEFAULT_REGION")

    def __init__(self,) -> None:
        # kendra ranking client の作成
        self.kendraRankingClient = self.createKendraRankingClient()

    # kendra ranking client の作成
    def createKendraRankingClient(self):
        session = boto3.Session(
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            region_name=self.region
        )
        return session.client("kendra-ranking")
    
    # リランクプランの作成
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kendra-ranking/client/create_rescore_execution_plan.html
    # Provide a name for the rescore execution plan
    # Set your required additional capacity units
    # Don't set capacity units if you don't require more than 1 unit given by default
    def createRescoreExecutionPlan(self, name, capacity_units: int = 1):
        print("Create a rescore execution plan.")
        try:
            rescore_execution_plan_response = self.kendraRankingClient.create_rescore_execution_plan(
                Name = name,
                CapacityUnits = {"RescoreCapacityUnits":capacity_units}
            )
            pprint.pprint(rescore_execution_plan_response)
            rescore_execution_plan_id = rescore_execution_plan_response["Id"]
            print("Wait for Amazon Kendra to create the rescore execution plan.")
            while True:
                # Get the details of the rescore execution plan, such as the status
                rescore_execution_plan_description = self.kendraRankingClient.describe_rescore_execution_plan(
                    Id = rescore_execution_plan_id
                )
                # When status is not CREATING quit.
                status = rescore_execution_plan_description["Status"]
                print(" Creating rescore execution plan. Status: " + status)
                time.sleep(60)
                if status != "CREATING":
                    break

        except ClientError as e:
                print("%s" % e)

        print("Program ends.")

    # リスコアの実行
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kendra-ranking/client/rescore.html
    def rescore(self,):
        response = self.kendraRankingClient.rescore(
            RescoreExecutionPlanId='リランクプランのID(rescore_execution_plan_id)',
            SearchQuery='朝に言う挨拶はなんですか',
            Documents=[
                {
                    'Id': '0',
                    # 'GroupId': 'string',
                    # 'Title': 'string',
                    'Body': 'おはよう',
                    # 'TokenizedTitle': [
                    #     'string',
                    # ],
                    # 'TokenizedBody': [
                    #     'string',
                    # ],
                    'OriginalScore': 0.33152309
                },{
                    'Id': '1',
                    'Body': 'さようなら',
                    'OriginalScore': 0.3152309
                },{
                    'Id': '2',
                    'Body': 'こんにちは',
                    'OriginalScore': 0.4152309
                },{
                    'Id': '3',
                    'Body': 'こんばんは',
                    'OriginalScore': 0.3552309
                },{
                    'Id': '4',
                    'Body': 'ごきげんよう',
                    'OriginalScore': 0.4052309
                }
            ]
        )
        print(json.dumps(response, indent=2))
app.py
# 略
from utils.kendra import KendraAws

if __name__ == "__main__":
    print("test start")
    # kendra-ranking
    kendra = KendraAws()
    # kendra-intelligent-ranking のデプロイ
    kendra.createRescoreExecutionPlan(name="MyRescoreExecutionPlan")
    # 確認用
    kendra.rescore()
  • 朝のあいさつについて訪ねた時「おはよう」のスコアの上昇が大きいのできちんとリランクは動いていそうです
  • ただ元のスコアも加味するため、最上位にはならず
    image.png

index の作成

  • index を作成して応答を確認

image.png

参考コード
utils/body.py
INDEX_BODY = {
    "settings": {
        "default_pipeline": "text-embedding-ingest-pipeline",
        "index": {
            "analysis": {
                "analyzer": {
                    "custom_kuromoji_analyzer": {
                        "tokenizer": "kuromoji_tokenizer",
                        "filter": ["kuromoji_baseform", "ja_stop"],
                        "char_filter": ["icu_normalizer"],
                    }
                }
            },
            "knn": "true"
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "custom_kuromoji_analyzer"
            },
            "body": {
                "type": "text",
                "analyzer": "custom_kuromoji_analyzer"
            },
            "body_embedding": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {
                    "engine": "lucene",
                    "space_type": "l2",
                    "name": "hnsw",
                    "parameters": {}
                }
            }
        }
    }
}
utils/opensearch.py
# 略
    # index の作成
    def createIndex(self, body: dict):
        response = self.client.indices.create(index=self.index_name, body=body)
        print(response)
        
    # index の検索
    def getIndex(self):
        response = self.client.indices.get(index=self.index_name)
        print(response)
app.py
# 略
    # インデックスの作成・検索
    open_search.createIndex(body=INDEX_BODY)
    open_search.getIndex()

index に Intelligent Rankingプラグインの追加(未実施)

  • 別途プラグインを入れる必要がある
    EC2 などで立ち上げると手順通りになるので動きそう
    マネージド型クラスターにもプラグインの追加項目があるので頑張ればプラグインを入れられるかもしれないが今回はスキップした
  • 参考サイト
    https://docs.aws.amazon.com/kendra/latest/dg/opensearch-rerank.html
  • スキップする事でサーチパイプラインにリランクを入れる事が出来なくなったので、
    ハイブリッド検索 → 検索結果受取 → リランク → リランク結果受取 の手順を取る事にした

  • 下記参考コードはエラーとなる
    image.png

参考コード
utils/opensearch.py
# 略
    # Intelligent Rankingプラグインの追加
    def putIntelligentRankingPlugin(self):
        setting_body = {
            "index": {
                "plugin" : {
                    "searchrelevance" : {
                        "result_transformer" : {
                            "kendra_intelligent_ranking": {
                                "order": 1,
                                "properties": {
                                    "title_field": "title",
                                    "body_field": "body"
                                }
                            }
                        }
                    }
                }
            }
        }
        response = self.client.indices.put_settings(index=self.index_name, body=setting_body)
        print(response)
app.py
# 略
    # index に Intelligent Rankingプラグインの追加
    open_search.putIntelligentRankingPlugin()

search pipeline の作成

  • 検索パイプラインを作成し、デフォルトパイプランに設定

image.png

参考コード
utils/body.py
HYBRID_SEARCH_PIPELINE_BODY = {
    "description": "Post processor for hybrid search",
    "phase_results_processors": [{
        "normalization-processor": {
            "normalization": {
                "technique": "min_max"
            },
            "combination": {
                "technique": "arithmetic_mean",
                "parameters": {
                    "weights": [
                        0.3,
                        0.7
                    ]
                }
            }
        }
    }]
}
utils/opensearch.py
# 略
# ハイブリッド検索のパイプラインを作成
def createHybridSearchPipeline(self, id: str, body: dict):
    # https://opensearch.org/docs/latest/search-plugins/hybrid-search/
    response = self.client.search_pipeline.put(id=id, body=body)
    print(response)

# デフォルトの検索パイプラインを設定
def setDefaultSearchPipeline(self, id : str):
    setting_body = {
        "index.search.default_pipeline" : id
    }
    response = self.client.indices.put_settings(index=self.index_name, body=setting_body)
    print(response)
app.py
# 略
    # ハイブリッド検索パイプラインの設定
    open_search.createHybridSearchPipeline(id="hybrid-search-pipeline", body=HYBRID_SEARCH_PIPELINE_BODY)
    open_search.setDefaultSearchPipeline(id="hybrid-search-pipeline")

デフォルトパイプラインを指定するとハイブリッド検索の結果が重複したり、スコアがマイナス値になるバグがあるので、設定しなくても良い。
(デフォルトパイプラインを指定しても検索時の params でパイプラインを指定すれば正しく動くので指定しても問題は無い)
https://forum.opensearch.org/t/negative-scores-and-duplicated-results-using-hybrid-search/18387/5

image.png

データ登録

  • Copilot で適当に童話の QA を作成
参考コード
[
    {
        "id": "0",
        "title": "「桃太郎」のQA",
        "body": "Q: 桃太郎が連れて行った動物は何ですか? A: 犬、猿、キジです。"
    },{
        "id": "1",
        "title": "「桃太郎」のQA",
        "body": "Q: 桃太郎が鬼退治に向かう際に持っていったものは何ですか? A: きびだんごです。"
    },{
        "id": "2",
        "title": "「桃太郎」のQA",
        "body": "Q: 桃太郎を育てたのは誰ですか? A: おじいさんとおばあさんです。"
    },{
        "id": "3",
        "title": "「桃太郎」のQA",
        "body": "Q: 桃太郎が鬼退治で得たものは何ですか? A: 鬼の財宝です。"
    },{
        "id": "4",
        "title": "「桃太郎」のQA",
        "body": "Q: 桃太郎が住んでいた場所はどこですか? A: どことも言われていませんが、一般的には日本のどこかとされています。"
    },{
        "id": "5",
        "title": "「つるのおんがえし」のQA",
        "body": "Q: 老夫婦の家にやってきた鶴は、どのような姿で現れましたか? A: 人間の女性の姿で現れました。"
    },{
        "id": "6",
        "title": "「つるのおんがえし」のQA",
        "body": "Q: 鶴が老夫婦に恩返しをするために作った織物は、どのような特徴がありましたか? A: 非常に美しく、売れば大金になるほどの価値がある織物でした。"
    },{
        "id": "7",
        "title": "「つるのおんがえし」のQA",
        "body": "Q: 老夫婦は鶴にどのような約束をしましたか? A: 織物を作る際に部屋を覗かないという約束をしました。"
    },{
        "id": "8",
        "title": "「つるのおんがえし」のQA",
        "body": "Q: 物語の結末で、老夫婦は鶴に対してどのような感情を抱きましたか? A: 鶴の真実を知り、感謝とともに寂しさを感じました。"
    },{
        "id": "9",
        "title": "「ウサギとカメ」のQA",
        "body": "Q: カメがレースに勝ったのはなぜですか? A: 一貫して努力し続けたからです。"
    },{
        "id": "10",
        "title": "「ウサギとカメ」のQA",
        "body": "Q: カメがウサギに勝てたのは、どのような性格の特徴のおかげですか? A: 忍耐強さと地道な努力がカメの勝利につながりました。"
    },{
        "id": "11",
        "title": "「ウサギとカメ」のQA",
        "body": "Q: ウサギとカメの物語から、私たちが学べる最も重要な教訓は何ですか? A: 自己過信せず、目標に向かって着実に進むことの大切さを学びます。"
    },{
        "id": "12",
        "title": "「ウサギとカメ」のQA",
        "body": "Q: ウサギはレース中に何をしていましたか? A: 自信過剰から、途中で休憩を取り、居眠りをしてしまいました。"
    },{
        "id": "13",
        "title": "「ウサギとカメ」のQA",
        "body": "Q: カメはどのようにしてウサギを追い越しましたか? A: ウサギが休憩している間に、一歩一歩確実に前進し続けました。"
    },{
        "id": "14",
        "title": "「干支」のQA",
        "body": "Q: なぜネズミは干支の中で最初なのですか? A: ネズミは賢く、牛に乗って川を渡り、ゴール直前で飛び降りて一番になりました。"
    },{
        "id": "15",
        "title": "「干支」のQA",
        "body": "Q: 牛はどのようにして二番目になったのですか? A: 牛は渡河中にネズミを背中に乗せていたため、ネズミに次いで二番目にゴールしました。"
    },{
        "id": "16",
        "title": "「干支」のQA",
        "body": "Q: 寅年に生まれた人の性格はどのように言われていますか? A: 寅年に生まれた人は、勇敢で自信があり、リーダーシップの資質があると言われています。"
    },{
        "id": "17",
        "title": "「干支」のQA",
        "body": "Q: 干支の中で唯一の幻想的な生き物は何ですか? A: 龍です。龍は干支の中で唯一の神話上の生き物であり、強さと幸運の象徴です。"
    },{
        "id": "18",
        "title": "「干支」のQA",
        "body": "Q: 干支の物語において、どの動物が最後にゴールしたのですか? A: 豚です。豚は途中で食事と昼寝を楽しんだため、最後にゴールしました。"
    }
]
  • データを登録
参考コード
utils/opensearch.py
# 略
# list のまま送信するとエラーになるので 1 件ずつ送信
    # インデックスデータの登録・更新
    def registerIndex(self, data_list: list = []):
        for data in data_list:
            response = self.client.index(index=self.index_name, body=data)
            print(response)
app.py
# 略
# "INDEX_DATA" は上記で作成したテストデータ
    # インデックスにデータを追加
    open_search.registerIndex(data_list=INDEX_DATA)
- ロールの信頼ポリシーが足りないと下記のようなエラーになるので、 エラーが出た時はポリシーを見直す

opensearch-から-bedrock-へアクセスするためのロール作成

opensearchpy.exceptions.TransportError: TransportError(
    500,
    'a_w_s_security_token_service_exception',
    'User: arn:aws:sts::XXXX:assumed-role/cp-sts-grant-role/swift-ap-northeast-1-prod-XXXX is not authorized to perform: sts:AssumeRole on resource:arn:aws:iam::XXXX:role/ingestion-pipeline-role(
    Service: AWSSecurityTokenService; Status Code: 403;
    Error Code: AccessDenied;
    Request ID: 2412c7e4-d41e-4439-a276-26c0419924db;
    Proxy: null)'
)

データ検索+リランク

  • リランクをする事でどちらもなんとなく結果が良くなったかな?という印象
  • セマンティック検索とハイブリッド検索を比べてどちらが良いかはちょっと分からなかった
    →評価ツールを使えばはっきりしそうだけど、それはまたの機会に
  • ただセマンティック検索の結果をリスコアした場合の方がスコアの変化が大きい?

Semantic検索

  • あまり分かっていなのですが、
    OpenSearch の「セマンティック検索」=「ベクトル検索」…?
    けど、公式ドキュメントでは上2つは分かれているし、良く分かりません
  • Azure では、テキスト検索、ベクトル検索、左2つの組み合わせ=ハイブリッド検索、リランク=セマンティックランカーみたいな認識なので、ちょっと混乱
参考コード
utils/body.py
def getSemanticQuery(query, model_id):
    return {
        "_source": {
            "exclude": ["body_embedding"]
        },
        "query": {
            "neural": {
                "body_embedding": {
                    "query_text": query,
                    "model_id": model_id,
                }
            }
        }
    }
utils/kendra.py
    # リスコアの実行
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kendra-ranking/client/rescore.html
    def rescore(self, q, documents, id: str = 'XXXXXXX'):
        response = self.kendraRankingClient.rescore(
            RescoreExecutionPlanId=id,
            SearchQuery=q,
            Documents=documents
        )
        print(json.dumps(response["ResultItems"], indent=2))
        return response["ResultItems"]
utils/opensearch.py
# 略
    # 検索の実行
    def searchIndex(self, body, params=None):
        response = self.client.search(
            index=self.index_name,
            body=body,
            params=params
        )
        print(json.dumps(response["hits"]["hits"], indent=2, ensure_ascii=False))
        documents = []
        for document in response["hits"]["hits"]:
            documents.append({
                'Id': document["_source"]["id"],
                'Body': document["_source"]["body"],
                'OriginalScore': document["_score"]
            })
        return response["hits"]["hits"], documents
app.py
# 略
import pprint
import operator
from utils.requestbody import getSemanticQuery

# opensearch の結果と rerank の結果を結合
def rerank(search_result, rerank_result):
    results = []
    for document in search_result:
            id = document["_source"]["id"]
            score = [rerank["Score"] for rerank in rerank_result if id == rerank["DocumentId"]]
            results.append({
                'Id': document["_source"]["id"],
                'Title': document["_source"]["title"],
                'Body': document["_source"]["body"],
                'OriginalScore': document["_score"],
                'ReScore': score[0]
            })
    pprint.pprint(sorted(results, key=operator.itemgetter('ReScore'), reverse=True))
    
    # インデックスを検索
    search_result, documents = open_search.searchIndex(getSemanticQuery(query="人として成長出来る話", model_id="モデルID"))
    rerank_result = kendra.rescore(q="人として成長出来る話", documents=documents)
    rerank(search_result, rerank_result)
  • リランクした結果
    • No13, No10 がスコアを上げたのでなんとなく良く…なった…?感じ

image.png

ハイブリッド検索

参考コード
utils/body.py
def getHybridSearchQuery(query, model_id):
    return {
        "_source": {
            "exclude": ["body_embedding"]
        },
        "query": {
            "hybrid": {
                "queries": [{
                    "match": {
                        "body": {
                            "query": query
                        }
                    }
                },{
                    "neural": {
                        "body_embedding": {
                            "query_text": query,
                            "model_id": model_id,
                            "k": 5
                        }
                    }
                }]
            }
        }
    }
utils/kendra.py
# セマンティック検索と同じなので省略
utils/opensearch.py
# セマンティック検索と同じなので省略
app.py
# 略
import pprint
import operator
from utils.requestbody import getSemanticQuery

# opensearch の結果と rerank の結果を結合
def rerank(search_result, rerank_result):
    results = []
    for document in search_result:
            id = document["_source"]["id"]
            score = [rerank["Score"] for rerank in rerank_result if id == rerank["DocumentId"]]
            results.append({
                'Id': document["_source"]["id"],
                'Title': document["_source"]["title"],
                'Body': document["_source"]["body"],
                'OriginalScore': document["_score"],
                'ReScore': score[0]
            })
    pprint.pprint(sorted(results, key=operator.itemgetter('ReScore'), reverse=True))
    
    # インデックスを検索
    search_result, documents = open_search.searchIndex(getHybridSearchQuery(query="人として成長出来る話", model_id="モデルID"), params={"search_pipeline": "hybrid-search-pipeline"})
    rerank_result = kendra.rescore(q="人として成長出来る話", documents=documents)
    rerank(search_result, rerank_result)
  • こちらに関してもリランクする事で下記のように結果が改善されたと思う
    • No17, No1 のスコアが低下
    • No10 が低い事以外は割と納得

image.png

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0