Python
Spark
ApacheSpark

Sparkでコサイン類似度を求める

たまに、レコードが持つ変数間でコサイン類似度を計算したい時があるので、備忘として書いておく

なお、以下の例で扱うデータは、適当なIDと紐づけられたベクトルをもつデータフレームとする。

データ例
from pyspark.mllib.linalg import Vectors

data = sc.parallelize([
    ("a", Vectors.dense([0, 1, 2])),
    ("b", Vectors.dense([3, 4, 5])),
    ("c", Vectors.dense([6, 7, 8])) 
]).toDF(["id", "features"])

1対1で計算する

ベクトル型の関数を定義して、コサイン距離を計算する。
指定の組み合わせで計算する場合はこちらが楽。

1対1のコサイン類似度計算
# 適当に交差結合で組み合わせデータ
_data = data.select(col("id").alias("_id"), col("features").alias("_features"))
tgt_data = data.crossJoin(_data)

# コサイン類似度を計算
dist_data = tgt_data.rdd.map(
    lambda r: (r.id, r._id, r.features.dot(r._features) / (r.features.norm(2) * r._features.norm(2)) )
).toDF(["id", "_id", "similarity"])

※UDFを定義して計算する方法もあるが、ここでは使用していない。

全対全で計算する

1対1の指定の組み合わせで計算する場合の手順を拡張してもいいが、
RowMatrixのColumnSimilaritiesを使うと簡単。

DIMSUMを利用しているこちらの方が加工の手間を差し引いても(多分)計算量が少なくなる。

20180510 追記:
要素に欠損値があると、0として扱われて計算されるらしく、コサイン類似度で得たい結果を得ることができない。

全対全のコサイン類似度計算
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# RowMatrixに変換
mat = RowMatrix(data.select("features").rdd.map(lambda r: r.features))

# RowMatrixを転置
mat_t = \
CoordinateMatrix(
    mat.rows.zipWithIndex().flatMap(
        lambda x: [MatrixEntry(x[1], j, v) for j, v in enumerate(x[0])])
).transpose()

# コサイン距離を全パターンで計算し、DF化
dist_mat = \
mat_t.toRowMatrix().columnSimilarities() \
.entries.map(lambda r: (r.i, r.j, r.value)).toDF(["i", "j", "similarity"])

関連