Help us understand the problem. What is going on with this article?

pysparkでデータハンドリングする時によく使うやつメモ

この記事について

pysparkのデータハンドリングでよく使うものをスニペット的にまとめていく。随時追記中。
勉強しながら書いているので網羅的でないのはご容赦を。
Databricks上での実行、sparkは2.3.0以降, pythonは3.6以降を利用することを想定。

既存データからDataFrameの作成

# csvk形式1(spark DataFrameから書き出されたデータなど、データが複数にまたがっている場合)
df = spark.read.csv("s3://my-backet/my-data/*.csv")
# csv形式1(単一のファイルの場合。そもそもあまりない状況だと思うが…。状況にもよるが後にrepartion()実行を推奨)
df = spark.read.csv("s3://my-backet/my-data/data.csv")
# parquet形式
df = spark.read.parquet("s3://my-backet/my-data/")

DataFrameの書き出し

# csv形式1
# 出力先ファイルを一つにしたい時にはcoalesce(1)を使う
df.coalesce(1).write.mode('overwrite').csv("s3://my-backet/my-output/")

# csv形式2
# すでに存在するs3 backetに追加する場合は、mode('append')を使う
# そのままだとheaderがつかないので、必要な場合はheader=Trueを設定
df.coalesce(1).write.mode('append').csv("s3://my-backet/my-output/", header=True)

# parquet形式
df.write.mode('overwrite').parquet("s3://my-backet/my-output/")

新しい列を作成

# col1とcol2の値を足し合わせて新しい"newcol"を作成
from pyspark.sql.functions import col
df = df.withColumn("newcol", col('col1') + col('col2'))

# すでにある列("existing_col")をrenameして新しい列"(renamed_col")を作成
df = df.withColumnRenamed("existing_col", "renamed_col")

DataFrameのパーティション周りの操作

# DataFrameのパーティション数の確認
df.rdd.getNumPartitions()
# DataFrameのパーティション数を1にする
df = df.coalesce(1)
# DataFrameのパーティション数を100にする
df = df.repartion(100)

パーティション周りの細かいtips

  • coalesce(1) は使うと処理が遅くなる。また、一つのワーカーノードに収まらないデータ量のDataFrameに対して実行するとメモリ不足になれば処理が落ちる(どちらもsparkが分散処理基盤であることを考えれば当たり前といえば当たり前だが)。
  • coalesce() は現在のパーティション以下にしか設定できないが、repartition() は現在のパーティション数より多い数も設定できる。

データの加工系諸々

timestampの取り扱い

from pyspark.sql.functions import to_timestamp, from_utc_timestamp, date_format, from_unixtime

# データ変換シリーズ
# unixtime(e.g. 1552443740)をtimestamp型に変換
df = df.withColumn("timestamp", to_timestamp(from_unixtime(col("unix_timestamp"))))
# timestampっぽい文字列をtimestampに変換
df = df.withColumn('unix_timestamp_utc', to_timestamp(col('time_str'), 'yyyy-MM-dd HH:mm:ss'))
# UTCで格納されているtimestampをJSTに変換
df = df.withColumn('unix_timestamp_jst', from_utc_timestamp(col('unix_timestamp_utc'), "JST"))

# timestampから特定の部分を取り出すシリーズ
# 曜日("Mon","Tue"みたいに文字列で取り出すパターン)
df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'EE'))
# 曜日(1(月曜), 2(火曜),..., 7(日曜)みたいに整数で取り出すパターン)
df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'u'))
# 時間(hour)
df = df.withColumn('hour', date_format(col('unix_timestamp_jst'), 'HH'))

when() を用いた条件分岐

pysparkのデータハンドリングで条件分岐を書く際には when() を使う

from pyspark.sql.functions import when 
# 1,2の値で格納されている性別を、1:'male', 2:'female', それ以外:'unknown' にする
df = df.withColumn('gender', when(col('gender')==1, 'male').when(col('gender')==2, 'female').otherwise('unknown'))

window関数

分析に必須のwindow関数も書いておく。
この例は、window内の次のtimestampを新しい列として格納するための処理。

from pyspark.sql.window import Window

# 長くなるので予めOVER句の中身を変数として切り出しておく
window_schema = Window.partitionBy(col("uuid")).orderBy(col("timestamp").asc())

df = df.withColumn("next_timestamp", lead("timestamp", 1).over(window_schema))

欠損値操作

# 'na_colA'と'na_colB'を0埋めする。カラム名はsubsetで渡す。
df = df.fillna(0, subset=['na_colA', 'na_colB'])

# subsetは辞書形式でも渡せる。na_colAは"A"埋め、na_colBは"B"埋めする。
df = df.fillna({"na_colA":"A","na_colB":"B"})

spark DataFrameの値をpythonオブジェクトにする

後段でやりたい処理のためにFor文に突っ込んだり、カテゴリ変数のユニークな値をサクッと確認したりに便利なやつ。
Spark DataFrameの値をlistなどpythonオブジェクトとして持つには、rdd APIの collect() が有効です。

# おなじみのirisデータからspark DataFrameを作る
iris_df = spark.read.csv('iris.csv', header=True)
# 種類のユニークを取る
sp_list = iris_df.select("Species").distinct().rdd.flatMap(lambda x:x).collect()

UDFの定義

現在、sparkのpythonUDFはパフォーマンスの面でspark標準の関数に劣るため使用は推奨されない。ただし、spark2.3以降から登場したpandas UDFはpandasのDataFrame-likeにvectrizedされた状態で渡されるので、通常のUDFに比べて処理が速い。関数の中身の記述方法は基本的には変わらないが、pandasベースの処理が使えたりもする。例えば返り値にSeriesの値を指定できる。

import pandas as pd
from scipy import stats
from pyspark.sql.functions import udf, pandas_udf


# 一番シンプルな記法(udf関数で処理内容をラップする)
plus_one = udf(lambda x: x + 1, IntegerType())


# いわゆる普通のpython UDF(デコレータを使って渡している)
@udf(Doubletype())
def root(x):
    return x ** 0.5


# pandas UDF(SCALER type)
@pandas_udf('double')
def cdf(v):
    return pd.Series(stats.norm.cdf(v))

参考:
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

UDFの活用例: groupby medianを算出する

2018年8月現在、pyspark.sqlモジュールには、medianがないのでUDFで実装する必要がある。
実装の一例はこちら。

@udf(DoubleType())
def udf_median(values_list):
    med = np.median(values_list)
    return float(med)
df.groupBy('user_category').agg(udf_median(collect_list('cnt')).alias('median_cnt'))

おまけ:UDFに複数カラムのデータを渡す

複数列の結果を使ってUDF内で演算したい時に、UDFにそのままカラムのリストを渡すと怒られるので、例えばstruct()を使うことで回避できる。

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()

参考:
https://stackoverflow.com/questions/42540169/pyspark-pass-multiple-columns-in-udf

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした