はじめに
PySparkでのデータ加工や機械学習の実装練習として、KaggleタイタニックコンペをPySpark縛りでやってみました。「PandasやScikit-learnでよくやるこの処理、PySparkだとどうする?」をテーマに、比較しながら書いていこうと思います。
実行環境
DockerでJupyter/pyspark-notebook:latestのimageをpullしそのまま使っています(Python 3.8.6, Spark 3.0.1)。詳しくは前記事をご覧ください。
データ読み込み〜EDA
おまじない。
# Session起動
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("titanic").getOrCreate()
データ読み込み(csv)
# csvデータ読み込み
train = spark.read.csv('./data/train.csv', header=True, inferSchema=True)
test = spark.read.csv('./data/test.csv', header=True, inferSchema=True)
データ書き出し(parquet)
# DataFrameをParquet形式に書き出す
train.write.parquet('./data/train_parquet')
test.write.parquet('./data/test_parquet')
※出力先のディレクトリのpathが既に存在するとエラーになる
# データ再読み込み
train = spark.read.parquet('./data/train_parquet', header=True, inferSchema=True)
test = spark.read.parquet('./data/test_parquet', header=True, inferSchema=True)
shape(行数, 列数)の確認
Pandasでの.shape
に相当するものはない。行数は.count()
で取得。列数は.columns
でカラム名一覧をリストとして取得できるので、それをlen()
で数える。
# shapeの確認
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))
print('train:',train_shape)
print('test:',test_shape)
>>>
train: (891, 12)
test: (418, 11)
スキーマの確認
各カラムのデータ型などを確認する。データ型はPythonのものでなくScalaのもので扱われている。
# スキーマの確認
train.printSchema()
>>>
root
|-- PassengerId: integer (nullable = true)
|-- Survived: integer (nullable = true)
|-- Pclass: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: integer (nullable = true)
|-- Parch: integer (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)
|-- Cabin: string (nullable = true)
|-- Embarked: string (nullable = true)
データを数件表示して中身を確認
データを数行表示させて概要を掴む、Pandasでの.head()
に相当するもの。PySparkの場合には.show()
でDataFrame自体を表示させる方法か、head()
でRowオブジェクト単位で表示させ方法がある。カラム数が多かったり、長めのテキストデータを含む場合などは後者の方が見やすいように思う。
# データを5件表示
train.show(5)
>>>
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows
# データを5件表示
train.head(5)
>>>
[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]
統計量一覧
Pandasでもよくやる.describe()
。PySparkでも同様だが、表示するためには.show()
が必要。Pandasの場合デフォルトでは数値変数のみ表示されるが、PySparkではカテゴリ変数も表示される。
# 統計量一覧を表示
train.describe().show()
>>>
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary| PassengerId| Survived| Pclass| Name| Sex| Age| SibSp| Parch| Ticket| Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
| count| 889| 889| 889| 889| 889| 712| 889| 889| 889| 889| 202| 889|
| mean| 446.0|0.38245219347581555|2.3115860517435323| null| null|29.64209269662921|0.5241844769403825|0.38245219347581555| 260763.9104704097|32.09668087739029| null| null|
| stddev|256.99817277718313|0.48625968831477334|0.8346997785705753| null| null|14.49293290032352| 1.103704875596923| 0.8067607445174785|472255.95121695305|49.69750431670795| null| null|
| min| 1| 0| 1|"Andersson, Mr. A...|female| 0.42| 0| 0| 110152| 0.0| A10| C|
| max| 891| 1| 3|van Melkebeke, Mr...| male| 80.0| 8| 6| WE/P 5735| 512.3292| T| S|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-------------------+------------------+-----------------+-----+--------+
欠損値の確認
.describe()
のcount行で確認するのが手っ取り早いように思う。一応カラムごとに.isNull()
やisNotNull()
でフィルターをかけて確認することもできる。
# 欠損値確認
# train.describe().head()でも同様の結果
train.describe().collect()[0]
>>>
Row(summary='count', PassengerId='891', Survived='891', Pclass='891', Name='891', Sex='891', Age='714', SibSp='891', Parch='891', Ticket='891', Fare='891', Cabin='204', Embarked='889')
# 'Age'列の欠損値数を取得
train.filter(train['Age'].isNull()).count()
>>>
177
# 'Age'列の非欠損値数を取得
train.filter(train['Age'].isNotNull()).count()
>>>
714
カテゴリカル変数と数値変数のカラムを分ける
.dtypes
で、各カラムに対して(カラム名, データ型)が入ったタプルのリストを取得できるため、これを利用する。
train.dtypes
>>>
[('PassengerId', 'int'),
('Survived', 'int'),
('Pclass', 'int'),
('Name', 'string'),
('Sex', 'string'),
('Age', 'double'),
('SibSp', 'int'),
('Parch', 'int'),
('Ticket', 'string'),
('Fare', 'double'),
('Cabin', 'string'),
('Embarked', 'string')]
今回はデータ型が'double'のものを数値変数、'string'または'int'のものをカテゴリ変数として扱うことにし、リスト内包表記で以下のように分けた。
# カテゴリカル変数のカラムと数値変数のカラムを分ける
num_cols = [dtype[0] for dtype in train.dtypes if dtype[1] == 'double']
cat_cols = [dtype[0] for dtype in train.dtypes if dtype[1] != 'double']
# 目的変数の'Survived'だけ除いておく
cat_cols.remove('Survived')
print('カテゴリカル変数: {}'.format(cat_cols))
print('数値変数:{}'.format(num_cols))
>>>
カテゴリカル変数: ['PassengerId', 'Pclass', 'Name', 'Sex', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked']
数値変数:['Age', 'Fare']
各カラムのユニーク数を調べる
Pandasでの.nunique()
に相当するもの。特徴量選択やカテゴリ変数の変換の際に参考にすることもあるかと。PySparkでは実装されていないため、やるなら以下のようにfor文で回す。(分散処理とfor文はあまり相性はよくないらしいが...)
# カテゴリ変数のユニーク数を調べる
nunique = {}
for col in cat_cols:
value = train.select(col).distinct().count()
nunique[col] = value
print(nunique)
>>>
{'PassengerId': 891,
'Pclass': 3,
'Name': 891,
'Sex': 2,
'SibSp': 7,
'Parch': 7,
'Ticket': 681,
'Cabin': 148,
'Embarked': 4}
カテゴリ変数の分布を調べる
分類タスクの場合に不均衡データかどうかを確認したいときなど。
# 'Sex'の比率を確認する
train.groupBy('Sex').count().show()
>>>
+------+-----+
| Sex|count|
+------+-----+
|female| 314|
| male| 577|
+------+-----+
ピボットテーブルを作成
.groupBy()
と.pivot()
を使って実装する。今回の例だと、(ものすごく有名だが)性別によって生死の割合が大きく異なっていることがわかる。
# ピボットテーブルを作る
train.groupBy('Survived').pivot("Sex").count().show()
>>>
+--------+------+----+
|Survived|female|male|
+--------+------+----+
| 1| 233| 109|
| 0| 81| 468|
+--------+------+----+
下のように、表示する要素を絞ったり、数値変数の統計量を表示することもできる。
# 'Survived', 'Embarked'ごとの'Age'の平均値を調べる
# 'Embarked'に関しては'C','Q','S'に限定
train.groupBy('Survived').pivot('Embarked', ['C', 'Q', 'S']).mean('Age').show()
>>>
+--------+------------------+------+------------------+
|Survived| C| Q| S|
+--------+------------------+------+------------------+
| 1| 28.97367088607595| 22.5| 28.11318407960199|
| 0|33.666666666666664|30.325|30.203966005665723|
+--------+------------------+------+------------------+
相関係数(相関行列)を調べる
DataFrameに対し.corr()
メソッドを使う。また、Spark MLのstatモジュールにあるCorrelation
を使えば、相関行列も取得可能。
# 'Age'と'Fare'の相関係数
train.corr('Age', 'Fare')
>>>
0.135515853527051
前処理
訓練データと評価データに分割
Scikit-learnのtrain_test_splitに相当するもの。PySparkの場合はDataFrameのメソッドとして実装されている。
# trainデータを7:3に分割
df_train, df_valid = train.randomSplit([0.7,0.3], seed=2020)
print('df_train: {} rows'.format(df_train.count()))
print('df_valid: {} rows'.format(df_valid.count()))
>>>
df_train: 651 rows
df_valid: 240 rows
不要カラムの削除
PySparkの場合、モデルに突っ込む前に改めて使用するカラムを一つのカラムにまとめる作業があるため、必ずしもここで不要カラムを削除する必要はないが、一応以下のようにできる。
# 不要なカラムを落とす
df_train = df_train.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_valid = df_valid.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_train.columns
>>>
['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
欠損値処理(削除・補完)
dropna()
で欠損値を含む行を削除できる。引数は3つあり、how
では'any'か'all'を選択(デフォルト'any')。thresh
ではhow='any'の時にいくつ以上欠損があったら削除するか指定(int, デフォルトでは'None')。subset
では対象カラムを選択する(デフォルトでは'None')。
# 'Embarked' = null のデータを削除する
df_train = df_train.dropna(how='any', subset=['Embarked'])
df_valid = df_valid.dropna(how='any', subset=['Embarked'])
df_train.describe().show()
>>>
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
|summary| Survived| Pclass| Sex| Age| SibSp| Parch| Fare|Embarked|
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
| count| 889| 889| 889| 712| 889| 889| 889| 889|
| mean|0.38245219347581555|2.3115860517435323| null|29.64209269662921|0.5241844769403825|0.38245219347581555|32.09668087739029| null|
| stddev|0.48625968831477334|0.8346997785705753| null|14.49293290032352| 1.103704875596923| 0.8067607445174785|49.69750431670795| null|
| min| 0| 1|female| 0.42| 0| 0| 0.0| C|
| max| 1| 3| male| 80.0| 8| 6| 512.3292| S|
+-------+-------------------+------------------+------+-----------------+------------------+-------------------+-----------------+--------+
平均値または中央値で補完する場合は、Spark MLのImputer
が使える。一番右に新しく'Age_imputed'というカラムが追加されていることが確認できる。
# 'Age'カラムを中央値で補完する
from pyspark.ml.feature import Imputer
imputer = Imputer(
strategy='median',
inputCol='Age',
outputCol='Age_imputed'
)
model = imputer.fit(df_train)
model.transform(df_train)
model.transform(df_valid)
# 確認
df_train.describe().show()
>>>
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
|summary| Survived| Pclass| Sex| Age| SibSp| Parch| Fare|Embarked| Age_imputed|
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
| count| 649| 649| 649| 521| 649| 649| 649| 649| 649|
| mean| 0.386748844375963| 2.295839753466872| null|29.335259117082533|0.5469953775038521|0.3913713405238829|31.948169645608576| null|29.071910631741137|
| stddev|0.48738094472424587|0.8418076223501735| null| 14.67636802626401| 1.1130653931477|0.7940671982196961| 46.4778648584037| null| 13.15793243066804|
| min| 0| 1|female| 0.42| 0| 0| 0.0| C| 0.42|
| max| 1| 3| male| 80.0| 8| 6| 512.3292| S| 80.0|
+-------+-------------------+------------------+------+------------------+------------------+------------------+------------------+--------+------------------+
数値変数の標準化
Scikit-learn同様、Spark MLにもStandardScaler
が実装されている。ただし、Spark MLの場合は1カラムずつしか扱うことができないため、複数カラムを同時に標準化したい場合は、先にVectorAssembler
で処理したいカラムを1カラムにまとめてから、標準化する必要がある。
# 'Age_imputed'と'Fare'をまとめた'num_cols'列を作成
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols = ['Age_imputed', 'Fare'],
outputCol='num_cols'
)
df_train = assembler.transform(df_train)
df_valid = assembler.transform(df_valid)
# 標準化
from pyspark.ml.feature import StandardScaler
sc = StandardScaler(inputCol='num_cols', outputCol='num_cols_scaled')
sc_model = sc.fit(df_train)
df_train = sc_model.transform(df_train)
df_valid = sc_model.transform(df_valid)
# 確認
df_train.select(['Age_imputed', 'Fare', 'num_cols', 'num_cols_scaled']).show(5)
>>>
+-----------+-------+--------------+--------------------+
|Age_imputed| Fare| num_cols| num_cols_scaled|
+-----------+-------+--------------+--------------------+
| 22.0| 7.25| [22.0,7.25]|[1.67199521018387...|
| 38.0|71.2833|[38.0,71.2833]|[2.88799172668123...|
| 26.0| 7.925| [26.0,7.925]|[1.97599433930821...|
| 35.0| 53.1| [35.0,53.1]|[2.65999237983797...|
| 35.0| 8.05| [35.0,8.05]|[2.65999237983797...|
+-----------+-------+--------------+--------------------+
only showing top 5 rows
カテゴリ変数のOne-Hotエンコーディング
One-Hotエンコーディングも同様に、Spark MLに実装されている。StandardScalerと違い複数カラムをいっぺんに処理することができるが、数値しか扱うことができない。そのため、文字列が入っているカラムは一度StringIndexser
で数値を振ってから(つまりLabelエンコーディングをしてから)、OneHotEncoder
で処理する必要がある。
# 'Sex', 'Embarked'を数値に変換
from pyspark.ml.feature import StringIndexer
idx_input = ['Sex', 'Embarked']
idx_output = [col + '_indexed' for col in idx_input]
indexer = StringIndexer(
inputCols=idx_input,
outputCols=idx_output
)
idx_model = indexer.fit(df_train)
df_train = idx_model.transform(df_train)
df_valid = idx_model.transform(df_valid)
# 確認
df_train.select(['Sex', 'Sex_indexed', 'Embarked', 'Embarked_indexed']).show(5)
>>>
+------+-----------+--------+----------------+
| Sex|Sex_indexed|Embarked|Embarked_indexed|
+------+-----------+--------+----------------+
| male| 0.0| S| 0.0|
|female| 1.0| C| 1.0|
|female| 1.0| S| 0.0|
|female| 1.0| S| 0.0|
| male| 0.0| S| 0.0|
+------+-----------+--------+----------------+
only showing top 5 rows
# One-Hotエンコーディング
from pyspark.ml.feature import OneHotEncoder
ohe_input = ['Pclass', 'Sex_indexed', 'SibSp', 'Parch', 'Embarked_indexed']
ohe_output = [col + '_encoded' for col in ohe_input]
ohe = OneHotEncoder(
inputCols=ohe_input,
outputCols=ohe_output,
dropLast=True
)
ohe_model = ohe.fit(df_train)
df_train = ohe_model.transform(df_train)
df_valid = ohe_model.transform(df_valid)
# 'Embarked'を確認
df_train['Embarked', 'Embarked_indexed', 'Embarked_indexed_encoded'].show(10)
>>>
+--------+----------------+------------------------+
|Embarked|Embarked_indexed|Embarked_indexed_encoded|
+--------+----------------+------------------------+
| S| 0.0| (2,[0],[1.0])|
| C| 1.0| (2,[1],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| Q| 2.0| (2,[],[])|
| S| 0.0| (2,[0],[1.0])|
| S| 0.0| (2,[0],[1.0])|
| C| 1.0| (2,[1],[1.0])|
| S| 0.0| (2,[0],[1.0])|
+--------+----------------+------------------------+
このように、One-Hotエンコーディング後はSparseデータとなる。
学習に使用するカラムをまとめる
Scikit-learnと異なる点の一つとして、PySparkではモデルの学習に使用する特徴量を1つのカラムにまとめる必要がある。標準化のところでも使用したVectorAssembler
を使い、カラム名は'features'とするのが一般的らしい。
# 特徴量をまとめる
assembler2 = VectorAssembler(
inputCols=['Pclass_encoded',
'Sex_indexed_encoded',
'SibSp_encoded',
'Parch_encoded',
'Embarked_indexed_encoded',
'num_cols_scaled'],
outputCol='features'
)
df_train = assembler2.transform(df_train)
df_valid = assembler2.transform(df_valid)
df_train.select(['Survived', 'features']).show(5)
>>>
+--------+--------------------+
|Survived| features|
+--------+--------------------+
| 0|(22,[3,5,12,18,20...|
| 1|(22,[1,5,12,19,20...|
| 1|(22,[4,12,18,20,2...|
| 1|(22,[1,5,12,18,20...|
| 0|(22,[3,4,12,18,20...|
+--------+--------------------+
only showing top 5 rows
ちなみに、今回は元々カテゴリ変数がSparseデータ、数値変数がDenseデータ(普通のデータ)となっていましたが、これらをまとめると自動でSpaeseデータになるみたいです。
df_train.select(['features']).head(2)
>>>
# データ構造の確認
[Row(features=SparseVector(22, {3: 1.0, 5: 1.0, 12: 1.0, 18: 1.0, 20: 1.672, 21: 0.156})),
Row(features=SparseVector(22, {1: 1.0, 5: 1.0, 12: 1.0, 19: 1.0, 20: 2.888, 21: 1.5337}))]
モデル学習+評価
ロジスティック回帰
今回はシンプルにロジスティック回帰を使います。他にも決定木系やSVCなど、基本的なものはほとんどSpark MLに実装されています。
# ロジスティック回帰
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Survived')
lr_model = lr.fit(df_train)
正解ラベルが存在するものに関しては、モデルに対して.evaluation()
で結果のSummaryオブジェクトを取得でき、これに対して.predictions
で内容を確認することができます。
# 推論結果の取得
train_result = lr_model.evaluate(df_train)
valid_result = lr_model.evaluate(df_valid)
# 確認
valid_result.predictions.select(['Survived', 'rawPrediction', 'probability', 'prediction']).show()
>>>
+--------+--------------------+--------------------+----------+
|Survived| rawPrediction| probability|prediction|
+--------+--------------------+--------------------+----------+
| 1|[-0.0107443246616...|[0.49731394467449...| 1.0|
| 0|[2.10818940159344...|[0.89169660088758...| 0.0|
| 0|[2.71630875457920...|[0.93798215479564...| 0.0|
| 1|[-0.2429596953280...|[0.43955710975950...| 1.0|
| 0|[1.81502375560081...|[0.85996794511927...| 0.0|
+--------+--------------------+--------------------+----------+
only showing top 5 rows
AUCスコアの算出
モデル評価にはevaluationモジュール内のクラスを使う。今回のように2値分類でのAUCスコアの算出には、BinaryClassificationEvaluator
を使う。
# モデル評価(AUC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluater = BinaryClassificationEvaluator(
rawPredictionCol='prediction',
labelCol='Survived',
metricName='areaUnderROC' # デフォルトでAUCスコアになっている
)
auc_train = evaluater.evaluate(train_result.predictions)
auc_valid = evaluater.evaluate(valid_result.predictions)
# 確認
print('AUCスコア')
print('train:', auc_train)
print('valid:', auc_valid)
>>>
AUCスコア
train: 0.7889497287232977
valid: 0.8065704293474217
'f1', 'recall', 'precision'などを使いたい時は、MulticlassClassificationEvaluator
の方を使う必要がある。
# モデル評価(F1)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluater = MulticlassClassificationEvaluator(
predictionCol='prediction',
labelCol='Survived',
metricName='f1'
)
f1_train = evaluater.evaluate(train_result.predictions)
f1_valid = evaluater.evaluate(valid_result.predictions)
print('f1スコア')
print('train:', f1_train)
print('valid:', f1_valid)
>>>
f1スコア
train: 0.8032854491383006
valid: 0.8270897150503879
テストデータ(未知データ)の推論
欠損値補完(最頻値)
未知データに対して、欠損データを削除するのは好ましくないため、ここでは'Embarked'を最頻値の'S'で埋める。
# 'Embarked' = null のデータを最頻値で補完する
train = train.fillna({'Embarked': 'S'})
test = test.fillna({'Embarked': 'S'})
###パイプラインで処理をまとめる
モデル評価までで使用したプロセスをPipelineにまとめる。基本的にはScikit-learnのPipelineと同様のものと考えて良さそう。
# パイプラインの設定
from pyspark.ml import Pipeline, PipelineModel
stages=[
imputer, #'Age'の欠損値補完
assembler, #'Age' と 'Fare' を1つのベクトルに
sc, #'Age' と 'Fare' を標準化
indexer, #'Sex' と 'Embarked' を数値化
ohe, #OneHotEncodeing
assembler2, #使用する特徴量をまとめる
lr #ロジスティック回帰
]
pipeline = Pipeline(stages=stages)
# モデル学習
pipeline_model = pipeline.fit(train)
# モデルの保存
model_path = './model/lr_base'
pipeline_model.save(model_path)
# モデル再読み込み
pipeline_model = PipelineModel.load(model_path)
# 推論
train_result = pipeline_model.transform(train)
test_result = pipeline_model.transform(test)
# 予測結果の表示
submission = test_result.withColumn('Survived', test_result['prediction'].cast('int'))
submission = submission.select('PassengerId', 'Survived')
submission.show()
>>>
+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
| 892| 0|
| 893| 0|
| 894| 0|
| 895| 0|
| 896| 1|
| 897| 0|
| 898| 1|
| 899| 0|
| 900| 1|
| 901| 0|
| 902| 0|
| 903| 0|
| 904| 1|
| 905| 0|
| 906| 1|
| 907| 1|
| 908| 0|
| 909| 0|
| 910| 1|
| 911| 1|
+-----------+--------+
only showing top 20 rows
おわりに
Titanicコンペを題材に、PySparkで基礎的な機械学習手法を実装してみました。Spark MLはScikit-learnの影響を強く受けているらしく似ている部分も多かったですが、なぜか標準化を(そのまま使うと)1カラムずつしかできなかったり、One-Hotの前にLabelエンコーディングをする必要があったりと、ややクセがあるように感じました。今後はSpark MLだけでなくSparkの外部パッケージなども使ってみたいと思います。
参考
https://data-analysis-stats.jp/spark/pyspark%E3%81%A7%E6%AC%A0%E6%90%8D%E5%80%A4null%E3%81%AE%E5%8F%96%E3%82%8A%E6%89%B1%E3%81%84%E6%96%B9%E6%B3%95/
https://qiita.com/calderarie/items/d37462d7eafef04891b8
https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0#window%E9%96%A2%E6%95%B0
PySpark公式ドキュメント