LoginSignup
0
0

LangChain v0.352 × azure-search-documents v11.4.0: Azure AI Search Vectorstoreモジュールのクラス継承による最新のAzure SDK対応

Last updated at Posted at 2023-12-28

問題意識とやってみたこと

LangChain v0.352で、LangChain-Communityパッケージに属するAzure AI Searchのvectorstoreモジュールを使用するには、azure-search-documentsのv11.4.0(最新のstable版)ではなくv11.4.0b8という古いβバージョンを使用しないといけない。そのため、Langchainのモジュールを通してAzure AI Searchの操作を行うときに、少し古いβバージョーンのAPI仕様を意識しないといけない(ベクトル検索の設定など)。
わざわざ古い仕様を覚えるのは嫌なので、LangChain v0.352のAzure AI Searchのvectorstoreモジュールをazure-search-documentsのv11.4.0で動くようにクラスを継承してカスタムクラスを作る。

継承して作ったカスタムクラスのコード

全量

CustomAzureSearchクラスの定義
import os
from google.colab import userdata

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores.azuresearch import AzureSearch

from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import ResourceNotFoundError
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.models import VectorizedQuery
from azure.search.documents.indexes.models import (
    SearchIndex,
    ScoringProfile,
    SearchableField,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    TextWeights,
    CorsOptions,
)
from azure.search.documents.indexes.models import (
    VectorSearch,
    VectorSearchProfile,
    HnswAlgorithmConfiguration,  # HnswVectorSearchAlgorithmConfiguration is old
    SemanticPrioritizedFields,  # PrioritizedFields outdated
    SemanticSearch,  # SemanticSettings outdated
    SemanticConfiguration,
    SemanticField,
)

FIELDS_ID = "id"
FIELDS_CONTENT = "content"
FIELDS_CONTENT_VECTOR = "content_vector"
FIELDS_METADATA = "metadata"
MAX_UPLOAD_BATCH_SIZE = 1000

import numpy as np
import base64
import json
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings

def _get_search_client(
    endpoint: str,
    key: str,
    index_name: str,
    semantic_configuration_name: Optional[str] = None,
    fields: Optional[List[SearchField]] = None,
    vector_search: Optional[VectorSearch] = None,
    semantic_search: Optional[SemanticSearch] = None,
    scoring_profiles: Optional[List[ScoringProfile]] = None,
    default_scoring_profile: Optional[str] = None,
    default_fields: Optional[List[SearchField]] = None,
    user_agent: Optional[str] = "langchain",
    cors_options: Optional[CorsOptions] = None,
) -> SearchClient:

    default_fields = default_fields or []
    if key is None:
        credential = DefaultAzureCredential()
    elif key.upper() == "INTERACTIVE":
        credential = InteractiveBrowserCredential()
        credential.get_token("https://search.azure.com/.default")
    else:
        credential = AzureKeyCredential(key)
    index_client: SearchIndexClient = SearchIndexClient(
        endpoint=endpoint, credential=credential, user_agent=user_agent
    )
    try:
        index_client.get_index(name=index_name)
    except ResourceNotFoundError:
        # Fields configuration
        if fields is not None:
            # Check mandatory fields
            fields_types = {f.name: f.type for f in fields}
            mandatory_fields = {df.name: df.type for df in default_fields}
            # Check for missing keys
            missing_fields = {
                key: mandatory_fields[key]
                for key, value in set(mandatory_fields.items())
                - set(fields_types.items())
            }
            if len(missing_fields) > 0:
                # Helper for formatting field information for each missing field.
                def fmt_err(x: str) -> str:
                    return (
                        f"{x} current type: '{fields_types.get(x, 'MISSING')}'. "
                        f"It has to be '{mandatory_fields.get(x)}' or you can point "
                        f"to a different '{mandatory_fields.get(x)}' field name by "
                        f"using the env variable 'AZURESEARCH_FIELDS_{x.upper()}'"
                    )

                error = "\n".join([fmt_err(x) for x in missing_fields])
                raise ValueError(
                    f"You need to specify at least the following fields "
                    f"{missing_fields} or provide alternative field names in the env "
                    f"variables.\n\n{error}"
                )
        else:
            fields = default_fields
        # Vector search configuration
        if vector_search is None:

            # >= 11.4.0:
            #   VectorSearch(algorithm_configuration) --> VectorSearch(profiles, algorithms)
            # HnswVectorSearchAlgorithmConfiguration --> HnswAlgorithmConfiguration
            vector_search = VectorSearch(
                profiles=[
                    VectorSearchProfile(
                        name="default",
                        algorithm_configuration_name="default",
                    )
                ],
                algorithms=[
                    HnswAlgorithmConfiguration(
                        name="default",
                        kind="hnsw",
                        parameters={  # type: ignore
                            "m": 4,
                            "efConstruction": 400,
                            "efSearch": 500,
                            "metric": "cosine",
                        },
                    )
                ]
            )

        # Create the semantic settings with the configuration
        if semantic_search is None and semantic_configuration_name is not None:

            # <=11.4.0: SemanticSettings --> SemanticSearch
            # PrioritizedFields(prioritized_content_fields)
            #   --> SemanticPrioritizedFields(content_fields)
            semantic_search = SemanticSearch(
                configurations=[
                    SemanticConfiguration(
                        name=semantic_configuration_name,
                        prioritized_fields=SemanticPrioritizedFields(
                            content_fields=[
                                SemanticField(field_name=FIELDS_CONTENT)
                            ],
                        ),
                    )
                ]
            )

        # Create the search index with the semantic settings and vector search
        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=vector_search,
            semantic_search=semantic_search,
            scoring_profiles=scoring_profiles,
            default_scoring_profile=default_scoring_profile,
            cors_options=cors_options,
        )
        index_client.create_index(index)
    # Create the search client
    return SearchClient(
        endpoint=endpoint,
        index_name=index_name,
        credential=credential,
        user_agent=user_agent,
    )

