0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【python】rank_bm25のインスタンスを高速にマージ

Last updated at Posted at 2024-10-01

概要

BM25スコアを計算するためのpythonライブラリであるrank_bm25のインスタンスを高速にマージする方法を検討しました。

文書頻度辞書を用いて効率的にIDFを計算する工夫により、シンプルな方法と比べて、特にコーパスサイズが大きいときに、効率化できることを確認しました。

環境

  • M1 Mac, macOS 14.5
  • Python 3.11.2
  • rank-bm25 0.2.2

以下の仮想環境上で実行しています。

uv venv
source .venv/bin/activate
uv pip install rank-bm25

背景

複数のサブコーパスに分けられる大きな親コーパスのBM25辞書を作りたく、サブコーパスのBM25辞書はすでに作成済みである状況を考えます。ライブラリとしてrank_bm25のBM25Okapiクラスを使います。
もっともシンプルな方法は、親コーパスでBM25Okapiを初期化することですが、コーパスサイズが大きいと時間がかかります。サブコーパスのインスタンスがすでにあるならそれを使って高速化したいです。

TFIDFスコアを計算するインスタンスで同じことを以前やりました。このときは、IDF辞書を取り出して効率的にマージすることで、計算時間をコーパスサイズにもよりますが、1/25に減らすことができました。
BM25もTFIDFをパラメータで補正したものなので、TFIDFと同じく、IDF辞書をマージすれば高速化できそうです。

rank_bm25のソースコードを見ながらインスタンスをマージする方法を検討します。

方針

親コーパスのIDF辞書を得るには、サブコーパスの文書頻度辞書に基づいて、親コーパスの文書頻度辞書を作り、そこからIDFを計算するとよいです。

rank_bm25のBM25Okapiのソースコードを読むと_initialize関数で文書頻度(変数名がややこしいですがndという変数名)を算出し、ndを_calculate_idfに渡してidfを計算しています。idfはインスタンスのプラパティに保存されますが、ndは保存されません。

_calculate_idfではidfを計算していますが、idfが負の場合はepsilonという定数に置き換えられています。したがって、idf辞書のみからndを完璧に復元することはできず、関数をオーバーライドするか、近似計算をする必要があります。

近似計算をする場合は、IDFの計算式から、文書頻度の割合が0.5より大きい場合に負になることがわかるので、IDFがepsilonの単語に関しては文書頻度の割合を0.5とみなすという方法があります。

今回はより厳密にやることを目指して、関数をオーバーライドします。オーバーライドする対象はndを計算している_initialize関数です。この関数の中で、ndをプラパティに保存するようにします。

実装

_initializeのオーバーライド

BM25Okapi._initialize関数をオーバーライドします。
もとの関数との差はndをself.ndに保存する行が追加されただけです。

from rank_bm25 import BM25Okapi
def _initialize(self, corpus):
    nd = {}  # word -> number of documents with word
    num_doc = 0
    for document in corpus:
        self.doc_len.append(len(document))
        num_doc += len(document)

        frequencies = {}
        for word in document:
            if word not in frequencies:
                frequencies[word] = 0
            frequencies[word] += 1
        self.doc_freqs.append(frequencies)

        for word, freq in frequencies.items():
            try:
                nd[word]+=1
            except KeyError:
                nd[word] = 1

        self.corpus_size += 1

    self.avgdl = num_doc / self.corpus_size
    # ndを保存
    self.nd = nd
    return nd
BM25Okapi._initialize = _initialize

マージ関数

BM25Okapiインスタンスのリストを受け取って、マージされたものを返す関数merge_bm25を定義します。

def merge_bm25(bm25_list: list[BM25Okapi]) -> BM25Okapi:
    if not bm25_list:
        raise ValueError("The bm25_list is empty")

    # Combine all tokenized corpora
    merged_corpus_size = 0
    merged_doc_freqs = []
    merged_doc_len = []
    
    for bm25 in bm25_list:
        merged_corpus_size += bm25.corpus_size
        merged_doc_freqs += bm25.doc_freqs
        merged_doc_len += bm25.doc_len
    
    merged_nd = {}
    for bm25 in bm25_list:
        for word, nd in bm25.nd.items():
            merged_nd[word] = merged_nd.get(word, 0) + nd
            
    merged_bm25 = BM25Okapi(["a"]
                            , tokenizer = bm25_list[0].tokenizer
                            , k1 = bm25_list[0].k1
                            , b = bm25_list[0].b
                            , epsilon = bm25_list[0].epsilon)
    
    merged_bm25.corpus_size = merged_corpus_size
    merged_bm25.doc_freqs = merged_doc_freqs
    merged_bm25.doc_len = merged_doc_len
    merged_bm25.avgdl = sum(merged_doc_len) / merged_corpus_size
    merged_bm25.nd = merged_nd
    
    merged_bm25._calc_idf(merged_nd)
    
    return merged_bm25

