Help us understand the problem. What is going on with this article?

Docker で始める PySpark 生活

はじめに

「Spark?雷属性でかっこいい!」という煩悩のもと,Docker + Spark + Jupyter Notebook で簡単な機械学習を回して見ました.
お馴染みのタイタニックデータを使い,線形回帰による予測を行いました.

そもそも Spark とは

download.png

Spark とは分散処理ライブラリの一つです.
分散処理といえば Hadoop という方も多いと思いますが,自分の理解だと Spark は Hadoop の欠点を補ったライブラリになります.Hadoop が登場したのが 2006 年,その後,2014 年に Hadoop が登場しました.

Hadoop VS Spark

上記,Spark は Hadoop の欠点を補ったとありますが,双方にメリット・デメリットがあるため簡単に表にまとめます.

メリット デメリット
Hadoop 大量のデータを扱うことができる ストレージアクセスがあるためリアルタイムの処理が苦手
Spark オンメモリでの処理によりリアルタイムでの処理が得意 Hadoopほど大規模なデータは扱えない

つまり,大きすぎるデータはHadoop,リアルタイムでの処理がしたかったらSparkを用いると良いでしょう.

また,Hadoopのクエリエンジンは Presto,Hive ですが,Spark には多彩な API が用意されていて Python や Scala などの言語から簡単に呼び出すことができます.

Docker Setup

まず,下記のイメージをダウンロードして,ビルドを行います.

$ docker pull jupyter/pyspark-notebook

$ docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook

上記,表示される URL にアクセスすればノートを開くことができます.

※ イメージのサイズが 3GB あるので一応気をつけてください

Python

ライブラリ

import pandas as pd

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

データ読み込み

# データ読み込み
titanic_df = spark.read.csv('./titanic/train.csv', header='True', inferSchema='True')

基本的なデータ処理

# 欠損値対応
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})

# 不要のカラムを落とす
titanic_df = titanic_df.drop("Cabin")

# 定数による列追加
titanic_df = titanic_df.withColumn('Alone', lit(0))

# 条件による値の挿入
titanic_df = titanic_df.withColumn("Alone", when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))

# ラベルエンコード
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex", "Embarked", "Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)

# テストスプリット
trainingData, testData = feature_vector.randomSplit([0.8, 0.2], seed=9)

他にもいろいろな処理が有りますが,いったんめぼしいものだけ置いておきます.
より詳しいデータ処理に関しては下記をご覧ください.
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5722190290795989/3865595167034368/8175309257345795/latest.html

学習

# 学習
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
lrModel = lr.fit(trainingData)

# 推論
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()

# 評価
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_prediction)

print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

他にも下記のモデルをライブラリとして使用できます.

  • LogisticRegression
  • DecisionTreeClassifier
  • RandomForestClassifier
  • Gradient-boosted tree classifier
  • NaiveBayes
  • Support Vector Machine

感想

ローカルだと遅い!

当たり前ですけど,大規模データを処理してこその分散処理のため,この程度のデータでは恩恵は全くありませんでした.
大きすぎるデータに出会った時に速度や精度の比較を行って行けたらと思います.

今回の学び

  • Docker は便利
  • 分散フレームワーク体系の整理
  • PySpark ってこう動かんだ…
tsuchiyaTaro
Data Scientist / AI Engineer
eaglys
"EAGLYSは、未だ活用しきれていない企業に眠るデータ資産を、 安全にデータ分析・AI構築・運用するサポートを行っています。"
https://eaglys.co.jp
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away