LoginSignup
3
2

More than 5 years have passed since last update.

Azure Databricks(Apache Spark)で、SparceVectorをAggregate

Posted at

概要

この世界で一体何人の役に立てるかわからないですが、pysparkで、Dataframe内の疎行列を集約する方法です。微妙にハマるので。

TL;DL

こちらを大変参考に参考にさせていただきながら、和訳しました。

from pyspark.ml.feature import CountVectorizer

# Input DataとなるDataframe
df = spark.createDataFrame([
    (100, "a b c".split(" ")),
    (100, "a b b c a".split(" "))
], ["id", "words"])

# ワードカウントで、疎行列を作る
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
df = model.transform(df)

# show Action
result.show(truncate=False)
# 出力です
(7) Spark Jobs
df:pyspark.sql.dataframe.DataFrame = [id: long, words: array ... 1 more fields]
+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|100|[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|100|[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector

# Sparse行列を、普通の配列にするUDFの準備
def sparse_to_array(v):
  v = DenseVector(v)
  new_array = list([float(x) for x in v])
  return new_array

# UDFの登録
sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))

# Driverに返さないまま、Sparseをリストに変換したカラムを追加する
df = df.withColumn('features_array', sparse_to_array_udf('features'))
df.show()

# Driverに返さないまま、リストをループで回しながらsumし、またArrayに変換する。
# ここのF.sumを変更すれば、平均なども算出可能
df_agg = df.groupBy('id').agg(F.array(*[F.sum(F.col('features_array')[i]) for i in range(3)]).alias("averages"))
df_agg.show()
# 出力です
(10) Spark Jobs
df:pyspark.sql.dataframe.DataFrame = [id: long, words: array ... 2 more fields]
df_agg:pyspark.sql.dataframe.DataFrame = [id: long, averages: array]
+---+---------------+--------------------+---------------+
| id|          words|            features| features_array|
+---+---------------+--------------------+---------------+
|100|      [a, b, c]|(3,[0,1,2],[1.0,1...|[1.0, 1.0, 1.0]|
|100|[a, b, b, c, a]|(3,[0,1,2],[2.0,2...|[2.0, 2.0, 1.0]|
+---+---------------+--------------------+---------------+

+---+---------------+
| id|       averages|
+---+---------------+
|100|[3.0, 3.0, 2.0]|
+---+---------------+

解説

  • Dataframe内に疎行列が現れることはよくある(特にNLP)。
  • それらを集約したいシチュエーションに、上記が有効。
  • 疎行列そのままだと集約できないので、一度配列に変更し、配列の各要素を足し合わせながら、また配列に復元している。

ユースケース

  • 大体NLPで使うと思う。あるデータをFeature Hashingして疎行列ベクトルで表現した後に、カテゴリごとに集約したい場合等。
3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2