ポイントは以下です。

  • IDFの計算に必要なコーパスサイズ等のパラメータも計算して、マージされたインスタンスに渡すようにしています。

    • corpus_sizeはint、doc_freqsとdoc_lenはlistなので、それぞれサブコーパスのインスタンスから取得したものを足すことで、merge後の値を得ることができます。
    • avgdlはmerge後のcorpus_sizeとdoc_lenに基づき計算します。
  • merge後のインスタンスの作成

    • 空のリストでは初期化できないので、ダミーのcorpusを渡します。corpusそのものはプラパティに保存されることはなく、corpus_sizeなどもあとから設定するのでこれで問題ありません。
    • k1, b, epsilonはbm25_listの最初の要素のものを渡しています。すべてのサブコーパスで同じパラメータが使われるという前提です。
    • マージされたndを_calc_idfにわたすことでidfが計算され、プラパティにもセットされます。

テスト

スコア計算

merge_bm25関数を使用して作ったインスタンスによるスコア計算が、親コーパスで直接初期化した場合と一致するかどうかチェックします。

サブコーパスと検証用のクエリを用意します。

corpus_a = [
    "The quick brown fox jumps over the lazy dog.",
    "A journey of a thousand miles begins with a single step.",
    "To be or not to be, that is the question."
]

corpus_b = [
    "In the middle of difficulty lies opportunity.",
    "What we think, we become.",
    "The only limit to our realization of tomorrow is our doubts of today."
]

def tokenize(text):
    return text.split()

tokenized_corpus_a = [tokenize(doc) for doc in corpus_a]
tokenized_corpus_b = [tokenize(doc) for doc in corpus_b]

query = "The only limit to our realization of tomorrow is our doubts of today."

tokenized_query = tokenize(query)

merge_bm25関数を使ってマージした場合のスコアを確認します。

bm25a = BM25Okapi(tokenized_corpus_a)
bm25b = BM25Okapi(tokenized_corpus_b)

bm25_list = [bm25a, bm25b]
merged_bm25 = merge_bm25(bm25_list)
merged_bm25.get_scores(tokenized_query)
array([ 0.5926355 ,  0.        ,  1.12937176,  0.        ,  0.        ,
       11.3174922 ])

次に、親コーパスで初期化した場合のスコアを確認します。

merged_bm25_2 = BM25Okapi(tokenized_corpus_a + tokenized_corpus_b)
merged_bm25_2.get_scores(tokenized_query)
array([ 0.5926355 ,  0.        ,  1.12937176,  0.        ,  0.        ,
       11.3174922 ])

一致しましたので、おそらくうまく動いていそうです。

念の為、コーパスサイズを増やして、一致することを確かめます。

ag_newsにもとづき1000件のサブコーパスを2つつくります。

from datasets import load_dataset

# Load the AG News dataset
dataset = load_dataset('ag_news')

# Define sub-corpus size
sub_corpus_size = 1000

# Extract the text data from the dataset
corpus_a = [item['text'] for item in dataset['train'].select(range(sub_corpus_size))]  # First sub_corpus_size items for corpus_a
corpus_b = [item['text'] for item in dataset['train'].select(range(sub_corpus_size, 2 * sub_corpus_size))]  # Next sub_corpus_size items for corpus_b

# Tokenize the corpora
def tokenize(text):
    return text.split()

tokenized_corpus_a = [tokenize(doc) for doc in corpus_a]
tokenized_corpus_b = [tokenize(doc) for doc in corpus_b]

query = "The only limit to our realization of tomorrow is our doubts of today."
tokenized_query = tokenize(query)

2つの方法でマージした場合の、検索結果を比較します。

bm25a = BM25Okapi(tokenized_corpus_a)
bm25b = BM25Okapi(tokenized_corpus_b)
bm25_list = [bm25a, bm25b]

merged_bm25_nd = merge_bm25(bm25_list)
merged_bm25_simple = BM25Okapi(tokenized_corpus_a+tokenized_corpus_b)

scores_nd = merged_bm25_nd.get_scores(tokenized_query)
scores_simple = merged_bm25_simple.get_scores(tokenized_query)

