LoginSignup
0
0

More than 5 years have passed since last update.

scikit-learn、Spark.ml、TensorFlow で Perceptron〜(3)spark.ml

Last updated at Posted at 2017-05-07

Spark.ml には単純なPerceptronのモデルはないのでMultilayerPerceptronClassifierを使いました。
隠れ層などを追加すればニューラルネットの学習モデルとして使えると思います。
Spark.ml の基本的な使い方は、線形回帰の方を参照してください。

3.1 学習データの準備

前回の線形回帰と異なるのは以下の点です。

  • csvファイルが3項目データ
  • 学習データ用 DataFrame の features は2次元ベクトル
spPC.py
# read training data                                                                 
dataFile = 'trainPC.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'y').withColumnRenamed('_c2', 'label')

# transform data                                                                     
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:2], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)

コードで違う点は以下になります。

  • withColumnRenamed('_c1', 'label') → withColumnRenamed('_c2', 'label')
  • inputCols=data_DF.columns[0:1] → inputCols=data_DF.columns[0:2]

3.2 モデルの作成

MultilayerPerceptronClassifier は、ニューラルネットの構成を layersという引数で指定することができます。これは各層のノードの数をリストで指定するもので、最初の数字が入力層のノード数、最後の数字が出力層のノード数を表し、その間は隠れ層のノード数を指定します。Perceptronは隠れ層なし。今回は2次元の入力と 0 or 1 の出力なので、layers=[2,2] となります。

spPC.py
# build the model                                                                    
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [2,2]  # input layer = 2, output layer = 2                                  
maxIter = 100
mlp = MultilayerPerceptronClassifier(maxIter=maxIter, layers=layers)

# training                                                                           
model = mlp.fit(featurized_DF)

3.3 モデルを用いた予測

出力は scikit-learn編と同じく、(0,0)〜(10,5) の0.1単位のマトリックスによる評価ですが、入力データと同時に後の評価用の正解値も作っておきます。予測は fit() が返す model に対して transfer() を実行、ということで線形回帰と同じプロセスになります。

spPC.py
# (0, 0)-(10, 5)の範囲で 0.1 単位のマトリックスを作る                                
npax = np.arange(0.0, 10.0, 0.1)
npay = np.arange(0.0, 5.0, 0.1)
npx, npy = np.meshgrid(npax, npay)

a = 0.4
b = 0.8
npl = a * npx + b < npy # 正解データ
npxyz = np.c_[npx.ravel(), npy.ravel(), npl.ravel()]
evaldata_DF = sc.parallelize(npxyz.tolist()).toDF(['x', 'y', 'label'])

# predict                                                                            
eval_DF = vecAssembler.transform(evaldata_DF)
results = model.transform(eval_DF)
  • DataFrameを list から作る場合は sc.parallelize(リスト).toDF() を用います

予測値を predictionカラムから取り出す点も同じです。
spark.ml.classifier の評価用関数はMulticlassClassificationEvaluatorです。
評価から結果出力までのコードは以下。plotPC は scikit-learn編を参照ください。

spPC.py
# results                                                                            
p0 = [float(elm.prediction) for elm in results.collect()]
npz = np.array(p0).reshape(npx.shape)

# evaluate                                                                           
predictionAndLabels = results.select("prediction", "label")
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
ac = evaluator.evaluate(predictionAndLabels)

# output                                                                             
from plotPC import plotPC
title = "predPC_with_spark"
plotPC(title, npx, npy, npz, a, b, maxIter, ac)

出力結果は以下になります。
predPC_with_spark_100.png

最後に全体のコードを載せておきます。

spPC.py
#!/usr/bin/env python                                                                

import numpy as np
import pyspark

# spark_submit で実行する場合は spark, sc を生成する                                 
on_spark_submit = 1
from pyspark import SparkContext
from pyspark.sql import SparkSession
if on_spark_submit:
    master = 'local'
    appName = 'Perceptron Classification'
    sc = SparkContext(master, appName)
    spark = SparkSession.builder.master(master).appName(appName).config("spark.some.config.option", "some-value").getOrCreate()

# read training data                                                                 
dataFile = 'trainPC.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'y').withColumnRenamed('_c2', 'label')

# transform data                                                                     
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:2], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)

# build the model                                                                    
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [2,2]  # input layer = 2, output layer = 2                                  
maxIter = 100
mlp = MultilayerPerceptronClassifier(maxIter=maxIter, layers=layers)

# training                                                                           
model = mlp.fit(featurized_DF)

# (0, 0)-(10, 5)の範囲で 0.1 単位のマトリックスを作る                                
npax = np.arange(0.0, 10.0, 0.1)
npay = np.arange(0.0, 5.0, 0.1)
npx, npy = np.meshgrid(npax, npay)
a = 0.4
b = 0.8
npl = a * npx + b < npy
npxyz = np.c_[npx.ravel(), npy.ravel(), npl.ravel()]
evaldata_DF = sc.parallelize(npxyz.tolist()).toDF(['x', 'y', 'label'])

# predict                                                                            
eval_DF = vecAssembler.transform(evaldata_DF)
results = model.transform(eval_DF)

# results                                                                            
p0 = [float(elm.prediction) for elm in results.collect()]
npz = np.array(p0).reshape(npx.shape)

# evaluate                                                                           
predictionAndLabels = results.select("prediction", "label")
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
ac = evaluator.evaluate(predictionAndLabels)

# output                                                                             
from plotPC import plotPC
title = "predPC_with_spark"
plotPC(title, npx, npy, npz, a, b, maxIter, ac)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0