Edited at

b-Bit MinHashを使ったサイトのカテゴリ分類

More than 5 years have passed since last update.


動機

URLをカテゴリ分けしたいと考えb-Bit MinHashを使ってみました。

具体的には以下のようなことをするためです。

cookieから抽出したサイト閲覧情報を素性としてユーザのCV確率を求める場合、素性ベクトルがかなりスパースになってしまいます。そこで、URL単位でカテゴリに分け、ある程度素性ベクトルの密度を上げたいと考えました。そのために標題について実験をしています。


このブログに書いたこと


  • b-Bit MinHashの概要

  • 参考コード


b-Bit MinHashの概要


参考文献


b-Bit MinHashの手順

手法の概要については、上のSmartNewsさんの開発ブログとその文中に出てくるPFIの岡野原さんの資料を参照していただきたく思います。私が説明を書くよりはるかにわかりやすく書いてありますしw

ということで、ここではコードを書く際の手順について簡単に書きます。


まずは教師データの準備


  • 教師データの準備(カテゴリを付したクローリング対象のURLのリストを準備)

  • URL、カテゴリ、単語集合、最小Hash値内のbi-Bit分のk個のリスト(以下、最小hash値リスト)を保存するためのデータベースを作る(kはhash関数(seed)の種類)

  • URL、カテゴリ、取得した単語集合をデータベースに保存

  • 取得した単語集合から最小hash値リストを作成し、データベースを更新する


次にlearnerのデータの準備


  • learnerのURLリストを作成

  • URL、カテゴリ、単語集合、最小Hash値内のbi-Bit分のk個のリストを保存するためのデータベースを作る

  • URL、取得した単語集合をデータベースに保存


分類の実施


  • learnerのデータベースから各URLから抽出した単語集合を呼び出し、最小hash値リストを作成、DBの更新

  • 作成した最小hash値リストと教師DBの最小hash値リストでMinHashの実施

  • カテゴリを取得しDB更新


参考コード

以下、作成したコードを記載しました。

いろいろ切り貼りして作ったので、命名規則がぐちゃぐちゃです。。

例によってエンジニアでない人間が書いていますので、相当お見苦しいコードになっておりますが、広い心で読んでいただけると非常に助かります。。


教師URLからテキストデータを取得し、単語集合などをDBに格納

# coding: utf-8


import urllib2
from BeautifulSoup import BeautifulSoup, NavigableString, \
Declaration, Comment, BeautifulStoneSoup
import unicodedata
import chardet
from urlparse import urljoin
import sqlite3 as sqlite
import pickle
import SeparateWords as spw

# 無視すべき単語のリスト
ignorewords = set(['the', 'of', 'to', 'and', 'a', 'in', 'is', 'it', '-', '/', '#', "'", '_', ',', '(', ')', '[', ']', '〜', '!', '|', '♪', '...', '>', '<', ':', '!!', '&', '/', '+', '*'])
# ブロックとして認識するタグのリスト
block_tags = frozenset(['p', 'div', 'table', 'dl', 'ul', 'ol', 'form', 'address',
'blockquote', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'fieldset',
'hr', 'pre' 'article', 'aside', 'dialog', 'figure',
'footer', 'header', 'legend', 'nav', 'section'])

class crawler:
# データベースの名前でクローラを初期化
def __init__(self, dbname):
self.dbname = dbname
self.con=sqlite.connect(dbname)
self.con.text_factory = str # utf-8を使用するためにstrを指定

def __del__(self):
self.con.close()

def dbcommit(self):
self.con.commit()

# 個々のページをインデックスする
def addtoindex(self,url,category,wordlist):
if self.isindexed(url): return
print 'Indexing' + url

# url毎に単語リストをDBに格納
self.con.execute( "insert into wordvector values(?,?,?,?)" , \
(url, category, pickle.dumps(wordlist), '') )
self.dbcommit()

def getNavigableStrings(self,soup):
# soupがcontentsを含む型かを判定
if isinstance(soup, NavigableString): # NavigableStringオブジェクトはcontentsとstringを除く全てのメンバ変数を持つ
if type(soup) not in (Comment, Declaration):
yield soup
# contentsを含んでいてprogram codeでなければテキストを取得
elif soup.name not in ('script', 'style'):
is_block = soup.name in block_tags # ブロックタグの存在を確認
if is_block:
yield u'\n'
for c in soup.contents:
for g in self.getNavigableStrings(c):
yield replace_str(g)
if is_block:
yield u'\n'

