12
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PandasとPySparkの特徴量取り扱い対比

Posted at

はじめに

機械学習のための特徴量エンジニアリングで取り上げられているテクニックについて、Sparkでも同様の処理をまとめます。
そのため、下記のpandas版のコードは引用がメインになります。
あまり慣れてないものもあるので、間違いを見つけ方は是非ご指摘頂ければと思います。

後、ここでは各処理の効用については触れません。

本記事での表記について

各処理サンプルは、以下の規則で書いていきます。

以下サンプルで参照する変数
import numpy as np
import pandas as pd

feature  # PandasのSeries / SparkDataFrameの変数名
df  # PandasDataFrame / SparkDataFrame

数値型

離散化

固定長の離散化

pandasで固定長の離散化
np.floor_divide(feature, 10)
pysparkで固定長の離散化
from pyspark.sql.functions import col, floor

df.withColumn('discretized', floor(col('feature') / 10))

対数階級による離散化

pandasで対数階級による離散化
np.floor(np.log10(feature))
pysparkで対数階級による離散化
from pyspark.sql.functions import floor, log10

df.withColumn('discretized', floor(log10('feature')))

分位数による離散化

pandasで分位数による離散化
# 10分位数への変換
pd.qcut(feature, 10, labels=False)
# 10分位数の計算
feature.quantile(np.linspace(0, 1, num=10+1))
pysparkで分位数による離散化
from pyspark.ml.feature import QuantileDiscretizer

# 10分位数への変換
qds = QuantileDiscretizer(numBuckets=10, inputCol='feature', outputCol='quantile')
qds.fit(df).transform(df)

# 10分位数の計算
df.approxQuantile('feature', list(np.linspace(0, 1, num=10+1)), 0)

スケーリング

Min-Maxスケーリング

pandasでMin-Maxスケーリング
from sklearn.preprocessing import minmax_scale

minmax_scale(feature)
pysparkでMin-Maxスケーリング
from pyspark.sql import functions as F
from pyspark.sql.window import Window

(
    sdf
    .withColumn('min', F.min('feature').over(Window.orderBy()))
    .withColumn('max', F.max('feature').over(Window.orderBy()))
    .withColumn('scaled', (F.col('feature') - F.col('min'))/(F.col('max') - F.col('min')))
)

※SparkMLのMinMaxScalerは、Vector型に変換する必要があり、列単位での処理の正規化には取り扱いにくい

(参考)SparkMLのMinMaxScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(
    inputCols=['feature'],
    outputCol='feature_vector',
)
vector_df = vector_assembler.transform(df)

minmax_scaler = MinMaxScaler(inputCol='feature_vector', outputCol='scaled_vector').fit(vector_df)
scaled_df = minmax_scaler.transform(vector_df)

zスコア変換

pandasでzスコア変換
from sklearn.preprocessing import StandardScaler

StandardScaler().fit_transform(feature)
pysparkでzスコア変換
from pyspark.sql.window import Window

(
    df
    .withColumn('mean', F.mean('feature').over(Window.orderBy()))
    .withColumn('std', F.stddev('feature').over(Window.orderBy()))
    .withColumn('scaled', (F.col('feature') - F.col('mean')) / F.col('std'))
)

※SparkMLのStandardScalerは、Vector型に変換する必要があり、列単位での処理の正規化には取り扱いにくい

L2正規化

pandasでL2正規化
from sklearn.preprocessing import normalize

normalize(feature, axis=0)
pysparkでL2正規化
from pyspark.sql import functions as F

df.withColumn('scaled', F.col('n_tokens_content') / F.sqrt(F.sum(F.pow(F.col('n_tokens_content'), 2)).over(Window.orderBy())))

※SparkMLのNormalizerは、列方向の正規化に対応しておらず、自力で計算する必要がある?

カテゴリ型

One-Hotエンコーディング

pandasでOne-Hotエンコーディング
one_hot_df = pd.get_dummies(df,
                            dummy_na=False,  # 欠損を含めるか
                            drop_first=False)  # Trueでダミーコーディング
one_hot_df.head()
pysparkでOne-Hotエンコーディング
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
pysparkでOne-Hotエンコーディング
from pyspark.sql.functions import when

def spark_onehot(df, feature):
  d = df
  for c in [c[feature] for c in df.select(feature).distinct().collect()]:
    d = d.withColumn('{}_{}'.format(feature, c), when(df[feature] == c, 1).otherwise(0))
  return d

spark_onehot(df, 'category')

特徴量ハッシング

pandasで特徴量ハッシング
from sklearn.feature_extraction import FeatureHasher

h = FeatureHasher(n_features=m, input_type='string')
hashed = h.transform(df['feature'])
pysparkで特徴量ハッシング
from pyspark.ml.feature import FeatureHasher

hasher = FeatureHasher(numFeatures=m,  # ハッシュテーブルのサイズ
                       inputCols=features,  # 特徴量名のリスト
                       outputCol="hashed")

featurized = hasher.transform(df)

その他

交互作用

pandasで交互作用特徴量の作成
from sklearn.preprocessing import PolynomialFeatures

# 数値のみのDFに対して、2次の交互作用特徴量を作成
# interaction_onlyはべき乗演算を含めないか
# include_biasは定数項の有無
df = PolynomialFeatures(degree=2, interaction_only=False, include_bias=False).fit_transform(df)
pysparkで交互作用特徴量の作成
from pyspark.ml.feature import PolynomialExpansion, VectorAssembler

vector_assembler = VectorAssembler(
    inputCols=features,  # 特徴量名のリスト
    outputCol='feature_vector',
)
vector_df = vector_assembler.transform(df)

polyExpansion = PolynomialExpansion(degree=2, inputCol="feature_vector", outputCol="polyFeatures")
polyDF = polyExpansion.transform(vector_df)

参考

12
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?