追記
2024/10/02追記:PR#22950がマージされたことで、EnsebmleRetrieverの初期化時に、id_keyを渡すことで、該当するmetadataの異同によりDocumentの異同を判断することができるようになりました。id_keyを指定しない場合は、従来通りpage_contentに基づきます。 以下は記録として残しておきます。
概要
langchainのEnsembleRetrieverが異なるretrieverから検索の結果得られたDocumentをアンサンブルするとき、どういう条件でDocumentの異同を判断しているのか調べました。
結論としては、page_contentが同じとき、同一のDocumentとみなすようです。
検証
検証バージョンは以下です。
langchain 0.2.3
langchain-community 0.0.29
langchain-core 0.2.5
まず全く同じtextのリスト2つ用意し、それぞれでretrieverを作って、均等な重みでensembleしてみます。
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain_core.documents import Document
texts1 = ["I have an apple", "You are tall", "There is a dog"]
texts2 = ["I have an apple", "You are tall", "There is a dog"]
retriever1 = BM25Retriever.from_texts(texts1)
retriever2 = BM25Retriever.from_texts(texts2)
enssemble_retriever = EnsembleRetriever(retrievers=[retriever1, retriever2], weights=[0.5, 0.5])
docs = enssemble_retriever.get_relevant_documents("You have an apple")
docs
[Document(page_content='I have an apple'),
Document(page_content='You are tall'),
Document(page_content='There is a dog')]
retrieverに最終的に与えているDocumentのリストはのべ6つですが、返ってきた結果は3つのみです。
よって、同じ文章がまとめられていることがわかります。
次にtext2の一部をtext2と異なる文にしてみます。
texts1 = ["I have an apple", "You are tall", "There is a dog"]
texts2 = ["I have two apples", "You are tall", "There is a dog"]
retriever1 = BM25Retriever.from_texts(texts1)
retriever2 = BM25Retriever.from_texts(texts2)
enssemble_retriever = EnsembleRetriever(retrievers=[retriever1, retriever2], weights=[0.5, 0.5])
docs = enssemble_retriever.get_relevant_documents("You have an apple")
docs
Document(page_content='You are tall'),
Document(page_content='There is a dog'),
Document(page_content='I have an apple'),
Document(page_content='I have two apples')]
変更したテキストであるI have an appleとI have two applesが両方結果に登場するようになりました。よって、テキストの表層が異なるとアンサンブル時に別のDocumentとみなされるようです。
念の為、retrieverの生成時にDocumentを直接与える方法でも試してみます。
texts1 = ["I have an apple", "You are tall", "There is a dog"]
texts2 = ["I have an apple", "You are tall", "There is a dog"]
documents1 = [Document(page_content = t) for t in texts1]
documents2 = [Document(page_content = t) for t in texts2]
retriever1 = BM25Retriever.from_documents(documents1)
retriever2 = BM25Retriever.from_documents(documents2)
enssemble_retriever = EnsembleRetriever(retrievers=[retriever1, retriever2], weights=[0.5, 0.5])
docs = enssemble_retriever.get_relevant_documents("You have an apple")
docs
[Document(page_content='I have an apple'),
Document(page_content='You are tall'),
Document(page_content='There is a dog')]
Documentとして与えてもpage_contentが同じものは1つにまとめられるようです。
更に念の為、異なるmetadataを与えてみます。
texts1 = ["I have an apple", "You are tall", "There is a dog"]
texts2 = ["I have an apple", "You are tall", "There is a dog"]
documents1 = [Document(page_content = t, metadata = {"source": 1}) for t in texts1]
documents2 = [Document(page_content = t, metadata = {"source": 2}) for t in texts2]
retriever1 = BM25Retriever.from_documents(documents1)
retriever2 = BM25Retriever.from_documents(documents2)
enssemble_retriever = EnsembleRetriever(retrievers=[retriever1, retriever2], weights=[0.5, 0.5])
docs = enssemble_retriever.get_relevant_documents("You have an apple")
docs
[Document(page_content='I have an apple', metadata={'source': 1}),
Document(page_content='You are tall', metadata={'source': 1}),
Document(page_content='There is a dog', metadata={'source': 1})]
結果が3つであることから、metadataが異なってもpage_contentが同一であれば同じDocumentとみなされるようです。
ソースコードの確認
上記の挙動をソースコードから理解してみます。
EnsembleRetrieverではweighted_reciprocal_rankという関数がensembleの役割を担っています。
def weighted_reciprocal_rank(
self, doc_lists: List[List[Document]]
) -> List[Document]:
"""
Perform weighted Reciprocal Rank Fusion on multiple rank lists.
You can find more details about RRF here:
https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
Args:
doc_lists: A list of rank lists, where each rank list contains unique items.
Returns:
list: The final aggregated list of items sorted by their weighted RRF
scores in descending order.
"""
if len(doc_lists) != len(self.weights):
raise ValueError(
"Number of rank lists must be equal to the number of weights."
)
# Associate each doc's content with its RRF score for later sorting by it
# Duplicated contents across retrievers are collapsed & scored cumulatively
rrf_score: Dict[str, float] = defaultdict(float)
for doc_list, weight in zip(doc_lists, self.weights):
for rank, doc in enumerate(doc_list, start=1):
rrf_score[doc.page_content] += weight / (rank + self.c)
# Docs are deduplicated by their contents then sorted by their scores
all_docs = chain.from_iterable(doc_lists)
sorted_docs = sorted(
unique_by_key(all_docs, lambda doc: doc.page_content),
reverse=True,
key=lambda doc: rrf_score[doc.page_content],
)
return sorted_docs
doc_listsは各retrieverの検索結果のリストです。よってList[List[Document]]という2重のリスト型になっています。
rrf_scoreという辞書を定義して、そこにfusionのスコア結果を保存しています。この辞書ではpage_contentがキーとなっているため、page_contentが同一のDocumentが同一視されることになります。
感想
page_contentを同一判定の基準にされると、厳密な運用を考えたときに、使いにくさがあります。
まず同じ文書でも、page_contentはretrieverごとに適したものに変えたいというケースがありえます。例えば、埋込検索では平文を使い、キーワード検索では自立語のみ抽出して分かち書きされた文章を対象としたい、などです。
あるいは、異なる文書であっても、偶然page_contentが一致することはあるかもしれません。チャンクを細かく分割した場合などは特に起こりがちでしょう。EnsembleRetrieverの実装方法では、異なる文書でたまたま同じpage_contentとなった場合には、そのpage_contentのscoreがfusionされることで、不当に高くなるおそれがあります。
また、単一のretrieverではpage_contentが同じものも異なるDocumentとして扱われますので、EnsembleRetrieverにした瞬間に異同の判定基準が変わってしまい、開発者にとって分かりにくい懸念もあります。
texts1 = ["I have an apple", "I have an apple", "You are tall", "There is a dog"]
retriever1 = BM25Retriever.from_texts(texts1)
docs = retriever1.get_relevant_documents("You have an apple")
docs
[Document(page_content='You are tall'),
Document(page_content='There is a dog'),
Document(page_content='I have an apple'),
Document(page_content='I have an apple')]
これらの懸念を考慮すると、page_contentだけでなく、metadataの特定のキーを基準としてDocumentの異同を判定できるような拡張がEnsembleRetrieverになされるともっと使いやすくなると個人的には思いました。
例えば以下のような感じです。id_keyを追加で引数として取れるようにして、id_keyが与えられた場合は、metadata[id_key]の値をfusionの基準とするようにしています。
def weighted_reciprocal_rank(
self, doc_lists: List[List[Document]], id_key: Optional[str] = None
) -> List[Document]:
"""
Perform weighted Reciprocal Rank Fusion on multiple rank lists.
You can find more details about RRF here:
https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
Args:
doc_lists: A list of rank lists, where each rank list contains unique items.
Returns:
list: The final aggregated list of items sorted by their weighted RRF
scores in descending order.
"""
if len(doc_lists) != len(self.weights):
raise ValueError(
"Number of rank lists must be equal to the number of weights."
)
# Associate each doc's content with its RRF score for later sorting by it
# Duplicated contents across retrievers are collapsed & scored cumulatively
rrf_score: Dict[str, float] = defaultdict(float)
for doc_list, weight in zip(doc_lists, self.weights):
for rank, doc in enumerate(doc_list, start=1):
# id_keyが存在する場合はmetadataのid_keyを文書の同一性判定の基準とする
rrf_score[doc.metadata.get(id_key, doc.page_content)] += weight / (rank + self.c)
# Docs are deduplicated by their contents then sorted by their scores
all_docs = chain.from_iterable(doc_lists)
sorted_docs = sorted(
unique_by_key(all_docs, lambda doc: doc.metadata.get(id_key, doc.page_content)),
reverse=True,
key=lambda doc: rrf_score[doc.metadata.get(id_key, doc.page_content)],
)
return sorted_docs