# URLが既にインデックスされていたらtureを返す
def isindexed(self,url):
u=self.con.execute \
("select words from wordvector where url='%s'" % url).fetchone()
if u!=None:
return True
return False

# ページのリストを受け取り、与えられた深さで幅優先の検索を行い
# ページをインデクシングする
def crawl(self,pages,category,depth=2):
for _ in xrange(depth):
newpages=set()
for page in pages:
try:
response=urllib2.urlopen(page).read()
except:
print "Could not open %s" % page
continue
en = chardet.detect(response)['encoding']
soup=BeautifulSoup(response.decode(en), # 文字コードをサイトに合わせて変換
convertEntities = BeautifulStoneSoup.HTML_ENTITIES) # HTML特殊文字の文字変換
text = u''.join( self.getNavigableStrings(soup) )
text = normalizeText(text) # 文字列の正規化
words = spw.separateWords(text) # 単語集合の取得
self.addtoindex(page,category,words) # url, category, 単語集合をDBに保存

# 深さ2以上の探索を行う場合のためにaタグのURLを取得
links=soup('a')
for link in links:
if ('href' in dict(link.attrs)):
url=urljoin(page,link['href'])
if url.find("'")!=-1: continue
url=url.split('#')[0] # フラグメント識別子を取り除く
if url[0:4]=='http' and not self.isindexed(url):
newpages.add(url)
pages=newpages

# データベースのテーブルを作る
def createindextables(self):
name = 'wordvector'
sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
.replace('MYTABLE', name)
res = self.con.execute(sql).fetchone()
if res is None:
self.con.execute('create table wordvector(url, category, words, hash_vector)')
self.con.execute('create index urlidx on wordvector(url)')
self.dbcommit()
else:
self.con.execute('drop table wordvector')
self.con.execute('create table wordvector(url, category, words, hash_vector)')
self.con.execute('create index urlidx on wordvector(url)')
self.dbcommit()

def nonEmptyLines(text):
""" 不要な空白を取り除き,空行以外を返す """
for line in text.splitlines():
line = u' '.join(line.split())
if line:
yield line

def normalizeText(text):
""" 正規化の後で不要な空白・改行を取り除く """
Text = unicodedata. normalize ('NFKC', text) # 日本語半角を全角に変換+英数字,記号は半角
return u'\n'.join(nonEmptyLines(text))

def replace_str(line):
return line.replace('&amp;','&')

if __name__=='__main__':
fname = 'teacherURL.txt'
crawler=crawler('teacher.db')
crawler.createindextables()
with open(fname, 'r') as f:
for line in f:
s = line.strip().split('\t')
print s[0]
category = s[0]
pagelist = [s[1]]
crawler.crawl(pagelist, category, depth=1)

テキストから単語を抽出する部分(spw.separateWords(text))は以下のようなコードにしました。


テキストから単語を抽出

# coding: utf-8

import MeCab

# 無視すべき単語のリスト
ignorewords = set(['the', 'of', 'to', 'and', 'a', 'in', 'is', 'it', 'の'])
igonoresymbols = set(['-', '/', '#', "'", '_', ',', '(', ')', '[', ']',
'~', '!', '|', '♪', '...', '>', '<', ':', '!',
'&', '/', '+', '*', '【', '】', '(', ')', '!', ':', 'ー',
'_', '?','%', '「', '」','~','.', '{', '}','"',
'<', '>', '/'])

def separateWords(text):
tagger = MeCab.Tagger('-Ochasen')
node = tagger.parseToNode(text.encode('utf-8'))
word_set = set([])
while node:
feat = node.feature.split(",")
if feat[0] == "名詞" \
and (feat[1] == "一般"
or feat[1] == "固有名詞" or feat[1] == "形容動詞語幹"):
#word = node.surface.decode(en)
word = node.surface
if word not in ignorewords:
word_set.add( word )
node = node.next
return word_set

続いて、取得した単語集合から教師用の最小hashリストを作成し、DBを更新する部分を記載します。


最小hashリストの作成・保存

# coding: utf-8


import sqlite3 as sqlite
import pickle
import hashing

