目的
Sparkを使って見るという実験で、全てのペアについて距離を総当りで計算して近傍探索、閾値以下のものについてソートして出力ということをやって見た。
趣旨としては、特徴ベクトルで一定の近傍を探索し、「類似している」ものを提示する、ということである。
作業の実際
pickleからデータを読み込む。キー文字列のベクトルと、特徴量ベクトルのベクトルがタプルとして保存されているので、個別にRDDに変換してzipで結合する。
pickleから読み込んでいるのは、実験に使ったテストデータが、別件の実験でtensorflowで作成したものだったから。
import pickle
import numpy as np
# load from pickle
f=open("cvtest3.pickle", "rb")
cv3=pickle.load(f)
f.close()
# re-construct from ([vector of key], [vector of feature vector])
cv3R1 = sc.parallelize(cv3[0])
cv3R2 = sc.parallelize(cv3[1])
cv3R = cv3R1.zip(cv3R2)
キー文字列kiと特徴ベクトルviのペア(ki, vi)から、(ki, kj)のペアとvi, vj間の距離rijを直積を使って求める。
# make all i-j pair as ((i, v_i), (j, v_j))
B=cv3R.cartesian(cv3R)
# change to ((i, j), (v_i, v_j))
C=B.map(lambda x: ((x[0][0],x[1][0]),(x[0][1],x[1][1])))
# calculate Euclid distance
D=C.filter(lambda x:x[0][0]!=x[0][1])
E=D.map(lambda x: (x[0], x[1][0]-x[1][1]))
# ((i, j), r_ij)
F=E.map(lambda x:(x[0], np.sqrt(np.dot(x[1],x[1]))))
F.persist()
一旦、iに対して、それが持つ距離全てをまとめたベクトルを作る。statを使って、iごとにmin/max, 平均/標準偏差などを計算する。これがキーiの周囲の状況を示す。
##
## stat1
##
# make new (i, [all r_ij]) vector
G=F.map(lambda x: (x[0][0], x[1]))
H=G.groupByKey()
I=H.map(lambda x: (x[0], list(x[1])))
# stats for each i
J=I.map(lambda x:(x[0], np.array(x[1])))
# (i, (min, mean, std dev, max))
K=J.map(lambda x:(x[0], (np.min(x[1]),np.mean(x[1]),np.std(x[1]),np.max(x[1]))))
一方で、全部の距離だけの統計も求め、閾値をここから得ることにする。
閾値を mean - 2* std_dev としているが、データによっては空集合になったりするので個別に判断してやる必要がある(こういうのこそ人工知能が判断してほしい)。
##
## stat2
##
# extract only r_ij
L=F.map(lambda x: x[1])
# obtain stats
L.persist()
l=L.stats()
print(l.min(), l.mean(), l.stdev(), l.max())
thre = l.mean() - l.stdev() * 2
print(thre)
((ki, kj), rij)なるペアから(ki, (kj, rij))を計算し、kiについてまとめる。事前に閾値以下のもののみ抽出しておく。
## (i, [(j, r_ij)]) near than thre
O=F.filter(lambda x: x[1] < thre)
P=O.map(lambda x: (x[0][0], (x[0][1], x[1])))
Q=P.groupByKey()
# +magic
R=Q.map(lambda x: (x[0], list(x[1])))
全ペア間距離から求めた何らかの閾値以下の距離にある、キーiから見て比較的近くにあるキーjの集合を、その距離とともに得る。ついでに、キーiごとに、近傍のキーの数とか、平均/標準偏差などの近傍分布なども結合して出力しておく。
# size of i-j pair near by threshold
S=R.map(lambda x: (x[0], len(x[1])))
## join S and K; (i, (num of neighbor, (min, mean, stddev, max)))
SK=S.join(K)
## sort the neighbor list
T=R.map(lambda x: (x[0], sorted(x[1], key=lambda y: y[1])))
DIC=SK.join(T)
## obtain the result as a list
RES=DIC.collect()
余言
この件は元々はSparkが話題になった2015年6,7月頃に実験してみようと思ったものの、会社のクソ用事のせいで忙殺され、実際に手を動かせたのは2016年1月になっていた(2015年の夏ころは私物のHadoopクラスタの構築に手間取っていた)。2016年1月には、上で言うKの計算まではできていたが、ある要素周囲の状況を求めたところから先が思いつかずそのまま放置された。
その後1年近く、もっとクソな用事で忙殺されていたのがやっとカタがつき、全ての距離分布の平均・標準偏差から出した閾値で近傍リストを作るということを思いつき、1年越しでやっと片付けられた。2015年の流行からだと1.5年遅れである。