はじめに
機械学習のための特徴量エンジニアリングで取り上げられているテクニックについて、Sparkでも同様の処理をまとめます。
そのため、下記のpandas版のコードは引用がメインになります。
あまり慣れてないものもあるので、間違いを見つけ方は是非ご指摘頂ければと思います。
後、ここでは各処理の効用については触れません。
本記事での表記について
各処理サンプルは、以下の規則で書いていきます。
以下サンプルで参照する変数
import numpy as np
import pandas as pd
feature # PandasのSeries / SparkDataFrameの変数名
df # PandasDataFrame / SparkDataFrame
数値型
離散化
固定長の離散化
pandasで固定長の離散化
np.floor_divide(feature, 10)
pysparkで固定長の離散化
from pyspark.sql.functions import col, floor
df.withColumn('discretized', floor(col('feature') / 10))
対数階級による離散化
pandasで対数階級による離散化
np.floor(np.log10(feature))
pysparkで対数階級による離散化
from pyspark.sql.functions import floor, log10
df.withColumn('discretized', floor(log10('feature')))
分位数による離散化
pandasで分位数による離散化
# 10分位数への変換
pd.qcut(feature, 10, labels=False)
# 10分位数の計算
feature.quantile(np.linspace(0, 1, num=10+1))
pysparkで分位数による離散化
from pyspark.ml.feature import QuantileDiscretizer
# 10分位数への変換
qds = QuantileDiscretizer(numBuckets=10, inputCol='feature', outputCol='quantile')
qds.fit(df).transform(df)
# 10分位数の計算
df.approxQuantile('feature', list(np.linspace(0, 1, num=10+1)), 0)
スケーリング
Min-Maxスケーリング
pandasでMin-Maxスケーリング
from sklearn.preprocessing import minmax_scale
minmax_scale(feature)
pysparkでMin-Maxスケーリング
from pyspark.sql import functions as F
from pyspark.sql.window import Window
(
sdf
.withColumn('min', F.min('feature').over(Window.orderBy()))
.withColumn('max', F.max('feature').over(Window.orderBy()))
.withColumn('scaled', (F.col('feature') - F.col('min'))/(F.col('max') - F.col('min')))
)
※SparkMLのMinMaxScalerは、Vector型に変換する必要があり、列単位での処理の正規化には取り扱いにくい
(参考)SparkMLのMinMaxScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(
inputCols=['feature'],
outputCol='feature_vector',
)
vector_df = vector_assembler.transform(df)
minmax_scaler = MinMaxScaler(inputCol='feature_vector', outputCol='scaled_vector').fit(vector_df)
scaled_df = minmax_scaler.transform(vector_df)
zスコア変換
pandasでzスコア変換
from sklearn.preprocessing import StandardScaler
StandardScaler().fit_transform(feature)
pysparkでzスコア変換
from pyspark.sql.window import Window
(
df
.withColumn('mean', F.mean('feature').over(Window.orderBy()))
.withColumn('std', F.stddev('feature').over(Window.orderBy()))
.withColumn('scaled', (F.col('feature') - F.col('mean')) / F.col('std'))
)
※SparkMLのStandardScalerは、Vector型に変換する必要があり、列単位での処理の正規化には取り扱いにくい
L2正規化
pandasでL2正規化
from sklearn.preprocessing import normalize
normalize(feature, axis=0)
pysparkでL2正規化
from pyspark.sql import functions as F
df.withColumn('scaled', F.col('n_tokens_content') / F.sqrt(F.sum(F.pow(F.col('n_tokens_content'), 2)).over(Window.orderBy())))
※SparkMLのNormalizerは、列方向の正規化に対応しておらず、自力で計算する必要がある?
カテゴリ型
One-Hotエンコーディング
pandasでOne-Hotエンコーディング
one_hot_df = pd.get_dummies(df,
dummy_na=False, # 欠損を含めるか
drop_first=False) # Trueでダミーコーディング
one_hot_df.head()
pysparkでOne-Hotエンコーディング
from pyspark.ml.feature import OneHotEncoder, StringIndexer
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
pysparkでOne-Hotエンコーディング
from pyspark.sql.functions import when
def spark_onehot(df, feature):
d = df
for c in [c[feature] for c in df.select(feature).distinct().collect()]:
d = d.withColumn('{}_{}'.format(feature, c), when(df[feature] == c, 1).otherwise(0))
return d
spark_onehot(df, 'category')
特徴量ハッシング
pandasで特徴量ハッシング
from sklearn.feature_extraction import FeatureHasher
h = FeatureHasher(n_features=m, input_type='string')
hashed = h.transform(df['feature'])
pysparkで特徴量ハッシング
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher(numFeatures=m, # ハッシュテーブルのサイズ
inputCols=features, # 特徴量名のリスト
outputCol="hashed")
featurized = hasher.transform(df)
その他
交互作用
pandasで交互作用特徴量の作成
from sklearn.preprocessing import PolynomialFeatures
# 数値のみのDFに対して、2次の交互作用特徴量を作成
# interaction_onlyはべき乗演算を含めないか
# include_biasは定数項の有無
df = PolynomialFeatures(degree=2, interaction_only=False, include_bias=False).fit_transform(df)
pysparkで交互作用特徴量の作成
from pyspark.ml.feature import PolynomialExpansion, VectorAssembler
vector_assembler = VectorAssembler(
inputCols=features, # 特徴量名のリスト
outputCol='feature_vector',
)
vector_df = vector_assembler.transform(df)
polyExpansion = PolynomialExpansion(degree=2, inputCol="feature_vector", outputCol="polyFeatures")
polyDF = polyExpansion.transform(vector_df)