LoginSignup
114
82

More than 3 years have passed since last update.

pysparkでデータハンドリングする時によく使うやつメモ

Last updated at Posted at 2018-07-25

この記事について

pysparkのデータハンドリングでよく使うものをスニペット的にまとめていく。随時追記中。
勉強しながら書いているので網羅的でないのはご容赦を。
Databricks上での実行、sparkは2.3.0以降, pythonは3.6以降を利用することを想定。

既存データからDataFrameの作成

# csvk形式1(spark DataFrameから書き出されたデータなど、データが複数にまたがっている場合)
df = spark.read.csv("s3://my-backet/my-data/*.csv")
# csv形式1(単一のファイルの場合。そもそもあまりない状況だと思うが…。状況にもよるが後にrepartion()実行を推奨)
df = spark.read.csv("s3://my-backet/my-data/data.csv")
# parquet形式
df = spark.read.parquet("s3://my-backet/my-data/")

DataFrameの書き出し

# csv形式1
# 出力先ファイルを一つにしたい時にはcoalesce(1)を使う
df.coalesce(1).write.mode('overwrite').csv("s3://my-backet/my-output/")

# csv形式2
# すでに存在するs3 backetに追加する場合は、mode('append')を使う
# そのままだとheaderがつかないので、必要な場合はheader=Trueを設定
df.coalesce(1).write.mode('append').csv("s3://my-backet/my-output/", header=True)

# parquet形式
df.write.mode('overwrite').parquet("s3://my-backet/my-output/")

新しい列を作成

# col1とcol2の値を足し合わせて新しい"newcol"を作成
from pyspark.sql.functions import col
df = df.withColumn("newcol", col('col1') + col('col2'))

# すでにある列("existing_col")をrenameして新しい列"(renamed_col")を作成
df = df.withColumnRenamed("existing_col", "renamed_col")

DataFrameのパーティション周りの操作

# DataFrameのパーティション数の確認
df.rdd.getNumPartitions()
# DataFrameのパーティション数を1にする
df = df.coalesce(1)
# DataFrameのパーティション数を100にする
df = df.repartion(100)

パーティション周りの細かいtips

  • coalesce(1) は使うと処理が遅くなる。また、一つのワーカーノードに収まらないデータ量のDataFrameに対して実行するとメモリ不足になれば処理が落ちる(どちらもsparkが分散処理基盤であることを考えれば当たり前といえば当たり前だが)。
  • coalesce() は現在のパーティション以下にしか設定できないが、repartition() は現在のパーティション数より多い数も設定できる。

データの加工系諸々

timestampの取り扱い

from pyspark.sql.functions import to_timestamp, from_utc_timestamp, date_format, from_unixtime

# データ変換シリーズ
# unixtime(e.g. 1552443740)をtimestamp型に変換
df = df.withColumn("timestamp", to_timestamp(from_unixtime(col("unix_timestamp"))))
# timestampっぽい文字列をtimestampに変換
df = df.withColumn('unix_timestamp_utc', to_timestamp(col('time_str'), 'yyyy-MM-dd HH:mm:ss'))
# UTCで格納されているtimestampをJSTに変換
df = df.withColumn('unix_timestamp_jst', from_utc_timestamp(col('unix_timestamp_utc'), "JST"))

# timestampから特定の部分を取り出すシリーズ
# 曜日("Mon","Tue"みたいに文字列で取り出すパターン)
df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'EE'))
# 曜日(1(月曜), 2(火曜),..., 7(日曜)みたいに整数で取り出すパターン)
df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'u'))
# 時間(hour)
df = df.withColumn('hour', date_format(col('unix_timestamp_jst'), 'HH'))

when() を用いた条件分岐

pysparkのデータハンドリングで条件分岐を書く際には when() を使う

from pyspark.sql.functions import when 
# 1,2の値で格納されている性別を、1:'male', 2:'female', それ以外:'unknown' にする
df = df.withColumn('gender', when(col('gender')==1, 'male').when(col('gender')==2, 'female').otherwise('unknown'))

window関数

分析に必須のwindow関数も書いておく。
この例は、window内の次のtimestampを新しい列として格納するための処理。

from pyspark.sql.window import Window

# 長くなるので予めOVER句の中身を変数として切り出しておく
window_schema = Window.partitionBy(col("uuid")).orderBy(col("timestamp").asc())

df = df.withColumn("next_timestamp", lead("timestamp", 1).over(window_schema))

欠損値操作

# 'na_colA'と'na_colB'を0埋めする。カラム名はsubsetで渡す。
df = df.fillna(0, subset=['na_colA', 'na_colB'])

# subsetは辞書形式でも渡せる。na_colAは"A"埋め、na_colBは"B"埋めする。
df = df.fillna({"na_colA":"A","na_colB":"B"})

spark DataFrameの値をpythonオブジェクトにする

後段でやりたい処理のためにFor文に突っ込んだり、カテゴリ変数のユニークな値をサクッと確認したりに便利なやつ。
Spark DataFrameの値をlistなどpythonオブジェクトとして持つには、rdd APIの collect() が有効です。

# おなじみのirisデータからspark DataFrameを作る
iris_df = spark.read.csv('iris.csv', header=True)
# 種類のユニークを取る
sp_list = iris_df.select("Species").distinct().rdd.flatMap(lambda x:x).collect()

UDFの定義

現在、sparkのpythonUDFはパフォーマンスの面でspark標準の関数に劣るため使用は推奨されない。ただし、spark2.3以降から登場したpandas UDFはpandasのDataFrame-likeにvectrizedされた状態で渡されるので、通常のUDFに比べて処理が速い。関数の中身の記述方法は基本的には変わらないが、pandasベースの処理が使えたりもする。例えば返り値にSeriesの値を指定できる。

import pandas as pd
from scipy import stats
from pyspark.sql.functions import udf, pandas_udf


# 一番シンプルな記法(udf関数で処理内容をラップする)
plus_one = udf(lambda x: x + 1, IntegerType())


# いわゆる普通のpython UDF(デコレータを使って渡している)
@udf(Doubletype())
def root(x):
    return x ** 0.5


# pandas UDF(SCALER type)
@pandas_udf('double')
def cdf(v):
    return pd.Series(stats.norm.cdf(v))

参考:
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

UDFの活用例: groupby medianを算出する

2018年8月現在、pyspark.sqlモジュールには、medianがないのでUDFで実装する必要がある。
実装の一例はこちら。

@udf(DoubleType())
def udf_median(values_list):
    med = np.median(values_list)
    return float(med)
df.groupBy('user_category').agg(udf_median(collect_list('cnt')).alias('median_cnt'))

おまけ:UDFに複数カラムのデータを渡す

複数列の結果を使ってUDF内で演算したい時に、UDFにそのままカラムのリストを渡すと怒られるので、例えばstruct()を使うことで回避できる。

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()

参考:
https://stackoverflow.com/questions/42540169/pyspark-pass-multiple-columns-in-udf

114
82
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
114
82