はじめに
面白そうだからなんていう突発的な理由で全文検索を(AIを頼りつつ)できる限り自力で実装していこうという記事です。
本当にわざわざ全文検索エンジンを自作してまでしたいことがあるわけでもなく、ただ実装してみたかっただけです... (MeilisearchとかElasticSearchで十分だし...)
使った手法とか
(全体的に調べて必死に書いてるので抜けてる部分はあるかもしれません)
転置インデックス
転置インデックスは特定の単語から結果を逆引きする方法です。文字に特定の単語が含まれているかで判断することもできますが、転置インデックスを使うことでより効率的に特定の単語が含まれる文書を取得することができます。
本当は実際のエンジンを参考にしたりして最強の全文検索を作る!ってしても良かったのかもしれませんが、そこまでの技術力と気力がないのでやめました🫠
TF-IDF
TF-IDFはその名前の通り、TF (単語が出現する頻度)とIDF (単語のレア度)を掛け合わせて (TFxIDF)スコアを導き出す指標です。
Okapi BM25
BM25 (Best Matching 25)はTF-IDFの発展形です。
具体的に異なる点としては、TF-IDFとは異なって一定回数以上その単語が出現すると、その単語のスコア貢献度が頭打ちになるという点です。これにより、特定の単語 (例えばパンケーキ)を100回繰り返しても適正な範囲に評価を抑えられます。
また、文書の長さが平均に対してどうかを考慮した上で評価されるので、短い文書の1語と長い文書の1語では前者のほうがその1語についての文書である可能性が高いためより高く評価されます。
転置インデックスを実装してみる
とりあえず、最初に転置インデックスを作成するコードを書いてみることにします。
ここでは形態素解析にfugashiを使うことにします。
インデックスに追加するだけならそこまで難しくもなくて、これだけでも追加できます。
def add_index(self, text: str) -> None:
with self.__lock:
texts_id = len(self.__results)
self.__results[len(self.__results)] = text
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text # レンマがない時もあるのでその時は元のテキストを使う
self.__index.setdefault(text_to_index, set()).add(texts_id)
self.__taggerはfugashi.Tagger()のインスタンスが格納されている変数です。とりあえずそのあたりの説明は省略します。
pos1 (ワードの種類)が名詞、動詞、形容詞、代名詞のいずれかだった場合はindexに追加するようなコードです、とても簡単ですね。
検索する
これも多分そこまで実装は難しくないです。 (ただ個人的にはこっちのほうがインデックスに追加するよりは難しかった)
def search(self, text: str) -> list[str]:
targets = []
final_result = []
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
targets.append(text_to_index)
if not targets:
return []
sets_to_compare = []
for k in targets:
found_items = self.__index.get(k, set())
sets_to_compare.append(set(found_items))
for id in set.intersection(*sets_to_compare):
final_result.append(self.__results[id])
return final_result
これもクエリから名詞と動詞と形容詞、代名詞を取り出してその文字列たちをindexにひたすらgetしてそれを使っている文書のidを取ってまとめて返すだけです。多分簡単だと思います、きっと。
取り出していない文字列があるのはなぜ?
どうしてそれらだけをインデックスするのかというと、例えば
"PythonとRustが苦手です。"
これをmecabで解析してみるとこうなります。
Python Python Python Python 名詞-普通名詞-一般 0
と ト ト と 助詞-格助詞
Rust Rust Rust Rust 名詞-普通名詞-一般 0
が ガ ガ が 助詞-格助詞
苦手 ニガテ ニガテ 苦手 名詞-普通名詞-形状詞可能 0,3
です デス デス です 助動詞 助動詞-デス 終止形-一般
。 。 補助記号-句点
EOS
| 表層形 | 読み | カナ | 原形 | 品詞 | 活用型 | 活用形 | アクセント型 |
|---|---|---|---|---|---|---|---|
| Python | Python | Python | Python | 名詞-普通名詞-一般 | 0 | ||
| と | ト | ト | と | 助詞-格助詞 | |||
| Rust | Rust | Rust | Rust | 名詞-普通名詞-一般 | 0 | ||
| が | ガ | ガ | が | 助詞-格助詞 | |||
| 苦手 | ニガテ | ニガテ | 苦手 | 名詞-普通名詞-形状詞可能 | 0,3 | ||
| です | デス | デス | です | 助動詞 | 助動詞-デス | 終止形-一般 | |
| 。 | 。 | 補助記号-句点 |
もし仮に自立語以外もインデックスしてしまうとどうなってしまうのでしょうか?
...そうです、検索結果がとんでもないことになります。
ちゃんと解説すると、付属語までインデックスしてしまうとそれによってインデックスが肥大化して、検索結果がゴミだらけになってしまうというのが理由です。例えば、「と」とか、「が」が入っているからという理由だけで全ての文書を渡されても検索の意味がありませんし、困りますよね。「何について書かれたか」を特定できない言葉を捨てることで、検索の精度を犠牲にせず肥大化の解消も同時に行なっているのです。
実際に検索してみる
ここまでで書いたコードの全体像を書いてみると、こうなります。
import threading
from fugashi import Tagger
class SimpleFTS:
def __init__(self) -> None:
self.__tagger = Tagger()
self.__index: dict[str, set[int]] = {}
self.__results = {}
self.__lock = threading.RLock()
def add_index(self, text: str) -> None:
with self.__lock:
texts_id = len(self.__results)
self.__results[len(self.__results)] = text
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
self.__index.setdefault(text_to_index, set()).add(texts_id)
def search(self, text: str) -> list[str]:
targets = []
final_result = []
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
targets.append(text_to_index)
if not targets:
return []
sets_to_compare = []
for k in targets:
found_items = self.__index.get(k, set())
sets_to_compare.append(set(found_items))
for id in set.intersection(*sets_to_compare):
final_result.append(self.__results[id])
return final_result
次に、この全文検索に文書を追加してみましょう (文書の中身は適当なので気にしないでください):
fts = SimpleFTS()
fts.add_index("織田信長は戦国時代の風雲児。信長は桶狭間で今川義元を破り、天下布武を掲げた。信長亡き後は秀吉が継いだ。")
fts.add_index("豊臣秀吉は織田信長に仕え、信長の死後に天下を統一した。秀吉は大阪城を築き、太閤と呼ばれた。")
fts.add_index("徳川家康は江戸幕府を開いた。家康はかつて織田信長と同盟を結び、長い忍耐の末に天下を掴んだ。")
fts.add_index("猫はとても可愛いです")
そして、検索してみます:
results = fts.search("織田信長")
for i in range(len(results)):
print(f"結果{i}: {results[i]}")
# 結果0: 織田信長は戦国時代の風雲児。信長は桶狭間で今川義元を破り、天下布武を掲げた。信長亡き後は秀吉が継いだ。
# 結果1: 豊臣秀吉は織田信長に仕え、信長の死後に天下を統一した。秀吉は大阪城を築き、太閤と呼ばれた。
# 結果2: 徳川家康は江戸幕府を開いた。家康はかつて織田信長と同盟を結び、長い忍耐の末に天下を掴んだ。
しかし、現在はスコアリングを実装していないので、例えば順序を変えてみると:
結果0: 徳川家康は江戸幕府を開いた。家康はかつて織田信長と同盟を結び、長い忍耐の末に天下を掴んだ。
結果1: 織田信長は戦国時代の風雲児。信長は桶狭間で今川義元を破り、天下布武を掲げた。信長亡き後は秀吉が継いだ。
結果2: 豊臣秀吉は織田信長に仕え、信長の死後に天下を統一した。秀吉は大阪城を築き、太閤と呼ばれた。
この通り、『織田信長』についてメインで言及している文書よりも徳川家康についての文書が先に出てきてしまいました。検索エンジンの役割はこれでも果たせますが、少し不親切ですよね。
次に、簡単なスコアリングを実装してみようと思います。
スコアリングを実装してみる
ちゃんとBM25を使ったスコアリングを実装してみる前に、その単語が文書中にある数だけど基準にしたスコアリングを実装してみます。
def __update_count(self, data_list: set[tuple[int, int]], target_id: int) -> set[tuple[int, int]]:
d = dict(data_list)
d[target_id] = d.get(target_id, 0) + 1
return set(d.items())
↑上の関数はカウントを増加させるための補助関数です。私はこれがあると便利になると思い込んでいます。
次に、前述した__update_count関数を使うようにadd_indexも書き換えます。
def add_index(self, text: str) -> None:
with self.__lock:
texts_id = len(self.__results)
self.__results[len(self.__results)] = text
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
- self.__index.setdefault(text_to_index, set()).add(texts_id)
+ self.__index[text_to_index] = self.__update_count(self.__index.setdefault(text_to_index, set()), texts_id)
次に、検索用のコードも置き換えていきます。
def search(self, text: str) -> list[str]:
targets = []
- final_result = []
+ counts = []
+ result = []
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
targets.append(text_to_index)
if not targets:
return []
+ common_ids = set.intersection(*[{t[0] for t in self.__index.get(k, set())} for k in targets])
- sets_to_compare = []
- for k in targets:
- found_items = self.__index.get(k, set())
- sets_to_compare.append(set(found_items))
- for id in set.intersection(*sets_to_compare):
+ for id, count in {t for t in self.__index.get(targets[0], set()) if t[0] in common_ids}:
+ counts.append(count)
+ result.append(self.__results[id])
- final_result.append(self.__results[id])
+ sorted_pairs = sorted(zip(counts, result), reverse=True)
+ final_result = [result for count, result in sorted_pairs]
return final_result
全体コード
import threading
from fugashi import Tagger
class SimpleFTS:
def __init__(self) -> None:
self.__tagger = Tagger()
self.__index: dict[str, set[int]] = {}
self.__results = {}
self.__lock = threading.RLock()
def __update_count(self, data_list: set[tuple[int, int]], target_id: int) -> set[tuple[int, int]]:
d = dict(data_list)
d[target_id] = d.get(target_id, 0) + 1
return set(d.items())
def add_index(self, text: str) -> None:
with self.__lock:
texts_id = len(self.__results)
self.__results[len(self.__results)] = text
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
self.__index[text_to_index] = self.__update_count(self.__index.setdefault(text_to_index, set()), texts_id)
def search(self, text: str) -> list[str]:
targets = []
counts = []
result = []
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
targets.append(text_to_index)
if not targets:
return []
common_ids = set.intersection(*[{t[0] for t in self.__index.get(k, set())} for k in targets])
for id, count in {t for t in self.__index.get(targets[0], set()) if t[0] in common_ids}:
counts.append(count)
result.append(self.__results[id])
sorted_pairs = sorted(zip(counts, result), reverse=True)
final_result = [result for _, result in sorted_pairs]
return final_result
では、実行結果は変わるでしょう...
結果0: 徳川家康は江戸幕府を開いた。家康はかつて織田信長と同盟を結び、長い忍耐の末に天下を掴んだ。
結果1: 織田信長は戦国時代の風雲児。信長は桶狭間で今川義元を破り、天下布武を掲げた。信長亡き後は秀吉が継いだ。
結果2: 豊臣秀吉は織田信長に仕え、信長の死後に天下を統一した。秀吉は大阪城を築き、太閤と呼ばれた。
...か?
...。
...全く変わっていませんね!
とにかく、出現数ベースのソートだと不完全だということがわかります (ただし、私の実装に問題があるだけな可能性の方が高い)。なので、BM25を検索エンジンに実装していきます。
BM25の実装
ここで、全文検索らしくちゃんとしたスコアリングを導入します。それがBM25です。
BM25は、以下のようなスコアリング方法を使用しています。
- 文章中に同じ単語が含まれる回数分スコアを上げる (TF)
- ただし単語がスコアに与えられる影響には上限がある
- どの文章にも出てくる単語ほど、スコアへ与える影響は小さくなる (IDF)
- 単語の長さによる有利不利をなくす
数式はこのようになるらしいです (理解はしてない)
$$
\text{Score}(D, Q) = \sum_{i=1}^{n} \text{IDF}(q_i) \cdot \frac{f(q_i, D) \cdot (k_1 + 1)}{f(q_i, D) + k_1 \cdot \left(1 - b + b \cdot \frac{|D|}{\text{avgdl}}\right)}
$$
- $f(q_i, D)$ : 文書 $D$ における単語
- $q_i$ の出現回数(TF)
- $|D|$ : 文書 $D$ の長さ(総単語数)
- $avgdl$ : 全文書の平均の長さ
- $k_1, b$ : チューニングパラメータ(一般的に $k_1 = 1.2 \sim 2.0$, $b = 0.75$)
今回のIDFの計算は以下のようになります:
$$
IDF(q_i) = \ln\left(\frac{N - n(q_i) + 0.5}{n(q_i) + 0.5} + 1\right)
$$
- $N$ : 総文書数
- $n(q_i)$ : 単語 $q_i$ を含む文書数
実装していきます。まず、__init__に$k_1とb$を渡せるよう引数を追加します:
- def __init__(self) -> None:
+ def __init__(self, k1: float = 1.2, b: float = 0.75) -> None:
self.__tagger = Tagger()
self.__index: dict[str, set[int]] = {}
self.__results = {}
self.__lock = threading.RLock()
+ self.__bm25 = {"avgdl": 0, "N": 0}
+ self.__k1 = k1
+ self.__b = b
次に、index追加時に文章の長さも記録するようにします:
# 文書の長さを更新するコード
def __update_avgdl(self):
num_left = 0
results = self.__results.values()
for result in results:
num_left += result["dl"]
self.__bm25["avgdl"] = num_left / len(results)
self.__bm25["N"] = len(self.__results.keys())
def add_index(self, text: str) -> None:
with self.__lock:
texts_id = len(self.__results)
- self.__results[len(self.__results)] = text
+ self.__results[len(self.__results)] = {"dl": len(text), "text": text}
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
self.__index[text_to_index] = self.__update_count(self.__index.setdefault(text_to_index, set()), texts_id)
+ self.__update_avgdl()
最後に、検索用のコードを書き換えていきます:
def search(self, text: str) -> list[str]:
+ scores: dict[int, int | float] = {}
targets = []
- counts = []
result = []
+ final_result = []
for word in self.__tagger(text):
orig_text = word.surface
fmt_text = word.feature.lemma
pos1 = word.feature.pos1
if (
pos1 == "名詞"
or pos1 == "動詞"
or pos1 == "形容詞"
or pos1 == "代名詞"
):
text_to_index = fmt_text if fmt_text else orig_text
targets.append(text_to_index)
if not targets:
return []
- common_ids = set.intersection(*[{t[0] for t in self.__index.get(k, set())} for k in targets])
- for id, count in {t for t in self.__index.get(targets[0], set()) if t[0] in common_ids}:
- counts.append(count)
- result.append(self.__results[id])
- sorted_pairs = sorted(zip(counts, result), reverse=True)
- final_result = [result for _, result in sorted_pairs]
- return final_result
+ common_ids = set.union(
+ *[{t[0] for t in self.__index.get(k, set())} for k in targets]
+ )
+ key_id = targets[0]
+ target_entries = self.__index.get(key_id, set())
+
+ filtered_entries = (
+ (entry_id, count)
+ for entry_id, count in target_entries
+ if entry_id in common_ids
+ )
+
+ avgdl = self.__bm25["avgdl"]
+ N = self.__bm25["N"]
+ n = len(target_entries)
+
+ for id, count in filtered_entries:
+ dl = self.__results[id]["dl"]
+ result.append(self.__results[id]["text"])
+ tf = count
+ denominator = tf + self.__k1 * (1.0 - self.__b + self.__b * (dl / avgdl))
+ idf = math.log(1.0 + (N - n + 0.5) / (n + 0.5))
+ score = idf * (tf * (self.__k1 + 1.0)) / denominator
+ scores[id] = scores.get(id, 0) + score
+
+ for id, score in scores.items():
+ text = self.__results[id]["text"]
+ scores[id] = self.__calculate_final_score(targets, text, score)
+
+ sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
+ for id, final_score in sorted_results:
+ text = self.__results[id]["text"]
+ final_result.append(f"[{final_score:.4f}] {text}")
+
+ return final_result
BM25を実装したコードでもう一度さっきのクエリを投げてみます。すると、以下のように異なるスコアが出力されるはずです:
[0.0033] 豊臣秀吉は織田信長に仕え、信長の死後に天下を統一した。秀吉は大阪城を築き、太閤と呼ばれた。
[0.0033] 徳川家康は江戸幕府を開いた。家康はかつて織田信長と同盟を結び、長い忍耐の末に天下を掴んだ。
[0.0031] 織田信長は戦国時代の風雲児。信長は桶狭間で今川義元を破り、天下布武を掲げた。信長亡き後は秀吉が継いだ。
...順序は先程と変わっていませんね。各結果に織田信長は一つしか含まれていないためです。例えば、信長だけで検索してみると:
[0.0521] 織田信長は戦国時代の風雲児。信長は桶狭間で今川義元を破り、天下布武を掲げた。信長亡き後は秀吉が継いだ。
[0.0465] 豊臣秀吉は織田信長に仕え、信長の死後に天下を統一した。秀吉は大阪城を築き、太閤と呼ばれた。
[0.0331] 徳川家康は江戸幕府を開いた。家康はかつて織田信長と同盟を結び、長い忍耐の末に天下を掴んだ。
しっかり、信長という名前が多く出ているもののスコアが高くなっています
最後に
多分この記事の中で一番BM25の実装に時間をかけていると思います (なんならAIに教えを乞うレベルで時間がかかった)
実はこの記事は書いてから2ヶ月以上経っていて途中で力が尽きているので、バグがあったりするかもしれませんがお許しください...🫠
下に実際に郵便番号データを検索できるdemoを置いておきます、よければ使ってみてください (使い道はない):
