Edited at

pythonで検索エンジンの工作

More than 3 years have passed since last update.

前々から興味のあった検索エンジンを冬休みの工作として作ってみました。(といっても、めちゃくちゃちゃちいですが。。)


作業環境

標準入力から検索クエリを取得しているので、一応環境も書いておきます。環境は以下のとおりです。

Mac OSX 10.9.5 Core i7(64bit)

Python 2.7.9


参考資料

[Web開発者のための]大規模サービス技術入門

検索アルゴリズムの話だけでなく、メモリやディスクの話含め幅広く記載があります。また、実際の経験を元に失敗事例なども書かれており、読み物としても興味深い書籍でした。


やったこと

データ圧縮、転置インデックスの作成、検索という3つの機能の作成が主になります。


データ圧縮用コードの作成

今回はVBcodeによる圧縮を行っています。

他にもPForDelta, Golomb符号など様々な圧縮方法があるようです。

そもそもデータ圧縮をする理由ですが、大容量のストレージのが安い値段で購入できる現在においては全体のスループットをあげる、というのが主眼となります。

上記の書籍によれば、


スループットの観点からは、扱うデータを圧縮して置いたほうが早いという場面は割りと多い。

計算機にはCPUとI/O二種類の負荷がある。

ある特定の処理をするのにI/Oを待っている間は、その処理にCPUは使用できない。

ファイルを圧縮しておくとCPUに負担がかかるが、I/O待ちを減らす事ができる。

CPUが暇でI/Oが忙しいということは多いので、圧縮によりI/O負荷を減らしてCPUに肩代わりさせることで、全体のスループットが上がる。


という説明があります。

他にも、圧縮したものは解凍が必要なので、圧縮・解凍にかかる負荷よりもI/O部分のメリットが大きい場合に使用する、または、メリットが出る圧縮方法を使用する、という点も考慮する必要があると思います。

圧縮方法についてですが、VBcodeでは冗長なバイトを取り除いて最小限のバイトで情報を記述することによって圧縮を行っています。

例えば、整数の5は固定長バイナリ符号では00000000 00000000 00000000 00000101ですが、VBcodeでは10000101と表現できます。

8ビットが1塊なので、コード中では128(2進数で10000000)をうまく活用する事がキモとなります。

他に、整数値をソートして、数値のギャップを取ることによって圧縮効果が得られます。これは、小さな数値は出現しやすいが、大きい数値は出現しづらい、という確率分布に由来しているようです。


転置インデックスの作成と保存

今回はbi-gramを使って転置インデックスを作成しました。

転置インデックスはご存知の方が多いと思うので説明は省きます。

下の実装では

{ term: {(doc_id1, doc_id2,...): [[location_list1],[location_list2],...]} }

のようにtermをキーにlocationを含んだpostinglistを辞書に格納しています。

(もっとまともな方法があると思うので、お分かりの方はご教示頂けますと助かります。)

作成した転置インデックス(辞書)はファイルにcPickleで保存しました。


文書の検索

あまり説明もないので箇条書きします。


  • 標準入力から検索クエリを入力

  • 検索クエリからbi-gramでtermを作成

  • locationで各document中のtermの連続性を確認しながら検索

  • hitしたdocumentのタイトルとスニペットを表示される


使用データ

技術評論社のサポートページからダウンロードお願いします。

タイトルデータの入った10000entries.txtというファイルと、document_id毎に分けられた文書ファイルが入っているtextsというフォルダを使用します。

容量はタイトルファイルが1.4M、textsフォルダが全体で321Kです。


参考コード

かなり汚いコードで申し訳有りませんが、以下貼り付けます。

まずはVBcodeによる圧縮/解凍を行うコード。


vbcode.py

# coding: utf-8


from struct import pack, unpack

def vbencode_number(number):
number = int(number)
_bytes = []
while 1:
_bytes.insert(0, number % 128)
if number < 128: break
number /= 128
_bytes[-1] += 128 #ビット列の終わりを宣言(先頭の1ビットをマスクとして利用)
return pack('%dB' % len(_bytes), *_bytes)

def vbencode(numbers):
bytestream = ''
for number in numbers:
_bytes = vbencode_number(number)
bytestream += _bytes
return bytestream