class CustomAzureSearch(AzureSearch):
    def __init__(
        self,
        azure_search_endpoint: str,
        azure_search_key: str,
        index_name: str,
        embedding_function: Callable,
        search_type: str = "hybrid",
        semantic_configuration_name: Optional[str] = None,
        semantic_query_language: str = "en-us",
        fields: Optional[List[SearchField]] = None,
        vector_search: Optional[VectorSearch] = None,
        semantic_search: Optional[SemanticSearch] = None,
        scoring_profiles: Optional[List[ScoringProfile]] = None,
        default_scoring_profile: Optional[str] = None,
        cors_options: Optional[CorsOptions] = None,
        **kwargs: Any,
    ):
        """Initialize with necessary components."""
        # Initialize base class
        self.embedding_function = embedding_function
        default_fields = [
            SimpleField(
                name=FIELDS_ID,
                type=SearchFieldDataType.String,
                key=True,
                filterable=True,
            ),
            SearchableField(
                name=FIELDS_CONTENT,
                type=SearchFieldDataType.String,
            ),
            SearchField(
                name=FIELDS_CONTENT_VECTOR,
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=len(embedding_function("Text")),
                vector_search_profile_name="default",
            ),
            SearchableField(
                name=FIELDS_METADATA,
                type=SearchFieldDataType.String,
            ),
        ]
        user_agent = "langchain"
        if "user_agent" in kwargs and kwargs["user_agent"]:
            user_agent += " " + kwargs["user_agent"]
        self.client = _get_search_client(
            azure_search_endpoint,
            azure_search_key,
            index_name,
            semantic_configuration_name=semantic_configuration_name,
            fields=fields,
            vector_search=vector_search,
            semantic_search=semantic_search,
            scoring_profiles=scoring_profiles,
            default_scoring_profile=default_scoring_profile,
            default_fields=default_fields,
            user_agent=user_agent,
            cors_options=cors_options,
        )
        self.search_type = search_type
        self.semantic_configuration_name = semantic_configuration_name
        self.semantic_query_language = semantic_query_language
        self.fields = fields if fields else default_fields

    def vector_search_with_score(
        self, query: str, k: int = 4, filters: Optional[str] = None
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of Documents most similar to the query and score for each
        """
        results = self.client.search(
            search_text="",
            vector_queries=[
                VectorizedQuery(
                    vector=np.array(
                        self.embedding_function(query), dtype=np.float32
                    ).tolist(),
                    k_nearest_neighbors=k,
                    fields=FIELDS_CONTENT_VECTOR,
                )
            ],
            filter=filters,
        )
        # Convert results to Document objects
        docs = [
            (
                Document(
                    page_content=result.pop(FIELDS_CONTENT),
                    metadata={
                        **(
                            {FIELDS_ID: result.pop(FIELDS_ID)}
                            if FIELDS_ID in result
                            else {}
                        ),
                        **(
                            json.loads(result[FIELDS_METADATA])
                            if FIELDS_METADATA in result
                            else {
                                k: v
                                for k, v in result.items()
                                if k != FIELDS_CONTENT_VECTOR
                            }
                        ),
                    },
                ),
                float(result["@search.score"]),
            )
            for result in results
        ]
        return docs

    def hybrid_search_with_score(
        self, query: str, k: int = 4, filters: Optional[str] = None
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query with an hybrid query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of Documents most similar to the query and score for each
        """
        results = self.client.search(
            search_text=query,
            vector_queries=[
                VectorizedQuery(
                    vector=np.array(
                        self.embedding_function(query), dtype=np.float32
                    ).tolist(),
                    k_nearest_neighbors=k,
                    fields=FIELDS_CONTENT_VECTOR,
                )
            ],
            filter=filters,
            top=k,
        )
        # Convert results to Document objects
        docs = [
            (
                Document(
                    page_content=result.pop(FIELDS_CONTENT),
                    metadata={
                        **(
                            {FIELDS_ID: result.pop(FIELDS_ID)}
                            if FIELDS_ID in result
                            else {}
                        ),
                        **(
                            json.loads(result[FIELDS_METADATA])
                            if FIELDS_METADATA in result
                            else {
                                k: v
                                for k, v in result.items()
                                if k != FIELDS_CONTENT_VECTOR
                            }
                        ),
                    },
                ),
                float(result["@search.score"]),
            )
            for result in results
        ]
        return docs

    def semantic_hybrid_search_with_score_and_rerank(
        self, query: str, k: int = 4, filters: Optional[str] = None
    ) -> List[Tuple[Document, float, float]]:
        """Return docs most similar to query with an hybrid query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of Documents most similar to the query and score for each
        """
        results = self.client.search(
            search_text=query,
            vector_queries=[
                VectorizedQuery(
                    vector=np.array(
                        self.embedding_function(query), dtype=np.float32
                    ).tolist(),
                    k_nearest_neighbors=50,
                    fields=FIELDS_CONTENT_VECTOR,
                )
            ],
            filter=filters,
            query_type="semantic",
            query_language=self.semantic_query_language,
            semantic_configuration_name=self.semantic_configuration_name,
            query_caption="extractive",
            query_answer="extractive",
            top=k,
        )
        # Get Semantic Answers
        semantic_answers = results.get_answers() or []
        semantic_answers_dict: Dict = {}
        for semantic_answer in semantic_answers:
            semantic_answers_dict[semantic_answer.key] = {
                "text": semantic_answer.text,
                "highlights": semantic_answer.highlights,
            }
        # Convert results to Document objects
        docs = [
            (
                Document(
                    page_content=result.pop(FIELDS_CONTENT),
                    metadata={
                        **(
                            {FIELDS_ID: result.pop(FIELDS_ID)}
                            if FIELDS_ID in result
                            else {}
                        ),
                        **(
                            json.loads(result[FIELDS_METADATA])
                            if FIELDS_METADATA in result
                            else {
                                k: v
                                for k, v in result.items()
                                if k != FIELDS_CONTENT_VECTOR
                            }
                        ),
                        **{
                            "captions": {
                                "text": result.get("@search.captions", [{}])[0].text,
                                "highlights": result.get("@search.captions", [{}])[
                                    0
                                ].highlights,
                            }
                            if result.get("@search.captions")
                            else {},
                            "answers": semantic_answers_dict.get(
                                json.loads(result["metadata"]).get("key"), ""
                            ),
                        },
                    },
                ),
                float(result["@search.score"]),
                float(result["@search.reranker_score"]),
            )
            for result in results
        ]
        return docs

主要な変更点

主な変化点は、ベクトル検索を行うFieldの設定方法と、VectorSearchの定義方法と、ベクトル検索の仕方にあります。すべてベクトル検索周りですね。

VectorSearchの定義方法(algorithmsだけでなくprofilesも引数に取るようになった)

vector_search = VectorSearch(
    profiles=[
        VectorSearchProfile(
            name="default",
            algorithm_configuration_name="default",
        )
    ],
    algorithms=[
        HnswAlgorithmConfiguration(
            name="default",
            kind="hnsw",
            parameters={  # type: ignore
                "m": 4,
                "efConstruction": 400,
                "efSearch": 500,
                "metric": "cosine",
            },
        )
    ]
)

ベクトル検索を行うFieldの設定方法(vector_search_profile_nameを新たな引数に持つようになった。これには、VectorSearchの定義で引数に渡したprofileの名前を渡す)

SearchField(
    name=FIELDS_CONTENT_VECTOR,
    type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
    searchable=True,
    vector_search_dimensions=len(embedding_function("Text")),
    vector_search_profile_name="default",
),

ベクトル検索の仕方(searchメソッドの引数の書き方が変わった)

results = self.client.search(
    search_text=query,
    vector_queries=[
        VectorizedQuery(
            vector=np.array(
                self.embedding_function(query), dtype=np.float32
            ).tolist(),
            k_nearest_neighbors=k,
            fields=FIELDS_CONTENT_VECTOR,
        )
    ],
    filter=filters,
    top=k,
)

動作確認

LangChainのAzure AI Search VectorSotreのチュートリアルを参考に動作確認を行います。

OpenAIやAzure AI SearchのAPIKEYやエンドポイントの設定

Google colabolartoryのシークレットという機能を利用しています(userdata.getのところ)。

os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_BASE')
model: str = "text-embedding-ada-002"
vector_store_address: str = userdata.get('AZURE_SEARCH_SERVICE_ENDPOINT')
vector_store_password: str = userdata.get('AZURE_SEARCH_API_KEY')

インデックスの作成

embeddings: OpenAIEmbeddings = OpenAIEmbeddings(deployment=model, chunk_size=1)
embedding_function = embeddings.embed_query


fields = [
    SimpleField(
        name="id",
        type=SearchFieldDataType.String,
        key=True,
        filterable=True,
    ),
    SearchableField(
        name="content",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    SearchField(
        name="content_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=len(embedding_function("Text")),
        vector_search_profile_name="default",
    ),
    SearchableField(
        name="metadata",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    # Additional field to store the title
    SearchableField(
        name="title",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    # Additional field for filtering on document source
    SimpleField(
        name="source",
        type=SearchFieldDataType.String,
        filterable=True,
    ),
]

index_name: str = "langchain-vector-demo-custom"

vector_store: AzureSearch = CustomAzureSearch(
    azure_search_endpoint=vector_store_address,
    azure_search_key=vector_store_password,
    index_name=index_name,
    embedding_function=embedding_function,
    fields=fields,
)

ドキュメントのアップロード

# Data in the metadata dictionary with a corresponding field in the index will be added to the index
# In this example, the metadata dictionary contains a title, a source and a random field
# The title and the source will be added to the index as separate fields, but the random won't. (as it is not defined in the fields list)
# The random field will be only stored in the metadata field
vector_store.add_texts(
    ["Test 1", "Test 2", "Test 3"],
    [
        {"title": "Title 1", "source": "A", "random": "10290"},
        {"title": "Title 2", "source": "A", "random": "48392"},
        {"title": "Title 3", "source": "B", "random": "32893"},
    ],
)

検索

例1 ハイブリッド検索

res = vector_store.similarity_search(query="Test 3 source1", k=3, search_type="hybrid")
res

ハイブリッド検索の出力

[Document(page_content='Test 3', metadata={'id': 'NWE2OWJjNTQtMDM3MS00ZDg5LTgwMGYtN2UwMzNhZjhmNjlh', 'title': 'Title 3', 'source': 'B', 'random': '32893'}),
 Document(page_content='Test 1', metadata={'id': 'MTlkYmM3MTMtZDYyZS00Y2ExLThmMzctYTY2NGM2ODFiMmVh', 'title': 'Title 1', 'source': 'A', 'random': '10290'}),
 Document(page_content='Test 2', metadata={'id': 'N2RjNDc1YmEtZjc2MC00N2YwLTljNzItYmRlZDcyMWJkODk4', 'title': 'Title 2', 'source': 'A', 'random': '48392'})]

例2 フィルターありハイブリッド検索

フィルターありハイブリッド検索

res = vector_store.similarity_search(
    query="Test 3 source1", k=3, search_type="hybrid", filters="source eq 'A'"
)
res

フィルターありハイブリッド検索の出力

[Document(page_content='Test 1', metadata={'id': 'MTlkYmM3MTMtZDYyZS00Y2ExLThmMzctYTY2NGM2ODFiMmVh', 'title': 'Title 1', 'source': 'A', 'random': '10290'}),
 Document(page_content='Test 2', metadata={'id': 'N2RjNDc1YmEtZjc2MC00N2YwLTljNzItYmRlZDcyMWJkODk4', 'title': 'Title 2', 'source': 'A', 'random': '48392'})]

まとめと所感

最新のazure-search-moduleとLangChainとカスタムしたコードで、LangChainの公式のチュートリアルのコードが動くところまで確かめました。Semantic検索の部分は動かしていないのでもしかしたらうまくいかないかもです。
以下感想です。
LangChainのAzure AI SearchのVectorStoreの実装を見てみて、Azure AI Searchの機能、特に全文検索の機能をを活かしきれていないなと感じました。検索エンジンではなくて、VectorStoreなのだから、全文検索の機能を活かせなてなくてもその名に偽りがないとは思います。しかし、それでは他のベクトルDBに比べてAzure AI Searchを使う旨みが、機能面ではSemantic検索やハイブリッド検索を簡単に実施できることぐらいしか無さそう、という印象を持ちました。
Azure AI Searchの機能をもっと活かすなら、既存のLangChainの実装を参考に独自のVectorStoreを作ってもよいかなと思いました。個人的には、Documentクラスの扱い方など参考にしたいです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0