Spark.ml には単純なPerceptronのモデルはないのでMultilayerPerceptronClassifierを使いました。
隠れ層などを追加すればニューラルネットの学習モデルとして使えると思います。
Spark.ml の基本的な使い方は、線形回帰の方を参照してください。
3.1 学習データの準備
前回の線形回帰と異なるのは以下の点です。
- csvファイルが3項目データ
- 学習データ用 DataFrame の features は2次元ベクトル
# read training data
dataFile = 'trainPC.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'y').withColumnRenamed('_c2', 'label')
# transform data
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:2], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)
コードで違う点は以下になります。
- withColumnRenamed('_c1', 'label') → withColumnRenamed('_c2', 'label')
- inputCols=data_DF.columns[0:1] → inputCols=data_DF.columns[0:2]
3.2 モデルの作成
MultilayerPerceptronClassifier は、ニューラルネットの構成を layersという引数で指定することができます。これは各層のノードの数をリストで指定するもので、最初の数字が入力層のノード数、最後の数字が出力層のノード数を表し、その間は隠れ層のノード数を指定します。Perceptronは隠れ層なし。今回は2次元の入力と 0 or 1 の出力なので、layers=[2,2] となります。
# build the model
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [2,2] # input layer = 2, output layer = 2
maxIter = 100
mlp = MultilayerPerceptronClassifier(maxIter=maxIter, layers=layers)
# training
model = mlp.fit(featurized_DF)
3.3 モデルを用いた予測
出力は scikit-learn編と同じく、(0,0)〜(10,5) の0.1単位のマトリックスによる評価ですが、入力データと同時に後の評価用の正解値も作っておきます。予測は fit() が返す model に対して transfer() を実行、ということで線形回帰と同じプロセスになります。
# (0, 0)-(10, 5)の範囲で 0.1 単位のマトリックスを作る
npax = np.arange(0.0, 10.0, 0.1)
npay = np.arange(0.0, 5.0, 0.1)
npx, npy = np.meshgrid(npax, npay)
a = 0.4
b = 0.8
npl = a * npx + b < npy # 正解データ
npxyz = np.c_[npx.ravel(), npy.ravel(), npl.ravel()]
evaldata_DF = sc.parallelize(npxyz.tolist()).toDF(['x', 'y', 'label'])
# predict
eval_DF = vecAssembler.transform(evaldata_DF)
results = model.transform(eval_DF)
- DataFrameを list から作る場合は sc.parallelize(リスト).toDF() を用います
予測値を predictionカラムから取り出す点も同じです。
spark.ml.classifier の評価用関数はMulticlassClassificationEvaluatorです。
評価から結果出力までのコードは以下。plotPC は scikit-learn編を参照ください。
# results
p0 = [float(elm.prediction) for elm in results.collect()]
npz = np.array(p0).reshape(npx.shape)
# evaluate
predictionAndLabels = results.select("prediction", "label")
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
ac = evaluator.evaluate(predictionAndLabels)
# output
from plotPC import plotPC
title = "predPC_with_spark"
plotPC(title, npx, npy, npz, a, b, maxIter, ac)
最後に全体のコードを載せておきます。
#!/usr/bin/env python
import numpy as np
import pyspark
# spark_submit で実行する場合は spark, sc を生成する
on_spark_submit = 1
from pyspark import SparkContext
from pyspark.sql import SparkSession
if on_spark_submit:
master = 'local'
appName = 'Perceptron Classification'
sc = SparkContext(master, appName)
spark = SparkSession.builder.master(master).appName(appName).config("spark.some.config.option", "some-value").getOrCreate()
# read training data
dataFile = 'trainPC.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'y').withColumnRenamed('_c2', 'label')
# transform data
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:2], outputCol="features")
featurized_DF = vecAssembler.transform(data_DF)
# build the model
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [2,2] # input layer = 2, output layer = 2
maxIter = 100
mlp = MultilayerPerceptronClassifier(maxIter=maxIter, layers=layers)
# training
model = mlp.fit(featurized_DF)
# (0, 0)-(10, 5)の範囲で 0.1 単位のマトリックスを作る
npax = np.arange(0.0, 10.0, 0.1)
npay = np.arange(0.0, 5.0, 0.1)
npx, npy = np.meshgrid(npax, npay)
a = 0.4
b = 0.8
npl = a * npx + b < npy
npxyz = np.c_[npx.ravel(), npy.ravel(), npl.ravel()]
evaldata_DF = sc.parallelize(npxyz.tolist()).toDF(['x', 'y', 'label'])
# predict
eval_DF = vecAssembler.transform(evaldata_DF)
results = model.transform(eval_DF)
# results
p0 = [float(elm.prediction) for elm in results.collect()]
npz = np.array(p0).reshape(npx.shape)
# evaluate
predictionAndLabels = results.select("prediction", "label")
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
ac = evaluator.evaluate(predictionAndLabels)
# output
from plotPC import plotPC
title = "predPC_with_spark"
plotPC(title, npx, npy, npz, a, b, maxIter, ac)