# Define the number of top ranks to consider
top_n = 1000

# Generate rankings (higher score -> higher rank)
rank_nd = np.argsort(scores_nd)[::-1][:top_n]
rank_simple = np.argsort(scores_simple)[::-1][:top_n]

# Check for exact matches in top ranks
exact_matches = np.sum(rank_nd == rank_simple)

print(f"Comparison of BM25 ranking (Top {top_n}):")
print(f"Exact matches in top ranks: {exact_matches} out of {top_n}")
Comparison of BM25 ranking (Top 1000):
Exact matches in top ranks: 1000 out of 1000

検索結果が一致していることがわかりました。

計算時間

計算時間を比較してみます。上記の少ないコーパスだと差がわかりにくいのでより大きいコーパスで実験します。

ag_newsから1000文書のサブコーパス2つを作ります。

from datasets import load_dataset

# Load the AG News dataset
dataset = load_dataset('ag_news')

# Define sub-corpus size
sub_corpus_size = 1000

# Extract the text data from the dataset
corpus_a = [item['text'] for item in dataset['train'].select(range(sub_corpus_size))]  # First sub_corpus_size items for corpus_a
corpus_b = [item['text'] for item in dataset['test'].select(range(sub_corpus_size))]   # First sub_corpus_size items for corpus_b

# Tokenize the corpora
def tokenize(text):
    return text.split()

tokenized_corpus_a = [tokenize(doc) for doc in corpus_a]
tokenized_corpus_b = [tokenize(doc) for doc in corpus_b]

query = "The only limit to our realization of tomorrow is our doubts of today."
tokenized_query = tokenize(query)

bm25a = BM25Okapi(tokenized_corpus_a)
bm25b = BM25Okapi(tokenized_corpus_b)
bm25_list = [bm25a, bm25b]

merge_bm25関数を使った場合と親コーパスを直接与えて初期化した場合で実行時間を比較します。

import time
import numpy as np

iterations = 100

def measure_execution_time(func, *args, iterations=10, **kwargs):
    times = []
    for _ in range(iterations):
        start_time = time.time()
        func(*args, **kwargs)
        end_time = time.time()
        times.append(end_time - start_time)
    
    average_time = np.mean(times)
    std_dev_time = np.std(times)
    
    return average_time, std_dev_time

# Measure time for merging BM25
average_time_merge, std_dev_time_merge = measure_execution_time(merge_bm25, bm25_list)
print(f"merge_bm25 function over {iterations} iterations: {average_time_merge} ± {std_dev_time_merge} seconds")

# Measure time for initializing BM25
average_time_init, std_dev_time_init = measure_execution_time(BM25Okapi, tokenized_corpus_a+tokenized_corpus_b)
print(f"BM25Okapi function over {iterations} iterations: {average_time_init} ± {std_dev_time_init} seconds")
merge_bm25 function over 100 iterations: 0.010911297798156739 ± 0.004137174651769788 seconds
BM25Okapi function over 100 iterations: 0.033219003677368165 ± 0.0008412621114826747 seconds

merge_bm25関数を使うことで1/3程度の時間短縮になっていそうです。

コードは省略しますが、コーパスサイズをさらに増やしたときの結果も以下に示します。

サブコーパスサイズが10000(先程の10倍)のときの結果が以下です。
時間短縮率が1/5ほどに改善しています。

merge_bm25 function over 10 iterations: 0.051044273376464847 ± 0.027766571104378794 seconds
BM25Okapi function over 10 iterations: 0.2597946643829346 ± 0.014289306550601151 seconds

サブコーパスサイズが50000のときの結果が以下です。
時間短縮率が1/8ほどになっています。コーパスサイズを増やすほど、本記事の工夫の効果が大きくなることがわかります。

merge_bm25 function over 10 iterations: 0.15507354736328124 ± 0.027483034591214345 seconds
BM25Okapi function over 10 iterations: 1.2384411811828613 ± 0.08893108260925459 seconds

おわりに

rank_bm25を効率的にマージする方法を検討しました。
結果として、コーパスサイズが50000件のときに、シンプルな方法と比べて1/8ほどに実行時間を短縮することができました。

rank_bm25の関数をオーバーライドすることになったのが課題といえば課題で、できればオーバーライドせずに実現する方法を探したいとも思っています。scikit-learnのTfidfVectorizerと同じAPIが使えるようにしたBM25Vectorizerを実装してくださった方がいますので、こちらを使うとよりスマートなマージが可能かもしれません。また機会があれば試してみたいと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?