概要
この世界で一体何人の役に立てるかわからないですが、pysparkで、Dataframe内の疎行列を集約する方法です。微妙にハマるので。
TL;DL
こちらを大変参考に参考にさせていただきながら、和訳しました。
from pyspark.ml.feature import CountVectorizer
# Input DataとなるDataframe
df = spark.createDataFrame([
(100, "a b c".split(" ")),
(100, "a b b c a".split(" "))
], ["id", "words"])
# ワードカウントで、疎行列を作る
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
df = model.transform(df)
# show Action
result.show(truncate=False)
# 出力です
(7) Spark Jobs
df:pyspark.sql.dataframe.DataFrame = [id: long, words: array ... 1 more fields]
+---+---------------+-------------------------+
|id |words |features |
+---+---------------+-------------------------+
|100|[a, b, c] |(3,[0,1,2],[1.0,1.0,1.0])|
|100|[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
# Sparse行列を、普通の配列にするUDFの準備
def sparse_to_array(v):
v = DenseVector(v)
new_array = list([float(x) for x in v])
return new_array
# UDFの登録
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))
# Driverに返さないまま、Sparseをリストに変換したカラムを追加する
df = df.withColumn('features_array', sparse_to_array_udf('features'))
df.show()
# Driverに返さないまま、リストをループで回しながらsumし、またArrayに変換する。
# ここのF.sumを変更すれば、平均なども算出可能
df_agg = df.groupBy('id').agg(F.array(*[F.sum(F.col('features_array')[i]) for i in range(3)]).alias("averages"))
df_agg.show()
# 出力です
(10) Spark Jobs
df:pyspark.sql.dataframe.DataFrame = [id: long, words: array ... 2 more fields]
df_agg:pyspark.sql.dataframe.DataFrame = [id: long, averages: array]
+---+---------------+--------------------+---------------+
| id| words| features| features_array|
+---+---------------+--------------------+---------------+
|100| [a, b, c]|(3,[0,1,2],[1.0,1...|[1.0, 1.0, 1.0]|
|100|[a, b, b, c, a]|(3,[0,1,2],[2.0,2...|[2.0, 2.0, 1.0]|
+---+---------------+--------------------+---------------+
+---+---------------+
| id| averages|
+---+---------------+
|100|[3.0, 3.0, 2.0]|
+---+---------------+
解説
- Dataframe内に疎行列が現れることはよくある(特にNLP)。
- それらを集約したいシチュエーションに、上記が有効。
- 疎行列そのままだと集約できないので、一度配列に変更し、配列の各要素を足し合わせながら、また配列に復元している。
ユースケース
- 大体NLPで使うと思う。あるデータをFeature Hashingして疎行列ベクトルで表現した後に、カテゴリごとに集約したい場合等。