class CreateDB(object):
def __init__(self,dbname):
self.db = sqlite.connect(dbname)
self.db.text_factory = str # utf-8を使用するためにstrを指定
self.hashing = hashing.Hashing()

def __del__(self):
self.db.close()

def db_commit(self):
self.db.commit()

def create_hashtable(self, url, wordlist):
hash_v = self.hashing.get_min_vector(wordlist)
self.update_vector(url, pickle.dumps(hash_v))
self.db_commit()

def update_vector(self,url,vector):
self.db.execute("update wordvector set hash_vector=? where url=?" , \
(vector, url))

if __name__=='__main__':
dbname = 'teachre.db'

c_db = CreateDB(dbname)

con = sqlite.connect(dbname) # wordベクトルが格納されたDBを呼ぶ

itr = 1
while True:
sql = "select * from wordvector where rowid=='%d'"
res = con.execute(sql % itr).fetchone()
print res

if res is None:break

url = res[0]; category = res[1]; wordlist = res[2]
c_db.create_hashtable(url, wordlist)

itr += 1
con.close()

最小hashリストの作成は以下のように行っています。


最小hashリストの作成

# coding: utf-8


import numpy as np
import mmh3
from multiprocessing import *

class Hashing(object):
'''
各urlで取得した単語集合に対して、hashingをかけて、最小値を取得
url毎にk次元のベクトルを取得する
このk次元ベクトルを比較することによって類似度を判定する
'''

def __init__(self,bbit=1,k=128):
self.bbit = bbit
self.hash_cnt = k

def get_min_vector(self,feat_names):
''' この関数はurl分回すことになる '''
hash_vec = []
# 単語集合のhashの最小値のみをk回取得
for seed in xrange(self.hash_cnt): # self.hash_cnt分のseedを発生させる
pool = Pool(processes=8)
hash_val = pool.map( get_hash_value, ((feat_name, seed) for feat_name in feat_names) ) # k個のseedを使ってk個のhash関数を発生
pool.close()
pool.join()
min_val = int( str( min(hash_val) )[-self.bbit] )
hash_vec.append(min_val) # 最小値の1桁目のみ取得する
return hash_vec

def get_hash_value((feat_name,seed)):
return mmh3.hash(feat_name,seed)

ここまでで、教師データの準備が完了しました。

同様に学習者用の単語集合を取得し、DBに格納してください。

以下では取得した学習者の単語集合と作成した教師hashリストを使ってMinHashを実行します。

そして、学習者のURLにカテゴリを付与し、DBを更新して終了です。

以下、コード記載します。


b-BIt MinHashの実施

# coding: utf-8


import numpy as np
import sqlite3 as sqlite
import hashing
import pickle
from multiprocessing import *

class MinHash(object):
def __init__(self,train_dbname,teacher_dbname):
self.train_db = sqlite.connect(train_dbname)
self.teacher_db = sqlite.connect(teacher_dbname)
self.hashing = hashing.Hashing() # default values are bbit=1, k=128

def __del__(self):
self.train_db.close()
self.teacher_db.close()

def db_commit(self):
self.train_db.commit()

def get_category(self):
learner_urls = self.get_urls(self.train_db)
teacher_urls = self.get_urls(self.teacher_db)
for lrnr_url in learner_urls:
print "computing hash vector:", lrnr_url[0]
learner_hash = self.calculate_hashvecotr( lrnr_url[0] )

print "calculating similarity:", lrnr_url[0]
learner_words = self.get_wordslist( self.train_db,lrnr_url[0] )

pool = Pool(processes=8)
sim = pool.map( similarity_value,
( (learner_words, self.get_wordslist( self.teacher_db,tchr_url[0] ),
learner_hash, self.get_hash_vector(tchr_url[0]))
for tchr_url in teacher_urls ) ) # 類似度の算出
pool.close()
pool.join()

sim = np.array(sim)
print "sim: ",sim
idx = np.where(sim==max(sim))[0][0]
sim_url = teacher_urls[idx][0]

category = self.get_similer_category( sim_url )
print "similar category of this URL is: ", category
self.update_category(category, sim_url)

