はじめに
「Spark?雷属性でかっこいい!」という煩悩のもと,Docker + Spark + Jupyter Notebook で簡単な機械学習を回して見ました.
お馴染みのタイタニックデータを使い,線形回帰による予測を行いました.
そもそも Spark とは
Spark とは分散処理ライブラリの一つです.
分散処理といえば Hadoop という方も多いと思いますが,自分の理解だと Spark は Hadoop の欠点を補ったライブラリになります.Hadoop が登場したのが 2006 年,その後,2014 年に Hadoop が登場しました.
Hadoop VS Spark
上記,Spark は Hadoop の欠点を補ったとありますが,双方にメリット・デメリットがあるため簡単に表にまとめます.
メリット | デメリット | |
---|---|---|
Hadoop | 大量のデータを扱うことができる | ストレージアクセスがあるためリアルタイムの処理が苦手 |
Spark | オンメモリでの処理によりリアルタイムでの処理が得意 | Hadoopほど大規模なデータは扱えない |
つまり,大きすぎるデータはHadoop,リアルタイムでの処理がしたかったらSparkを用いると良いでしょう.
また,Hadoopのクエリエンジンは Presto,Hive ですが,Spark には多彩な API が用意されていて Python や Scala などの言語から簡単に呼び出すことができます.
Docker Setup
まず,下記のイメージをダウンロードして,ビルドを行います.
$ docker pull jupyter/pyspark-notebook
$ docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook
上記,表示される URL にアクセスすればノートを開くことができます.
※ イメージのサイズが 3GB あるので一応気をつけてください
Python
ライブラリ
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
データ読み込み
# データ読み込み
titanic_df = spark.read.csv('./titanic/train.csv', header='True', inferSchema='True')
基本的なデータ処理
# 欠損値対応
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})
# 不要のカラムを落とす
titanic_df = titanic_df.drop("Cabin")
# 定数による列追加
titanic_df = titanic_df.withColumn('Alone', lit(0))
# 条件による値の挿入
titanic_df = titanic_df.withColumn("Alone", when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))
# ラベルエンコード
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex", "Embarked", "Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)
# テストスプリット
trainingData, testData = feature_vector.randomSplit([0.8, 0.2], seed=9)
他にもいろいろな処理が有りますが,いったんめぼしいものだけ置いておきます.
より詳しいデータ処理に関しては下記をご覧ください.
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5722190290795989/3865595167034368/8175309257345795/latest.html
学習
# 学習
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
lrModel = lr.fit(trainingData)
# 推論
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
# 評価
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
他にも下記のモデルをライブラリとして使用できます.
- LogisticRegression
- DecisionTreeClassifier
- RandomForestClassifier
- Gradient-boosted tree classifier
- NaiveBayes
- Support Vector Machine
感想
ローカルだと遅い!
当たり前ですけど,大規模データを処理してこその分散処理のため,この程度のデータでは恩恵は全くありませんでした.
大きすぎるデータに出会った時に速度や精度の比較を行って行けたらと思います.
今回の学び
- Docker は便利
- 分散フレームワーク体系の整理
- PySpark ってこう動かんだ…