Edited at

Spark MLで主成分分析

More than 1 year has passed since last update.

SparkのMLlibはMLに移行しつつあります。Spark2.0からはRDDベースのMLlib APIは保守のみになり、今後はDataFrameベースのAPIが標準になるそうです。ここではPySparkでML APIを使い、主成分分析を行ってみます。

※DataFrameはPandasのDataFrameとは異なります。


新聞10紙の紙面の傾向分析

下記のリンク先にある新聞10紙のニュース、ビジネス、スポーツの充実度の評価から紙面の傾向を分析する、という例題をSparkで実装してみます。

http://ifs.nog.cc/gucchi24.hp.infoseek.co.jp/SHUSEIEX.htm


データ

新聞10紙について、記事の内容をニュース・ビジネス・スポーツについてその充実度を10点評価で調査した、とのこと。スケールは0から10で大変充実しているが10,充実していないが0。


news.csv

no,ニュース,ビジネス,スポーツ

1,8,9,4
2,2,5,7
3,8,5,6
4,3,5,4
5,7,4,9
6,4,3,4
7,3,6,8
8,6,8,2
9,5,4,5
10,6,7,6


コード


SparkSession初期化

以前はSparkContextから始めるのが定番でしたが、2.0ではSparkSessionを使います。SQLContext, HiveContextSparkSessionに統合されました。

from pyspark.sql import SparkSession

spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())


データ読み込み

spark.read.csvでCSVを読み込みDataFrameに格納します。


ex1.py

df = spark.read.csv(filename, header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')

print("==== 生データ ====")
df.show(truncate=False)


実行結果

日本語が含まれているとテーブルが崩れます。字幅が考慮されていません。

$ export PYTHONIOENCODING=utf8

$ spark-submit ex1.py
==== 生データ ====
+---+----+----+----+
|no |ニュース|ビジネス|スポーツ|
+---+----+----+----+
|1 |8 |9 |4 |
|2 |2 |5 |7 |
|3 |8 |5 |6 |
|4 |3 |5 |4 |
|5 |7 |4 |9 |
|6 |4 |3 |4 |
|7 |3 |6 |8 |
|8 |6 |8 |2 |
|9 |5 |4 |5 |
|10 |6 |7 |6 |
+---+----+----+----+


データ形式変換

PCA()はベクトル形式の変量を要求します。VectorAssemblerを使って、[ニュース, ビジネス, スポーツ]をベクトルとし、変量カラムに格納します。.transform(df)は新たなDataFrameを作ります。

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="変量")
feature_vectors = assembler.transform(df)
feature_vectors.show(truncate=False)


実行結果

変量カラムにベクトルが追加になっています。

+---+----+----+----+-------------+

| no|ニュース|ビジネス|スポーツ| 変量|
+---+----+----+----+-------------+
| 1| 8| 9| 4|[8.0,9.0,4.0]|
| 2| 2| 5| 7|[2.0,5.0,7.0]|
| 3| 8| 5| 6|[8.0,5.0,6.0]|
| 4| 3| 5| 4|[3.0,5.0,4.0]|
| 5| 7| 4| 9|[7.0,4.0,9.0]|
| 6| 4| 3| 4|[4.0,3.0,4.0]|
| 7| 3| 6| 8|[3.0,6.0,8.0]|
| 8| 6| 8| 2|[6.0,8.0,2.0]|
| 9| 5| 4| 5|[5.0,4.0,5.0]|
| 10| 6| 7| 6|[6.0,7.0,6.0]|
+---+----+----+----+-------------+


入力データの標準化

リンク先に倣い、ここでもデータを標準化してから計算します。主成分分析では一般に標準化してから計算する方が良い結果が得られます。MLにはStandardScalerがありますのでこれを使います。

入力は変量カラムのベクトルで、出力は標準化変量とします。このAPIはまず.fitにより入力データからモデルを作り、そして.transformによって再度入力データを与え実際に変換します。

from pyspark.ml.feature import StandardScaler

# step1
scaler = StandardScaler(inputCol="変量", outputCol="標準化変量", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)

# step2
std_feature_vectors = scalerModel.transform(feature_vectors)

#標準化変量だけ表示
print("==== 標準化されたデータ ====")
std_feature_vectors.select("標準化変量").show(truncate=False)


実行結果

リンク先の表と比較すると微妙に異なっています。StandardScalerは不偏分散(n-1)を用いているのですが、リンク先では標本分散(n)が使われています。詳しい話は他記事をご参照ください。

==== 標準化されたデータ ====

