3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PySpark学習記録② Kaggle タイタニックコンペをPySpark縛りでやってみた

Last updated at Posted at 2020-12-29

はじめに

 PySparkでのデータ加工や機械学習の実装練習として、KaggleタイタニックコンペをPySpark縛りでやってみました。「PandasやScikit-learnでよくやるこの処理、PySparkだとどうする?」をテーマに、比較しながら書いていこうと思います。

実行環境

 DockerでJupyter/pyspark-notebook:latestのimageをpullしそのまま使っています(Python 3.8.6, Spark 3.0.1)。詳しくは前記事をご覧ください。

データ読み込み〜EDA

おまじない。

# Session起動
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("titanic").getOrCreate()

データ読み込み(csv)

# csvデータ読み込み
train = spark.read.csv('./data/train.csv', header=True, inferSchema=True)
test = spark.read.csv('./data/test.csv', header=True, inferSchema=True)

データ書き出し(parquet)

# DataFrameをParquet形式に書き出す
train.write.parquet('./data/train_parquet')
test.write.parquet('./data/test_parquet')

※出力先のディレクトリのpathが既に存在するとエラーになる

# データ再読み込み
train = spark.read.parquet('./data/train_parquet', header=True, inferSchema=True)
test = spark.read.parquet('./data/test_parquet', header=True, inferSchema=True)

shape(行数, 列数)の確認

 Pandasでの.shapeに相当するものはない。行数は.count()で取得。列数は.columnsでカラム名一覧をリストとして取得できるので、それをlen()で数える。

# shapeの確認
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))
print('train:',train_shape)
print('test:',test_shape)

>>>
train: (891, 12)
test: (418, 11)

スキーマの確認

  各カラムのデータ型などを確認する。データ型はPythonのものでなくScalaのもので扱われている。

# スキーマの確認
train.printSchema()

>>>
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

データを数件表示して中身を確認

 データを数行表示させて概要を掴む、Pandasでの.head()に相当するもの。PySparkの場合には.show()でDataFrame自体を表示させる方法か、head()でRowオブジェクト単位で表示させ方法がある。カラム数が多かったり、長めのテキストデータを含む場合などは後者の方が見やすいように思う。

# データを5件表示
train.show(5)

>>>
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows
# データを5件表示
train.head(5)

>>>
[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]

統計量一覧

 Pandasでもよくやる.describe()。PySparkでも同様だが、表示するためには.show()が必要。Pandasの場合デフォルトでは数値変数のみ表示されるが、PySparkではカテゴリ変数も表示される。

# 統計量一覧を表示
train.describe().show()

>>>
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|       PassengerId|           Survived|            Pclass|                Name|   Sex|              Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|               889|                889|               889|                 889|   889|              712|               889|                889|               889|              889|  202|     889|
|   mean|             446.0|0.38245219347581555|2.3115860517435323|                null|  null|29.64209269662921|0.5241844769403825|0.38245219347581555| 260763.9104704097|32.09668087739029| null|    null|
| stddev|256.99817277718313|0.48625968831477334|0.8346997785705753|                null|  null|14.49293290032352| 1.103704875596923| 0.8067607445174785|472255.95121695305|49.69750431670795| null|    null|
|    min|                 1|                  0|                 1|"Andersson, Mr. A...|female|             0.42|                 0|                  0|            110152|              0.0|  A10|       C|
|    max|               891|                  1|                 3|van Melkebeke, Mr...|  male|             80.0|                 8|                  6|         WE/P 5735|         512.3292|    T|       S|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+

欠損値の確認

 .describe()のcount行で確認するのが手っ取り早いように思う。一応カラムごとに.isNull()isNotNull()でフィルターをかけて確認することもできる。

# 欠損値確認
# train.describe().head()でも同様の結果
train.describe().collect()[0]

>>>
Row(summary='count', PassengerId='891', Survived='891', Pclass='891', Name='891', Sex='891', Age='714', SibSp='891', Parch='891', Ticket='891', Fare='891', Cabin='204', Embarked='889')
# 'Age'列の欠損値数を取得
train.filter(train['Age'].isNull()).count()

>>>
177
# 'Age'列の非欠損値数を取得
train.filter(train['Age'].isNotNull()).count()

>>>
714

カテゴリカル変数と数値変数のカラムを分ける

 .dtypesで、各カラムに対して(カラム名, データ型)が入ったタプルのリストを取得できるため、これを利用する。

