LoginSignup
4
6

More than 5 years have passed since last update.

scikit-learn、Spark.ml、TensorFlow で線形回帰〜(3)spark.ml

Last updated at Posted at 2017-05-07

Sparkの機械学習ライブラリは、RDDベースAPIのmllib と DataFrameベースAPIのml の2つありますが、2.0以降は ml が推奨されているようなので、ここも ml の API を使用します。
scikit-learnの線形回帰は非常に簡単でしたが、Sparkは全てDataFrameを介するので少し面倒でした。

3.1 学習データの準備

Sparkのデータフレームは sparkのライブラリ関数でcsvファイルを読み込めば作成できます。ここは簡単と言えば簡単です。

spLR.py
# read data                                                                          
dataFile = 'sampleLR.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)

データ読み込みについては以下のこともわかりました。

  • option("inferSchema", "true") は、データが数値の場合に数値に変換して読み込んでくれます。デフォルトは false なので、このオプション無しだと数値は文字列データとなります。
  • tsv を読み込む場合は、区切り記号のオプションを追加します。 spark.read.format("com.databricks.spark.csv").option("delimiter", "\t").option("inferSchema", "true").load(dataFile)
  • option("header", "true") を使うと、先頭行をデータのカラム名と認識します。オプション指定がない場合はデフォルトのカラム名 '_c0', '_c1' ... が設定されます。

spark.ml の学習データは、x1, x2 ... をベクトル化した features というカラムと y 用の label というカラムを持つデータフレームにする必要があります(カラム名はモデル作成時にオプションで指定可能)。カラムをベクトル化するには、VectorAssembler を使用すると簡単です。

spLR.py
# カラム名を設定する
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'label')

# x をベクトル化して、featuresカラムにする                                                                    
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:1], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)

3.2 モデルの作成

Spark.ml の線形回帰ライブラリは GeneralizedLinearRegression です。features, label のカラム名を持つデータフレームをモデルに渡して fit() を行うと線形回帰が行われます。計算結果の a1, a2 ... は model.coefficients、b は model.intercept で参照できます。

spLR.py
# build the model                                                                    
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression()

# training                                                                           
model = glr.fit(featurized_DF)

# model parameters
a = model.coefficients[0]
b = model.intercept

3.3 モデルを用いた予測

fit() で作成した model に transform() を行うと予測データが返ってきます。transform() が返すのは prediction コラムが追加されたデータフレームで、予測値はここに収められています。

  • spark データフレームをリストにするには .collect() を使います。
# prediction                                                                         
results = model.transform(featurized_DF)
elms = results.collect()
x0 = [float(elm.features[0]) for elm in elms]
y0 = [float(elm.label) for elm in elms]
p0 = [float(elm.prediction) for elm in elms]
x = np.array(x0)
y = np.array(y0)
p = np.array(p0)

# output                                                                             
from plotLR import plotLR
title = 'predLR_with_Spark'
plotLR(title, x, y, p, 0.4, 0.8, a, b)

plotLR は scikit-learn の線形回帰で使ったものと同じです。Spark.ml の出力結果は以下で、a, b とも scikit-learn と同じになりました。
predLR_with_Spark.png

3.4 spark-submit

spark の python API を使用するには、通常のpython ではなく、pyspark を使います。しかし、ver 2.1 現在で、pyspark は対話的に使用できても、コマンド形式での実行はできません。

# pyspark は成功
% pyspark
Python 3.5.2 (default, Dec 15 2016, 00:35:09)
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.5.2 (default, Dec 15 2016 00:35:09)
SparkSession available as 'spark'.
>>> 
# pyspark x.py は失敗
pyspark spLR.py
Running python applications through 'pyspark' is not supported as of Spark 2.0.
Use ./bin/spark-submit <python file>

コマンド形式で実行したい場合は上のメッセージ通り spark-submit を使いますが、この時、pysparkでデフォルトで存在する spark と sc オブジェクトを自分で作っておく必要があります。spark-submit で動作するコードは以下になります。
plotLR.py は scikit-learn編にコードがあります。

spLR.py
import numpy as np
import pyspark

# spark-submit で実行する場合は spark, sc を生成する                                 
on_spark_submit = 1
from pyspark import SparkContext
from pyspark.sql import SparkSession
if on_spark_submit:
    master = 'local'
    appName = 'Linear Regeession'
    sc = SparkContext(master, appName)
    spark = SparkSession.builder.master(master).appName(appName).config("spark.some.config.option", "some-value").getOrCreate()

# read data                                                                          
dataFile = 'sampleLR.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'label')

# transform data                                                                     
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:1], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)

# build the model                                                                    
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression()

# training                                                                           
model = glr.fit(featurized_DF)

# model parametersa                                                                  
a = model.coefficients[0]
b = model.intercept

# prediction                                                                         
results = model.transform(featurized_DF)
elms = results.collect()
x0 = [float(elm.features[0]) for elm in elms]
y0 = [float(elm.label) for elm in elms]
p0 = [float(elm.prediction) for elm in elms]
x = np.array(x0)
y = np.array(y0)
p = np.array(p0)

# output                                                                             
from plotLR import plotLR
title = 'predLR_with_Spark'
plotLR(title, x, y, p, 0.4, 0.8, a, b)
4
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
6