SparkのMLlibはMLに移行しつつあります。Spark2.0からはRDDベースのMLlib APIは保守のみになり、今後はDataFrameベースのAPIが標準になるそうです。ここではPySparkでML APIを使い、主成分分析を行ってみます。
※DataFrameはPandasのDataFrameとは異なります。
新聞10紙の紙面の傾向分析
下記のリンク先にある新聞10紙のニュース、ビジネス、スポーツの充実度の評価から紙面の傾向を分析する、という例題をSparkで実装してみます。
データ
新聞10紙について、記事の内容をニュース・ビジネス・スポーツについてその充実度を10点評価で調査した、とのこと。スケールは0から10で大変充実しているが10,充実していないが0。
no,ニュース,ビジネス,スポーツ
1,8,9,4
2,2,5,7
3,8,5,6
4,3,5,4
5,7,4,9
6,4,3,4
7,3,6,8
8,6,8,2
9,5,4,5
10,6,7,6
コード
SparkSession初期化
以前はSparkContext
から始めるのが定番でしたが、2.0ではSparkSession
を使います。SQLContext
, HiveContext
はSparkSession
に統合されました。
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
データ読み込み
spark.read.csv
でCSVを読み込みDataFrameに格納します。
df = spark.read.csv(filename, header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("==== 生データ ====")
df.show(truncate=False)
実行結果
日本語が含まれているとテーブルが崩れます。字幅が考慮されていません。
$ export PYTHONIOENCODING=utf8
$ spark-submit ex1.py
==== 生データ ====
+---+----+----+----+
|no |ニュース|ビジネス|スポーツ|
+---+----+----+----+
|1 |8 |9 |4 |
|2 |2 |5 |7 |
|3 |8 |5 |6 |
|4 |3 |5 |4 |
|5 |7 |4 |9 |
|6 |4 |3 |4 |
|7 |3 |6 |8 |
|8 |6 |8 |2 |
|9 |5 |4 |5 |
|10 |6 |7 |6 |
+---+----+----+----+
データ形式変換
PCA()はベクトル形式の変量を要求します。VectorAssembler
を使って、[ニュース, ビジネス, スポーツ]をベクトルとし、変量
カラムに格納します。.transform(df)
は新たなDataFrameを作ります。
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="変量")
feature_vectors = assembler.transform(df)
feature_vectors.show(truncate=False)
実行結果
変量
カラムにベクトルが追加になっています。
+---+----+----+----+-------------+
| no|ニュース|ビジネス|スポーツ| 変量|
+---+----+----+----+-------------+
| 1| 8| 9| 4|[8.0,9.0,4.0]|
| 2| 2| 5| 7|[2.0,5.0,7.0]|
| 3| 8| 5| 6|[8.0,5.0,6.0]|
| 4| 3| 5| 4|[3.0,5.0,4.0]|
| 5| 7| 4| 9|[7.0,4.0,9.0]|
| 6| 4| 3| 4|[4.0,3.0,4.0]|
| 7| 3| 6| 8|[3.0,6.0,8.0]|
| 8| 6| 8| 2|[6.0,8.0,2.0]|
| 9| 5| 4| 5|[5.0,4.0,5.0]|
| 10| 6| 7| 6|[6.0,7.0,6.0]|
+---+----+----+----+-------------+
入力データの標準化
リンク先に倣い、ここでもデータを標準化してから計算します。主成分分析では一般に標準化してから計算する方が良い結果が得られます。MLにはStandardScaler
がありますのでこれを使います。
入力は変量
カラムのベクトルで、出力は標準化変量
とします。このAPIはまず.fit
により入力データからモデルを作り、そして.transform
によって再度入力データを与え実際に変換します。
from pyspark.ml.feature import StandardScaler
# step1
scaler = StandardScaler(inputCol="変量", outputCol="標準化変量", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
# step2
std_feature_vectors = scalerModel.transform(feature_vectors)
#標準化変量だけ表示
print("==== 標準化されたデータ ====")
std_feature_vectors.select("標準化変量").show(truncate=False)
実行結果
リンク先の表と比較すると微妙に異なっています。StandardScaler
は不偏分散(n-1)を用いているのですが、リンク先では標本分散(n)が使われています。詳しい話は他記事をご参照ください。
==== 標準化されたデータ ====
+---------------------------------------------------------------+
|標準化変量 |
+---------------------------------------------------------------+
|[1.3023647131866891,1.7919573407620815,-0.7071067811865476] |
|[-1.4884168150705013,-0.3162277660168382,0.7071067811865476] |
|[1.3023647131866891,-0.3162277660168382,0.23570226039551587] |
|[-1.0232865603609695,-0.3162277660168382,-0.7071067811865476] |
|[0.8372344584771575,-0.8432740427115681,1.649915822768611] |
|[-0.5581563056514377,-1.370320319406298,-0.7071067811865476] |
|[-1.0232865603609695,0.21081851067789167,1.1785113019775793] |
|[0.3721042037676257,1.2649110640673515,-1.649915822768611] |
|[-0.09302605094190601,-0.8432740427115681,-0.23570226039551587]|
|[0.3721042037676257,0.7378647873726216,0.23570226039551587] |
+---------------------------------------------------------------+
PCA
やっと主成分分析APIを呼び出せます。入力は標準化変量
で出力は主成分得点
です。先ほどのStandardScaler
と同じように、まずモデルを作り、次に実際の計算を行います。構築されたモデルからは固有ベクトル、寄与率が得られます。k=3は第三主成分までを計算するという指示です。本来はkを大きな値にして一度計算し、寄与率の上位からの累計が80%あたりになるようなkを選び再度計算します。
from pyspark.ml.feature import PCA
pca = PCA(k=3, inputCol="標準化変量", outputCol="主成分得点")
pcaModel = pca.fit(std_feature_vectors)
print("==== 固有ベクトル ====")
print(pcaModel.pc)
print("==== 寄与率 ====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("主成分得点")
print("==== 主成分得点 ====")
pca_score.show(truncate=False)
実行結果
固有ベクトルの結果は、第1列目(左端の縦一列)が第一主成分の固有ベクトル、同様に2列目が第二主成分、3列目が第三主成分になります。
寄与率は、第一主成分が52%, 第二主成分が30%、第三主成分が17.6%ということになりました。第一と第二の累計が82%ですので、第三主成分は削っても良いでしょう。その場合k=2にします。
固有値は得られませんが、寄与率は固有値/固有値の総和
であるから大概の場合は寄与率で足りるはず。
各新聞の主成分得点は第一列が第一主成分、第二列が第二主成分、第三列が第三主成分です。第一主成分の得点の符号がリンク先と逆さまになっていますが、ベクトルの向きが180度反対を向いているというだけで、分析結果に影響はありません。不偏分散・標本分散の違いから少し値が異なっています。
==== 固有ベクトル ====
DenseMatrix([[-0.53130806, 0.68925233, -0.49258803],
[-0.67331251, 0.00933405, 0.73929908],
[ 0.51416145, 0.72446125, 0.45912296]])
==== 寄与率 ====
[0.52355344314,0.300887148322,0.175559408538]
==== 主成分得点 ====
+---------------------------------------------------------------+
|主成分得点 |
+---------------------------------------------------------------+
|[-2.2620712255691466,0.4021126641946994,0.35861418406317674] |
|[1.3672950172090064,-0.516574975843834,0.8240383763102186] |
|[-0.35784774304549694,1.0654633785914394,-0.7670998522924913] |
|[0.3930334607140129,-1.220525792393691,-0.05437714111925901] |
|[0.9712806670593661,1.7644947192188811,-0.2783291638335238] |
|[0.8556397135650156,-0.9097726336587761,-1.0627843972001996] |
|[1.0076787432724863,0.1504509197015279,1.2009982469039933] |
|[-1.8977055313059759,-0.9270196509736093,-0.005660728153863093]|
|[0.4960234396284956,-0.24274673811341405,-0.6858245266064249] |
|[-0.5733265415277634,0.43411810927677885,0.47042500192836967] |
+---------------------------------------------------------------+
まとめ
以上SparkのML APIのうち、DataFrame, PCA, StandardScalerを使った主成分分析の実例を紹介しました。
-
PCA
の入力はベクトル形式の変量、出力は主成分得点 - PCAモデル作りと実際の計算の2段階になっている
- PCAモデルからは固有ベクトルと寄与率が得られる
- 固有値は得られない
- 符号は実装依存なので他の実装と比較して符号が逆のこともある
- 入力の標準化には
StandardScaler
を使う - StandardScalerは不偏分散を使っている
新聞の傾向の分析は...リンク先の記事をご覧ください(^^;
全ソースコード
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler
# Initialize SparkSession
spark = (SparkSession
.builder
.appName("news")
.enableHiveSupport()
.getOrCreate())
# Read raw data
df = spark.read.csv('news.csv', header=True, inferSchema=True, mode="DROPMALFORMED", encoding='UTF-8')
print("==== 生データ ====")
df.show(truncate=False)
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="変量")
feature_vectors = assembler.transform(df)
feature_vectors.show()
scaler = StandardScaler(inputCol="変量", outputCol="標準化変量", withStd=True, withMean=True)
scalerModel = scaler.fit(feature_vectors)
std_feature_vectors = scalerModel.transform(feature_vectors)
print("==== 標準化されたデータ ====")
std_feature_vectors.select("標準化変量").show(truncate=False)
# build PCA model
pca = PCA(k=3, inputCol="標準化変量", outputCol="主成分得点")
pcaModel = pca.fit(std_feature_vectors)
print("==== 固有ベクトル ====")
print(pcaModel.pc)
print("==== 寄与率 ====")
print(pcaModel.explainedVariance)
pca_score = pcaModel.transform(std_feature_vectors).select("主成分得点")
print("==== 主成分得点 ====")
pca_score.show(truncate=False)