def vbdecode(bytestream):
numbers = []
n = 0
bytestream = unpack('%dB' % len(bytestream), bytestream)
for i in xrange(len(bytestream)):
if bytestream[i] < 128:
n = 128*n + bytestream[i]
else:
n = 128*n + bytestream[i] - 128
numbers.append(n)
n = 0
return numbers

if __name__=='__main__':
numbers = (1,2,128,145)
res = vbencode(numbers)
print vbdecode(res)


続いて転置インデックスの作成と保存です。


index.py

# codeing: utf-8


from __future__ import unicode_literals
from collections import defaultdict
from unicodedata import normalize
import MeCab
from vbcode import vbencode
import numpy as np
import os
import cPickle

def get_title_data(input_filename):
titles = defaultdict(str)
with open(input_filename, 'r', 10000) as f:
for line in f:
try:
doc_id, cat_id, url, title = line.strip().split(b'\t')
titles[int(doc_id)] = title
except Exception, e:
doc_id, cat_id, url = line.strip().split(b'\t')
titles[int(doc_id)] = ''
return titles

def create_index(doc_base_path,titles):
main_index = defaultdict(dict)
for doc_id in titles.keys():
try:
main_index = get_index_dic(doc_base_path, doc_id, main_index)
except Exception, e:
import traceback
print traceback.format_exc()
print e
compressed_main_index = compress_postinglist(main_index)
save_index(compressed_main_index)

def save_index(index):
with open('index_bigram.pkl','w') as f:
cPickle.dump(index, f)

def compress_postinglist(index):
for key_word, posting_dic in index.items():
tup_did_location = zip( posting_dic.keys()[0],posting_dic.values()[0] ) # [(),()]
tup_did_location.sort(key=lambda x:x[0]) # [(),()]

doc_id_list = []
lctn_pl = []
for doc_id, location_list in tup_did_location:
doc_id_list.append(doc_id)
lctn_pl.append( vbencode( get_differences(location_list) ) )
d_pl = get_differences(doc_id_list)
index[key_word] = { vbencode(d_pl) : lctn_pl}
return index

def get_differences(sorted_list):
if len(sorted_list)>1:
diff_list = ( np.array(sorted_list) - np.array([0]+sorted_list[:-1]) ).tolist()
else:
diff_list = sorted_list
return diff_list

def get_index_dic(doc_base_path, doc_id, main_index):
with open(doc_base_path+'/'+str(doc_id), 'r') as f:
doc = f.read().strip()
doc = normalize('NFKC',doc.decode('utf-8')).lower()
word_list, word_location = bi_gram_tokenize(doc)

for word, location in zip(word_list, word_location):
if main_index[word]:
_keys = main_index[word].keys()[0]
if doc_id in _keys:
main_index[word][_keys][-1] += [location]
else:
main_index[word][_keys].append( [location] )
main_index[word] = {_keys+(doc_id,): main_index[word].values()[0]}
else:
main_index[word][(doc_id,)] = [ [location] ]
return main_index

def mecab_tokenize(doc):
tagger = MeCab.Tagger(b'-Ochasen')
node = tagger.parseToNode(doc.encode('utf-8'))
word_list = []
while node:
ns = node.surface
word_list.append(ns)
node = node.next
return word_list

def bi_gram_tokenize(doc):
word_list = []
word_location = []
for i in xrange(len(doc)):
term = doc[i:i+2].strip()
if len(term)!=0:
#print "term:",term
word_list.append( term )
word_location.append( i )
return word_list, word_location

def main(input_filename, doc_base_path):
titles = get_title_data(input_filename)
create_index(doc_base_path,titles)

if __name__=='__main__':
ifpath = os.getcwd()+'/../works/input'
ifname = ifpath + '/10000entries.txt'
doc_base_path = ifpath + '/texts'
main(ifname, doc_base_path)


最後に検索部分のコードです。


search.py

# coding: utf-8


from __future__ import unicode_literals
import os
from collections import defaultdict
import sys
from vbcode import vbdecode
import cPickle as pickle
import numpy as np
from unicodedata import normalize

def get_titles(input_filename):
with open(input_filename,'r',10000) as f:
titles = defaultdict(dict)
for line in f:
try:
doc_id, cat_id, url, title = line.strip().split(b'\t')
titles[int(doc_id)] = {'title': title, 'url':url}
except Exception, e:
doc_id, cat_id, url = line.strip().split(b'\t')
titles[int(doc_id)] = {'title':'', 'url': url}
return titles