train.dtypes

>>>
[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

 今回はデータ型が'double'のものを数値変数、'string'または'int'のものをカテゴリ変数として扱うことにし、リスト内包表記で以下のように分けた。

# カテゴリカル変数のカラムと数値変数のカラムを分ける
num_cols = [dtype[0] for dtype in train.dtypes if dtype[1] == 'double']
cat_cols = [dtype[0] for dtype in train.dtypes if dtype[1] != 'double']

# 目的変数の'Survived'だけ除いておく
cat_cols.remove('Survived')

print('カテゴリカル変数: {}'.format(cat_cols))
print('数値変数:{}'.format(num_cols))


>>>
カテゴリカル変数 ['PassengerId', 'Pclass', 'Name', 'Sex', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked']
数値変数['Age', 'Fare']

各カラムのユニーク数を調べる

 Pandasでの.nunique()に相当するもの。特徴量選択やカテゴリ変数の変換の際に参考にすることもあるかと。PySparkでは実装されていないため、やるなら以下のようにfor文で回す。(分散処理とfor文はあまり相性はよくないらしいが...)

# カテゴリ変数のユニーク数を調べる
nunique = {}

for col in cat_cols:
    value = train.select(col).distinct().count()
    nunique[col] = value

print(nunique)

>>>
{'PassengerId': 891,
 'Pclass': 3,
 'Name': 891,
 'Sex': 2,
 'SibSp': 7,
 'Parch': 7,
 'Ticket': 681,
 'Cabin': 148,
 'Embarked': 4}

カテゴリ変数の分布を調べる

 分類タスクの場合に不均衡データかどうかを確認したいときなど。

# 'Sex'の比率を確認する
train.groupBy('Sex').count().show()

>>>
+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+

ピボットテーブルを作成

 .groupBy().pivot()を使って実装する。今回の例だと、(ものすごく有名だが)性別によって生死の割合が大きく異なっていることがわかる。

# ピボットテーブルを作る
train.groupBy('Survived').pivot("Sex").count().show()

>>>
+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+

 下のように、表示する要素を絞ったり、数値変数の統計量を表示することもできる。

# 'Survived', 'Embarked'ごとの'Age'の平均値を調べる
# 'Embarked'に関しては'C','Q','S'に限定
train.groupBy('Survived').pivot('Embarked', ['C', 'Q', 'S']).mean('Age').show()

>>>
+--------+------------------+------+------------------+
|Survived|                 C|     Q|                 S|
+--------+------------------+------+------------------+
|       1| 28.97367088607595|  22.5| 28.11318407960199|
|       0|33.666666666666664|30.325|30.203966005665723|
+--------+------------------+------+------------------+

相関係数(相関行列)を調べる

 DataFrameに対し.corr()メソッドを使う。また、Spark MLのstatモジュールにあるCorrelationを使えば、相関行列も取得可能。

# 'Age'と'Fare'の相関係数
train.corr('Age', 'Fare')

>>>
0.135515853527051

前処理

訓練データと評価データに分割

 Scikit-learnのtrain_test_splitに相当するもの。PySparkの場合はDataFrameのメソッドとして実装されている。

# trainデータを7:3に分割
df_train, df_valid = train.randomSplit([0.7,0.3], seed=2020)

print('df_train: {} rows'.format(df_train.count()))
print('df_valid:  {} rows'.format(df_valid.count()))

>>>
df_train: 651 rows
df_valid: 240 rows

不要カラムの削除

 PySparkの場合、モデルに突っ込む前に改めて使用するカラムを一つのカラムにまとめる作業があるため、必ずしもここで不要カラムを削除する必要はないが、一応以下のようにできる。

# 不要なカラムを落とす
df_train = df_train.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_valid = df_valid.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_train.columns

>>>
['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

欠損値処理(削除・補完)

 dropna()で欠損値を含む行を削除できる。引数は3つあり、howでは'any'か'all'を選択(デフォルト'any')。threshではhow='any'の時にいくつ以上欠損があったら削除するか指定(int, デフォルトでは'None')。subsetでは対象カラムを選択する(デフォルトでは'None')。

# 'Embarked' = null のデータを削除する
df_train = df_train.dropna(how='any', subset=['Embarked'])
df_valid = df_valid.dropna(how='any', subset=['Embarked'])

df_train.describe().show()

>>>
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
|summary|           Survived|            Pclass|   Sex|              Age|             SibSp|              Parch|             Fare|Embarked|
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
|  count|                889|               889|   889|              712|               889|                889|              889|     889|
|   mean|0.38245219347581555|2.3115860517435323|  null|29.64209269662921|0.5241844769403825|0.38245219347581555|32.09668087739029|    null|
| stddev|0.48625968831477334|0.8346997785705753|  null|14.49293290032352| 1.103704875596923| 0.8067607445174785|49.69750431670795|    null|
|    min|                  0|                 1|female|             0.42|                 0|                  0|              0.0|       C|
|    max|                  1|                 3|  male|             80.0|                 8|                  6|         512.3292|       S|
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+

 平均値または中央値で補完する場合は、Spark MLのImputerが使える。一番右に新しく'Age_imputed'というカラムが追加されていることが確認できる。

# 'Age'カラムを中央値で補完する
from pyspark.ml.feature import Imputer

imputer = Imputer(
    strategy='median',
    inputCol='Age', 
    outputCol='Age_imputed'
)

model = imputer.fit(df_train)
model.transform(df_train)
model.transform(df_valid)

# 確認
df_train.describe().show()

>>>
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
|summary|           Survived|            Pclass|   Sex|               Age|             SibSp|             Parch|              Fare|Embarked|       Age_imputed|
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
|  count|                649|               649|   649|               521|               649|               649|               649|     649|               649|
|   mean|  0.386748844375963| 2.295839753466872|  null|29.335259117082533|0.5469953775038521|0.3913713405238829|31.948169645608576|    null|29.071910631741137|
| stddev|0.48738094472424587|0.8418076223501735|  null| 14.67636802626401|   1.1130653931477|0.7940671982196961|  46.4778648584037|    null| 13.15793243066804|
|    min|                  0|                 1|female|              0.42|                 0|                 0|               0.0|       C|              0.42|
|    max|                  1|                 3|  male|              80.0|                 8|                 6|          512.3292|       S|              80.0|
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+

数値変数の標準化

 Scikit-learn同様、Spark MLにもStandardScalerが実装されている。ただし、Spark MLの場合は1カラムずつしか扱うことができないため、複数カラムを同時に標準化したい場合は、先にVectorAssemblerで処理したいカラムを1カラムにまとめてから、標準化する必要がある。

# 'Age_imputed'と'Fare'をまとめた'num_cols'列を作成
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols = ['Age_imputed', 'Fare'],
    outputCol='num_cols'
)
df_train = assembler.transform(df_train)
df_valid = assembler.transform(df_valid)
# 標準化
from pyspark.ml.feature import StandardScaler

sc = StandardScaler(inputCol='num_cols', outputCol='num_cols_scaled')
sc_model = sc.fit(df_train)
df_train = sc_model.transform(df_train)
df_valid = sc_model.transform(df_valid)

# 確認
df_train.select(['Age_imputed', 'Fare', 'num_cols', 'num_cols_scaled']).show(5)

>>>
+-----------+-------+--------------+--------------------+
|Age_imputed|   Fare|      num_cols|     num_cols_scaled|
+-----------+-------+--------------+--------------------+
|       22.0|   7.25|   [22.0,7.25]|[1.67199521018387...|
|       38.0|71.2833|[38.0,71.2833]|[2.88799172668123...|
|       26.0|  7.925|  [26.0,7.925]|[1.97599433930821...|
|       35.0|   53.1|   [35.0,53.1]|[2.65999237983797...|
|       35.0|   8.05|   [35.0,8.05]|[2.65999237983797...|
+-----------+-------+--------------+--------------------+
only showing top 5 rows

カテゴリ変数のOne-Hotエンコーディング

 One-Hotエンコーディングも同様に、Spark MLに実装されている。StandardScalerと違い複数カラムをいっぺんに処理することができるが、数値しか扱うことができない。そのため、文字列が入っているカラムは一度StringIndexserで数値を振ってから(つまりLabelエンコーディングをしてから)、OneHotEncoderで処理する必要がある。

# 'Sex', 'Embarked'を数値に変換
from pyspark.ml.feature import StringIndexer

idx_input = ['Sex', 'Embarked']
idx_output = [col + '_indexed' for col in idx_input]

indexer = StringIndexer(
    inputCols=idx_input, 
    outputCols=idx_output
)
idx_model = indexer.fit(df_train)
df_train = idx_model.transform(df_train)
df_valid = idx_model.transform(df_valid)

# 確認
df_train.select(['Sex', 'Sex_indexed', 'Embarked', 'Embarked_indexed']).show(5)

>>>
+------+-----------+--------+----------------+
|   Sex|Sex_indexed|Embarked|Embarked_indexed|
+------+-----------+--------+----------------+
|  male|        0.0|       S|             0.0|
|female|        1.0|       C|             1.0|
|female|        1.0|       S|             0.0|
|female|        1.0|       S|             0.0|
|  male|        0.0|       S|             0.0|
+------+-----------+--------+----------------+
only showing top 5 rows
# One-Hotエンコーディング 
from pyspark.ml.feature import OneHotEncoder

ohe_input = ['Pclass', 'Sex_indexed', 'SibSp', 'Parch', 'Embarked_indexed']
ohe_output = [col + '_encoded' for col in ohe_input]

ohe = OneHotEncoder(
    inputCols=ohe_input,
    outputCols=ohe_output,
    dropLast=True
)
ohe_model = ohe.fit(df_train)
df_train = ohe_model.transform(df_train)
df_valid = ohe_model.transform(df_valid)

# 'Embarked'を確認
df_train['Embarked', 'Embarked_indexed', 'Embarked_indexed_encoded'].show(10)

>>>
+--------+----------------+------------------------+
|Embarked|Embarked_indexed|Embarked_indexed_encoded|
+--------+----------------+------------------------+
|       S|             0.0|           (2,[0],[1.0])|
|       C|             1.0|           (2,[1],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       Q|             2.0|               (2,[],[])|
|       S|             0.0|           (2,[0],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       C|             1.0|           (2,[1],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
+--------+----------------+------------------------+

 このように、One-Hotエンコーディング後はSparseデータとなる。

学習に使用するカラムをまとめる

 Scikit-learnと異なる点の一つとして、PySparkではモデルの学習に使用する特徴量を1つのカラムにまとめる必要がある。標準化のところでも使用したVectorAssemblerを使い、カラム名は'features'とするのが一般的らしい。

# 特徴量をまとめる
assembler2 = VectorAssembler(
    inputCols=['Pclass_encoded',
               'Sex_indexed_encoded',
               'SibSp_encoded',
               'Parch_encoded',
               'Embarked_indexed_encoded',
               'num_cols_scaled'],
    outputCol='features'
)

df_train = assembler2.transform(df_train)
df_valid = assembler2.transform(df_valid)

df_train.select(['Survived', 'features']).show(5)

>>>
+--------+--------------------+
|Survived|            features|
+--------+--------------------+
|       0|(22,[3,5,12,18,20...|
|       1|(22,[1,5,12,19,20...|
|       1|(22,[4,12,18,20,2...|
|       1|(22,[1,5,12,18,20...|
|       0|(22,[3,4,12,18,20...|
+--------+--------------------+
only showing top 5 rows

 ちなみに、今回は元々カテゴリ変数がSparseデータ、数値変数がDenseデータ(普通のデータ)となっていましたが、これらをまとめると自動でSpaeseデータになるみたいです。

df_train.select(['features']).head(2)

>>>
# データ構造の確認
[Row(features=SparseVector(22, {3: 1.0, 5: 1.0, 12: 1.0, 18: 1.0, 20: 1.672, 21: 0.156})),
 Row(features=SparseVector(22, {1: 1.0, 5: 1.0, 12: 1.0, 19: 1.0, 20: 2.888, 21: 1.5337}))]

モデル学習+評価

ロジスティック回帰

 今回はシンプルにロジスティック回帰を使います。他にも決定木系やSVCなど、基本的なものはほとんどSpark MLに実装されています。

# ロジスティック回帰
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='Survived')
lr_model = lr.fit(df_train)

 正解ラベルが存在するものに関しては、モデルに対して.evaluation()で結果のSummaryオブジェクトを取得でき、これに対して.predictionsで内容を確認することができます。

# 推論結果の取得
train_result = lr_model.evaluate(df_train)
valid_result = lr_model.evaluate(df_valid)

# 確認
valid_result.predictions.select(['Survived', 'rawPrediction', 'probability', 'prediction']).show()

>>>
+--------+--------------------+--------------------+----------+
|Survived|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+----------+
|       1|[-0.0107443246616...|[0.49731394467449...|       1.0|
|       0|[2.10818940159344...|[0.89169660088758...|       0.0|
|       0|[2.71630875457920...|[0.93798215479564...|       0.0|
|       1|[-0.2429596953280...|[0.43955710975950...|       1.0|
|       0|[1.81502375560081...|[0.85996794511927...|       0.0|
+--------+--------------------+--------------------+----------+
only showing top 5 rows

AUCスコアの算出

 モデル評価にはevaluationモジュール内のクラスを使う。今回のように2値分類でのAUCスコアの算出には、BinaryClassificationEvaluatorを使う。

# モデル評価(AUC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluater = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='Survived',
    metricName='areaUnderROC'  # デフォルトでAUCスコアになっている
)

auc_train = evaluater.evaluate(train_result.predictions)
auc_valid = evaluater.evaluate(valid_result.predictions)

# 確認
print('AUCスコア')
print('train:', auc_train)
print('valid:', auc_valid)

>>>
AUCスコア
train: 0.7889497287232977
valid: 0.8065704293474217

 'f1', 'recall', 'precision'などを使いたい時は、MulticlassClassificationEvaluatorの方を使う必要がある。

# モデル評価(F1)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluater = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='Survived',
    metricName='f1'
)

f1_train = evaluater.evaluate(train_result.predictions)
f1_valid = evaluater.evaluate(valid_result.predictions)

print('f1スコア')
print('train:', f1_train)
print('valid:', f1_valid)

>>>
f1スコア
train: 0.8032854491383006
valid: 0.8270897150503879

テストデータ(未知データ)の推論

欠損値補完(最頻値)

 未知データに対して、欠損データを削除するのは好ましくないため、ここでは'Embarked'を最頻値の'S'で埋める。

# 'Embarked' = null のデータを最頻値で補完する
train = train.fillna({'Embarked': 'S'})
test = test.fillna({'Embarked': 'S'})

###パイプラインで処理をまとめる
 モデル評価までで使用したプロセスをPipelineにまとめる。基本的にはScikit-learnのPipelineと同様のものと考えて良さそう。

# パイプラインの設定
from pyspark.ml import Pipeline, PipelineModel

stages=[
    imputer,     #'Age'の欠損値補完
    assembler,   #'Age' と 'Fare' を1つのベクトルに
    sc,          #'Age' と 'Fare' を標準化
    indexer,     #'Sex' と 'Embarked' を数値化
    ohe,         #OneHotEncodeing
    assembler2,  #使用する特徴量をまとめる
    lr           #ロジスティック回帰
] 

pipeline = Pipeline(stages=stages)
# モデル学習
pipeline_model = pipeline.fit(train)

# モデルの保存
model_path = './model/lr_base'
pipeline_model.save(model_path)
# モデル再読み込み
pipeline_model = PipelineModel.load(model_path)

# 推論
train_result = pipeline_model.transform(train)
test_result = pipeline_model.transform(test)
# 予測結果の表示
submission = test_result.withColumn('Survived', test_result['prediction'].cast('int'))
submission = submission.select('PassengerId', 'Survived')
submission.show()

>>>
+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
|        896|       1|
|        897|       0|
|        898|       1|
|        899|       0|
|        900|       1|
|        901|       0|
|        902|       0|
|        903|       0|
|        904|       1|
|        905|       0|
|        906|       1|
|        907|       1|
|        908|       0|
|        909|       0|
|        910|       1|
|        911|       1|
+-----------+--------+
only showing top 20 rows

おわりに

 Titanicコンペを題材に、PySparkで基礎的な機械学習手法を実装してみました。Spark MLはScikit-learnの影響を強く受けているらしく似ている部分も多かったですが、なぜか標準化を(そのまま使うと)1カラムずつしかできなかったり、One-Hotの前にLabelエンコーディングをする必要があったりと、ややクセがあるように感じました。今後はSpark MLだけでなくSparkの外部パッケージなども使ってみたいと思います。

参考

https://data-analysis-stats.jp/spark/pyspark%E3%81%A7%E6%AC%A0%E6%90%8D%E5%80%A4null%E3%81%AE%E5%8F%96%E3%82%8A%E6%89%B1%E3%81%84%E6%96%B9%E6%B3%95/
https://qiita.com/calderarie/items/d37462d7eafef04891b8
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0#window%E9%96%A2%E6%95%B0
PySpark公式ドキュメント

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?