はじめに
pysparkでコンテンツの類似度を図るためのコードを作成しました。
日本語の文献が少なかったので、メモします。
sparkを一から導入するのはめんどくさかったので、dockerを使用しました。
dockerでpysparkの導入
dockerのインストール
dockerの導入には
http://qiita.com/hshimo/items/e24b1fbfbf775ec7c941
を参考にしました。
基本的には
Get started with Docker for Mac
よりdmgファイルをダウンロードしてきてインストールするだけです。
spark+jupyterのdockerイメージの使用
spark+jupyterのdockerイメージをcloneします。
cloneしたのはhttps://github.com/busbud/jupyter-docker-stacks/tree/master/all-spark-notebookです。
README内に記されているように、
$ docker run -d -p 8888:8888 jupyter/all-spark-notebook -e GRANT_SUDO=yes
terminalで上のコマンドを打ちます。
これにより、pysparkを使用することができるjupyter notebookがlocalhost:8888に立ち上がります。
(-e GRANT_SUDO=yesはユーザー権限なしにjupyterを利用できるするようにするオプションです。)
docker psコマンドを打つと
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
91fc42290759 jupyter/all-spark-notebook "tini -- start-not..." 3 hours ago Up 3 hours 0.0.0.0:8888->8888/tcp nifty_bartik
というように、指定したdockerが起動していることがわかります。
コード
作成したコードは
https://github.com/kenchin110100/machine_learning/blob/master/samplePysparkSimilarity.ipynb
にあります。
初期化
まず必要なライブラリをインポートして、pysparkの初期化を行います。
# coding: utf-8
"""
pysparkで類似度を図るためのサンプル
"""
import numpy as np
from pyspark import SQLContext, sql
import pyspark
from pyspark.sql import functions, Row
from pyspark.mllib.linalg import DenseVector
sc = pyspark.SparkContext('local[*]')
sqlContext = sql.SQLContext(sc)
scはpysparkのRDD型を利用するために必要なインスタンス、
sqlContextはDataFrame型を利用するために必要なインスタンスです。
サンプルデータの作成
次に類似度を図るためのサンプルデータを作成しました。
# サンプルデータの作成
samples = [['aaa', 'a', 30, 1,2,3,4,5] + np.random.randn(5).tolist(),
['aaa', 'b', 30,2,1,3,4,1] + np.random.randn(5).tolist(),
['bbb', 'a', 30,4,5,3,2,4] + np.random.randn(5).tolist(),
['bbb', 'b', 30,1,2,4,3,1] + np.random.randn(5).tolist(),
['ccc', 'a', 30,4,5,2,1,2] + np.random.randn(5).tolist(),
['ccc', 'b', 30,1,2,5,4,1] + np.random.randn(5).tolist(),]
# カラム名の作成
colnames = [
'mc', 'mtc', 'area_cd',
'label1', 'label2', 'label3', 'label4', 'label5',
'label6', 'label7', 'label8', 'label9', 'label10'
]
colnames1 = [col + '_1' for col in colnames]
colnames2 = [col + '_2' for col in colnames]
# 作成したサンプルデータをpysparkのDataFrame型に変換
df1 = sqlContext.createDataFrame(sc.parallelize(samples), colnames1)
df2 = sqlContext.createDataFrame(sc.parallelize(samples), colnames2)
mc, mtcをユニークのkeyとして、label1~label10を素性として考えています。
同じサンプルデータを2つのデータフレームに格納しているのは、
類似度を図る組み合わせをjoinで作成するためのです。
samplesをsc.parallelize(samples)でRDD型に変換してから、createDataFrameでDataFrame型にしています。
組み合わせの列挙
次にDataFrame型のjoinを使用して、類似度を測定する組み合わせを列挙します。
joined_df = df1.join(df2, df1.area_cd_1 == df2.area_cd_2).filter(functions.concat(df1.mc_1, df1.mtc_1) != functions.concat(df2.mc_2, df2.mtc_2))
DataFrame型のjoinは
df1.join(df2, <条件>, 'left' or 'inner' or ...)
で作成することができます。
今回作成したコードでは、joinの後にfilterを行うことで、
自分自身との類似度を計らないようにしています。
functions.concat(df1.mc_1, df1.mtc_1)
ではユニークにしたいmcとmtcの2つのkeyを結合させることで、1つのユニークなkeyとして扱っています。
類似度の計算
ここまでで作成したDataFrameを使用して、類似度の計算を行なっていきます。
関数の定義
まず関数を定義します。
def match_sim(row1 ,row2):
keys = row1.asDict().keys()
total = len(keys)
count = 0
for key in keys:
if row1[key] == row2[key]:
count += 1
return float(count)/total
def cosine_sim(vec1 ,vec2):
dot = abs(vec1.dot(vec2))
n1 = vec1.norm(None)
n2 = vec1.norm(None)
return float(dot/(n1*n2))
match_simはカテゴリ変数の類似度を測定するための関数で、
pysparkのRow型を渡します。
一致していれば1、一致していなければ0を返し、比較した素性数で割った値を返します。
cosine_simはコサイン類似度を計算するための関数で、
pyspark.mllibのDenseVector型を渡します。
この関数を利用して、行ごとの類似度を計算していきます。
類似度の計算
joined_rdd = joined_df.rdd.map(lambda x: (
Row(mc_1=x.mc_1, mtc_1=x.mtc_1, mc_2=x.mc_2, mtc_2=x.mtc_2),
Row(label1=x.label1_1, label2=x.label2_1, label3=x.label3_1, label4=x.label4_1, label5=x.label5_1),
DenseVector([x.label6_1,x.label7_1,x.label8_1,x.label9_1,x.label10_1]),
Row(label1=x.label1_2, label2=x.label2_2, label3=x.label3_2, label4=x.label4_2, label5=x.label5_2),
DenseVector([x.label6_2,x.label7_2,x.label8_2,x.label9_2,x.label10_2])
)) \
.map(lambda x: (x[0], match_sim(x[1], x[3]), cosine_sim(x[2], x[4]))) \
.map(lambda x: (x[0].mc_1, x[0].mtc_1, x[0].mc_2, x[0].mtc_2, x[1], x[2]))
まずは、先ほど作成したDataFrame型であるjoined_dfをrdd型に変換してmapします(1行目)
5種類のデータ型を各組み合わせに対して保存します。
Row(mc_1=x.mc_1 ...)は類似度を図るユニークのkeyを保存するためのRow(2行目)
Row(label1=x.label1_1 ...)はカテゴリ変数を保存するためのRow(3行目)
DenseVector(x.label6_1, ...)は連続変数を保存するためのVector(4行目)
5行目、6行目は類似度を図るもう一方の行のカテゴリ変数、連続変数を保存します。
このように作成した5種類のデータ型を保存したRDDに対してさらにmapをします(8行目)。
x[0]はそのまま、x[1]とx[3]に対しては一致による類似度を、x[2], x[4]に対してはコサイン類似度を計算します。
最後に、再びDataFrame型に渡すために、整形します(9行目)。
結果の出力
このように作成した類似度テーブルは以下のようになります。
sqlContext.createDataFrame(joined_rdd, ['tar_mc', 'tar_mtc', 'res_mc', 'res_mtc', 'match_sim', 'cosine_sim']).show()
+------+-------+------+-------+---------+--------------------+
|tar_mc|tar_mtc|res_mc|res_mtc|match_sim| cosine_sim|
+------+-------+------+-------+---------+--------------------+
| aaa| a| aaa| b| 0.4| 0.2979433262317515|
| aaa| a| bbb| a| 0.2| 0.2161103600613806|
| aaa| a| bbb| b| 0.4| 0.6933162039799152|
| aaa| a| ccc| a| 0.0| 0.34941331375143353|
| aaa| a| ccc| b| 0.6| 0.5354750033557132|
| aaa| b| aaa| a| 0.4| 0.19428899651078324|
| aaa| b| bbb| a| 0.2| 0.10702152405150611|
| aaa| b| bbb| b| 0.2| 0.4033681950723296|
| aaa| b| ccc| a| 0.0| 0.20097172584128625|
| aaa| b| ccc| b| 0.4| 0.6861144738544892|
| bbb| a| aaa| a| 0.2| 0.3590385377694502|
| bbb| a| aaa| b| 0.2| 0.27266040008605663|
| bbb| a| bbb| b| 0.0| 1.1313716028957246|
| bbb| a| ccc| a| 0.4|0.009321106239696326|
| bbb| a| ccc| b| 0.0| 1.0017633803368193|
| bbb| b| aaa| a| 0.4| 0.2176828683879606|
| bbb| b| aaa| b| 0.2| 0.194213765887726|
| bbb| b| bbb| a| 0.0| 0.21381230488831227|
| bbb| b| ccc| a| 0.0| 0.21074015342537053|
| bbb| b| ccc| b| 0.6| 0.34536679942567616|
+------+-------+------+-------+---------+--------------------+
only showing top 20 rows
それぞれのkeyに対して、カテゴリの一致度による類似度とコサイン類似度が計算されています。
終わりに
今回はdockerのjupyter-sparkイメージを使用して、データ間の類似度を測定するためのサンプルを作成しました。
もっと簡潔な書き方もあるはずですが(MLlibのColumnSimilarityを使用するなど)、諸々の理由があり、今回はこのような回りくどいコードを書きました。
docker及びsparkの初心者なので、これからもう少し練習をしていきたいと思います。