Sparkの機械学習ライブラリは、RDDベースAPIのmllib と DataFrameベースAPIのml の2つありますが、2.0以降は ml が推奨されているようなので、ここも ml の API を使用します。
scikit-learnの線形回帰は非常に簡単でしたが、Sparkは全てDataFrameを介するので少し面倒でした。
3.1 学習データの準備
Sparkのデータフレームは sparkのライブラリ関数でcsvファイルを読み込めば作成できます。ここは簡単と言えば簡単です。
# read data
dataFile = 'sampleLR.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
データ読み込みについては以下のこともわかりました。
- option("inferSchema", "true") は、データが数値の場合に数値に変換して読み込んでくれます。デフォルトは false なので、このオプション無しだと数値は文字列データとなります。
- tsv を読み込む場合は、区切り記号のオプションを追加します。
spark.read.format("com.databricks.spark.csv").option("delimiter", "\t").option("inferSchema", "true").load(dataFile)
- option("header", "true") を使うと、先頭行をデータのカラム名と認識します。オプション指定がない場合はデフォルトのカラム名 '_c0', '_c1' ... が設定されます。
spark.ml の学習データは、x1, x2 ... をベクトル化した features というカラムと y 用の label というカラムを持つデータフレームにする必要があります(カラム名はモデル作成時にオプションで指定可能)。カラムをベクトル化するには、VectorAssembler を使用すると簡単です。
# カラム名を設定する
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'label')
# x をベクトル化して、featuresカラムにする
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:1], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)
3.2 モデルの作成
Spark.ml の線形回帰ライブラリは GeneralizedLinearRegression です。features, label のカラム名を持つデータフレームをモデルに渡して fit() を行うと線形回帰が行われます。計算結果の a1, a2 ... は model.coefficients、b は model.intercept で参照できます。
# build the model
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression()
# training
model = glr.fit(featurized_DF)
# model parameters
a = model.coefficients[0]
b = model.intercept
3.3 モデルを用いた予測
fit() で作成した model に transform() を行うと予測データが返ってきます。transform() が返すのは prediction コラムが追加されたデータフレームで、予測値はここに収められています。
- spark データフレームをリストにするには .collect() を使います。
# prediction
results = model.transform(featurized_DF)
elms = results.collect()
x0 = [float(elm.features[0]) for elm in elms]
y0 = [float(elm.label) for elm in elms]
p0 = [float(elm.prediction) for elm in elms]
x = np.array(x0)
y = np.array(y0)
p = np.array(p0)
# output
from plotLR import plotLR
title = 'predLR_with_Spark'
plotLR(title, x, y, p, 0.4, 0.8, a, b)
plotLR は scikit-learn の線形回帰で使ったものと同じです。Spark.ml の出力結果は以下で、a, b とも scikit-learn と同じになりました。
3.4 spark-submit
spark の python API を使用するには、通常のpython ではなく、pyspark を使います。しかし、ver 2.1 現在で、pyspark は対話的に使用できても、コマンド形式での実行はできません。
# pyspark は成功
% pyspark
Python 3.5.2 (default, Dec 15 2016, 00:35:09)
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Python version 3.5.2 (default, Dec 15 2016 00:35:09)
SparkSession available as 'spark'.
>>>
# pyspark x.py は失敗
pyspark spLR.py
Running python applications through 'pyspark' is not supported as of Spark 2.0.
Use ./bin/spark-submit <python file>
コマンド形式で実行したい場合は上のメッセージ通り spark-submit を使いますが、この時、pysparkでデフォルトで存在する spark と sc オブジェクトを自分で作っておく必要があります。spark-submit で動作するコードは以下になります。
plotLR.py は scikit-learn編にコードがあります。
import numpy as np
import pyspark
# spark-submit で実行する場合は spark, sc を生成する
on_spark_submit = 1
from pyspark import SparkContext
from pyspark.sql import SparkSession
if on_spark_submit:
master = 'local'
appName = 'Linear Regeession'
sc = SparkContext(master, appName)
spark = SparkSession.builder.master(master).appName(appName).config("spark.some.config.option", "some-value").getOrCreate()
# read data
dataFile = 'sampleLR.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'label')
# transform data
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:1], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)
# build the model
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression()
# training
model = glr.fit(featurized_DF)
# model parametersa
a = model.coefficients[0]
b = model.intercept
# prediction
results = model.transform(featurized_DF)
elms = results.collect()
x0 = [float(elm.features[0]) for elm in elms]
y0 = [float(elm.label) for elm in elms]
p0 = [float(elm.prediction) for elm in elms]
x = np.array(x0)
y = np.array(y0)
p = np.array(p0)
# output
from plotLR import plotLR
title = 'predLR_with_Spark'
plotLR(title, x, y, p, 0.4, 0.8, a, b)