+---------------------------------------------------------------+
|標準化変量 |
+---------------------------------------------------------------+
|[1.3023647131866891,1.7919573407620815,-0.7071067811865476] |
|[-1.4884168150705013,-0.3162277660168382,0.7071067811865476] |
|[1.3023647131866891,-0.3162277660168382,0.23570226039551587] |
|[-1.0232865603609695,-0.3162277660168382,-0.7071067811865476] |
|[0.8372344584771575,-0.8432740427115681,1.649915822768611] |
|[-0.5581563056514377,-1.370320319406298,-0.7071067811865476] |
|[-1.0232865603609695,0.21081851067789167,1.1785113019775793] |
|[0.3721042037676257,1.2649110640673515,-1.649915822768611] |
|[-0.09302605094190601,-0.8432740427115681,-0.23570226039551587]|
|[0.3721042037676257,0.7378647873726216,0.23570226039551587] |
+---------------------------------------------------------------+


PCA

やっと主成分分析APIを呼び出せます。入力は標準化変量で出力は主成分得点です。先ほどのStandardScalerと同じように、まずモデルを作り、次に実際の計算を行います。構築されたモデルからは固有ベクトル、寄与率が得られます。k=3は第三主成分までを計算するという指示です。本来はkを大きな値にして一度計算し、寄与率の上位からの累計が80%あたりになるようなkを選び再度計算します。

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="標準化変量", outputCol="主成分得点")
pcaModel = pca.fit(std_feature_vectors)

print("==== 固有ベクトル ====")
print(pcaModel.pc)

print("==== 寄与率 ====")
print(pcaModel.explainedVariance)

pca_score = pcaModel.transform(std_feature_vectors).select("主成分得点")
print("==== 主成分得点 ====")
pca_score.show(truncate=False)


実行結果

固有ベクトルの結果は、第1列目(左端の縦一列)が第一主成分の固有ベクトル、同様に2列目が第二主成分、3列目が第三主成分になります。

寄与率は、第一主成分が52%, 第二主成分が30%、第三主成分が17.6%ということになりました。第一と第二の累計が82%ですので、第三主成分は削っても良いでしょう。その場合k=2にします。

固有値は得られませんが、寄与率は固有値/固有値の総和であるから大概の場合は寄与率で足りるはず。

各新聞の主成分得点は第一列が第一主成分、第二列が第二主成分、第三列が第三主成分です。第一主成分の得点の符号がリンク先と逆さまになっていますが、ベクトルの向きが180度反対を向いているというだけで、分析結果に影響はありません。不偏分散・標本分散の違いから少し値が異なっています。

==== 固有ベクトル ====

DenseMatrix([[-0.53130806, 0.68925233, -0.49258803],
[-0.67331251, 0.00933405, 0.73929908],
[ 0.51416145, 0.72446125, 0.45912296]])
==== 寄与率 ====
[0.52355344314,0.300887148322,0.175559408538]
==== 主成分得点 ====
+---------------------------------------------------------------+
|主成分得点 |
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+


まとめ

以上SparkのML APIのうち、DataFrame, PCA, StandardScalerを使った主成分分析の実例を紹介しました。



  • PCAの入力はベクトル形式の変量、出力は主成分得点

  • PCAモデル作りと実際の計算の2段階になっている

  • PCAモデルからは固有ベクトルと寄与率が得られる

  • 固有値は得られない

  • 符号は実装依存なので他の実装と比較して符号が逆のこともある

  • 入力の標準化にはStandardScalerを使う

  • StandardScalerは不偏分散を使っている

新聞の傾向の分析は...リンク先の記事をご覧ください(^^;


全ソースコード


pca.py

# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler

# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())

# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')

print("==== 生データ ====")
df.show(truncate=False)

assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="変量")
feature_vectors = assembler.transform(df)
feature_vectors.show()

scaler = StandardScaler(inputCol="変量", outputCol="標準化変量", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
std_feature_vectors = scalerModel.transform(feature_vectors)

print("==== 標準化されたデータ ====")
std_feature_vectors.select("標準化変量").show(truncate=False)

# build PCA model
pca = PCA(k=3, inputCol="標準化変量", outputCol="主成分得点")
pcaModel = pca.fit(std_feature_vectors)

print("==== 固有ベクトル ====")
print(pcaModel.pc)

print("==== 寄与率 ====")
print(pcaModel.explainedVariance)

pca_score = pcaModel.transform(std_feature_vectors).select("主成分得点")
print("==== 主成分得点 ====")

pca_score.show(truncate=False)