def load_index(index_filename):
with open(index_filename, 'r') as f:
index = pickle.load(f)
return index

def search(titles, index, query, text_path):
query = normalize('NFKC',query.decode('utf-8'))
for q in query.strip().split(' '):
print "search:", query
if len(q.strip())==0: continue

bi_gram_qs = get_bigram_queries(q.strip())
if not index[bi_gram_qs[0]]: continue

# doc_idの積集合を取得
doc_ids = set( get_decoded_data(index[bi_gram_qs[0]].keys()[0]) )
for bi_gram_q in bi_gram_qs[1:]:
doc_ids = doc_ids & set( get_decoded_data(index[bi_gram_q].keys()[0]) )
doc_ids = list(doc_ids)

if len(doc_ids)==0: continue

location_lists = []
for bi_gram_q in bi_gram_qs:
# doc_idのindexリストを取得
all_doc_ids = get_decoded_data( index[bi_gram_q].keys()[0] )
idx_list = [ all_doc_ids.index(doc_id) for doc_id in doc_ids ]
# 各クエリの各document毎のlocationを取得
location_list = [ get_decoded_data(index[bi_gram_q].values()[0][idx]) for idx in idx_list ]
location_lists.append( location_list )

doc_id_idx_idx, locations4snippet = check_sequence(location_lists)
#print "doc_id_idx_idx:", doc_id_idx_idx
#print "doc_ids:", doc_ids
squewed_doc_ids = ( doc_ids[doc_id_idx] for doc_id_idx in doc_id_idx_idx )
for doc_id, location in zip( squewed_doc_ids, locations4snippet):
print "doc_id:",doc_id
if doc_id in titles:
print titles[doc_id]['title']
if location<50:
start = 0
else:
start = location - 50
end = location + 50
with open(text_path+'/'+str(doc_id), 'r') as f:
print f.read().strip().decode('utf-8')[start:end]

def get_bigram_queries(query):
bi_gram_qs = []
for i in xrange(0, len(query), 2):
bi_gram_q = query[i:i+2].strip()
if len(bi_gram_q) ==1 and i>0:
bi_gram_q = query[i-1:i+1]
bi_gram_qs.append( bi_gram_q )
return bi_gram_qs

def get_decoded_data(encoded_data):
decoded_data = vbdecode(encoded_data)
return np.cumsum(decoded_data).tolist()

def check_sequence(location_lists):
length = len(location_lists)

doc_id_idx_idx = []
locations4snippet = []
for doc_i, location_list in enumerate(location_lists[0]):
for location in location_list:
if length==1:
return [doc_i]
elif check_next_node(1, doc_i, location, location_lists, length):
doc_id_idx_idx.append(doc_i)
locations4snippet.append(location)
else:
continue
return doc_id_idx_idx, locations4snippet

def check_next_node(i, doc_i, location, location_lists, last_node):
for next_location in location_lists[i][doc_i]:
if location < next_location <= location+2:
if i < last_node-1:
return check_next_node(i+1, doc_i, location+2, location_lists, last_node)
else:
return True
else:
return False

if __name__=='__main__':
basepath = os.getcwd()
index_filename = basepath + '/index_bigram.pkl'
file_path = basepath + '/../works/input'
ifname = file_path + '/10000entries.txt'
text_path = file_path + '/texts'

titles = get_titles(ifname)
index = load_index(index_filename)

print "get search query"
query = raw_input()
search(titles, index, query, text_path)
print "done"


転置インデックスの作成にものすごく時間がかかります。

今回はメモリに全部乗る容量のデータだったので良かったのですが、メモリに乗らない場合は、データベースに格納しつつバッファしたインデックスをDBにマージして格納していくのかな、と想像しながら作っていました。

DBなのですがpythonの辞書型で作成するならMongoDBが楽なように思います。


コメント

データ圧縮とか検索技術とか深い専門分野だと思うのですが、単純に何も考えずに作ってみる分には、当初想像していたよりも簡単に出来ました。

今回着手していない、ランキングアルゴリズムの作成などにも今後挑戦していければと思っています。

お手数ですが間違いがありましたらご指摘いただけますと助かります。