def calculate_hashvecotr(self, url):
""" hashベクトルを作りつつ、元のDBをupdate """
hash_vector = self.hashing.get_min_vector(
self.get_wordslist(self.train_db, url) )
self.train_db.execute( "update wordvector set hash_vector=? where url=?" ,
(url, pickle.dumps(hash_vector), ) ) # pickleで保存する
self.db_commit()
return hash_vector

def update_category(self,category,url):
self.train_db.execute( "update wordvector set category=? where url=?" ,(category, url, ) )
self.db_commit()

def get_wordslist(self,db,url):
wordlist = db.execute( "select words from wordvector where url=?" , (url,) ).fetchone()[0]
return pickle.loads(wordlist)

def get_urls(self,db):
return db.execute("select url from wordvector").fetchall()

def get_similer_category(self,url):
return self.teacher_db.execute( "select category from wordvector where url=?" ,(url, ) ).fetchone()[0]

def get_hash_vector(self,url):
hash_v = self.teacher_db.execute( "select hash_vector from wordvector where url=?" , (url, ) ).fetchone()[0]
return pickle.loads(hash_v)

def similarity_value( (lrnr_words, teacher_words, lrnr_hash, teacher_hash) ):
try:
feat_dim = 1 << len(lrnr_hash)
C1, C2 = calculate_distribusion(lrnr_words, teacher_words, feat_dim)

jaccard = float( sum( np.array(lrnr_hash)==np.array(teacher_hash) ) ) \
/len(lrnr_hash) # 類似度の算出
return (jaccard-C1)/(1-C2)

except Exception, e:
import sys
import traceback
sys.stderr.write(traceback.format_exc())

def calculate_distribusion(lrnr_words, teacher_words, feat_dim):
all_words = lrnr_words | teacher_words
D = len(all_words); f1 = len(lrnr_words); f2 = len(teacher_words) # 簡易的に2つの単語集合を結合したものを全単語集合とみなしている
r1 = float(f1)/D; r2 = float(f2)/D; sum_r = r1 + r2

A1 = calc_A(r1, feat_dim); A2 = calc_A(r2,feat_dim)

C1 = A1*r2/sum_r + A2*r1/sum_r
C2 = A1*r1/sum_r + A2*r2/sum_r

return C1, C2

def calc_A(r, feat_dim):
A_num = r*(1-r)**(feat_dim-1)
A_denom = 1-(1-r)**feat_dim
return A_num/A_denom

if __name__=='__main__':
traindbname = 'learner.db'
teacherdbname = 'teacher.db'

minhash = MinHash(traindbname,teacherdbname)
minhash.get_category()

論文読んでいただければわかるのですが、D(単語集合の大きさ)とk(hash関数の種類)が十分大きいことが前提条件です。

そうでない場合は近似が成り立ちませんので、おかしな値が出ると思いますのでご注意下さい。

上のコードではDを2つの単語集合の結合としていますが、実験のため簡易的におこないました。実際使用される時は全単語集合をDとして下さい。


実験結果

適当にファイナンス、経済、スポーツ(野球、ゴルフ)のカテゴリで実験してみました。

スポーツは野球とゴルフでは使用される単語がかなり異なるため、スポーツというくくりでは上手くカテゴリが付与されてませんでした。

実務で使用される場合はカテゴリ自体にも工夫が必要だと思います。

ちなみに、カテゴリ作成の際に私が参考にしたのがopen directory project(ODP)です。

計算時間に関して言えば、最小hash値リストの作成が最も時間を食います。一応、並列処理を行うようにはしているんですが、それでも相当待ち時間が発生する印象です。

(平均時間を計測する余裕がなかったので、印象だけ記載します。手抜きでスミマセン。。。)

それと、上のコード中では32bit hashを使っていますが、64bit hashの使用をお勧めします。64bitの方がコリジョンする確率が低いので。


まとめ(というか感想)

MinHashで使われている近似手法がまず面白いです。

mixiさんのブログで紹介されていた軽量データクラスタリングツールbayonでも同様の近似手法が使われていて、大量データを扱うのであれば、こういった近似手法を知っていないと(作れないと)効率の良い計算は難しいな、と感じたしだいです。。

ツールとしてのb-Bit MinHashは、簡単に作成できる割には結構精度良く判定してくれます。

また論文中にも記載がありますが、実務でも応用できる場面が多い気がしています。

お手数ですが間違いがあればご指摘いただけますと助かります。