データ分析時にpysparkで使用する操作をまとめました。
随時更新予定です。
準備
import datetime as dt
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import *
#データの読み込み/書き込み
###データ読みこみ
# csv
sdf = spark.read.csv('./file.csv', header=True)
# parquet
sdf = spark.read.parquet('./file.parquet')
# pandas Dataframeから
sdf = spark.createDataFrame(pdf)
###データ書き込み
# csv
sdf.write.csv("./file.csv", mode="overwrite", header=True)
# parquet
sdf.write.parquet("./tmp.parquet", mode="overwrite")
###pandasへの変換
pdf = sdf.toPandas()
#簡単なデータ内容確認
###最初の5行を表示
sdf.show(5)
# カラムが多くて見にくいときは一回Pandasにいれる
sdf.limit(5).toPandas()
###カラム抽出
sdf.select("col1", "col2")
###データの行数、列数
# shapeはない
sdf.count(), len(sdf.columns)
###カラム名と型
sdf.printSchema()
###ソート
sdf_sort = sdf.sort(F.col("col_1"), F.col("col_2").desc())
#データの整形
###カラム名の変更
# 1つずつ
renamed_sdf = (
sdf
.withColumnRenamed("col1_before", "col1_after")
.withColumnRenamed("col2_before", "col2_after")
)
# まとめて(pandasでいう下記の処理)
# pdf.columns = ["col1","col2","col3"]
col_list = ["col1", "col2", "col3"]
renamed_sdf = sdf.toDF(*col_list)
###型変換
# 主要の型:"string", "boolean", "int", "double", "float", "date", "timestamp"
parsed_sdf = (
sdf
# str → int
.withColumn("int_col", F.col("str_col1").cast("int"))
# str → date
.withColumn("date_col", F.col("str_col2").cast("date"))
)
###条件抽出
# or条件は「&」の代わりに「|」を使う
filtered_sdf = (
sdf
.filter(
(F.col("str_col") != "abc")
& (F.col("int_col") >= 4)
& (F.col("float_col").between(0.5, 1.0))
& (F.col("date_col").between("2020-3-1", "2020-3-2"))
# timestampを日付のみで抽出しようとするとその日の00:00:00にcastされる
# 下記は2020-3-2 00:00:00 までしか含まないので注意
& (F.col("timestamp_col").between("2020-3-1", "2020-3-2"))
)
)
###重複削除
sdf_drop_duplicate = sdf.dropDuplicates(subset = ["col1"])
###case文
sdf_case=(
sdf.withColumn("case_col",
F.when(F.col("int_col") >= 80, "A")
.when(F.col("int_col") >= 50, "B")
.otherwise("C"))
)
###欠損補完
sdf_fillna=(
.fillna("word", subset=["missing_col1", "missing_col2"])
# 日付型にfillnaは使えない
.withColumn("date_col",
F.when(F.col("date_col").isNull(), dt.date(1900, 1, 1))
.otherwise(F.col("date_col")))
)
nullではない直前の値での補完は下記。(pandasでのfillna(method='ffill')
)
# 数値型の場合は一旦strにcastする
w = Window.partitionBy("grp").orderBy("row_num").rowsBetween(-np.inf, 0))
sdf_fillfoward=(
sdf
.withColumn("fill_foward", F.last("str_col", ignorenulls=True).over(w)
)
#結合
sdf_join=(
sdf_left
.join(sdf_right1, on="key1", how="left")
# keyが複数
.join(sdf_right2, on=["key1", "key2"], how="left")
# keyの名前が異なる
.join(sdf_right3,
on=(sdf_left.key1 == sdf_right3.key_1),
how="left")
# keyの名前が異なる且つ複数
.join(sdf_right3,
on=((sdf_left.key1 == sdf_right3.key_1) & (sdf_left.key2 == sdf_right3.key_2)),
how="left")
)
結合の結果、key以外のカラム名が重複すると後にエラーが発生する。join後にselectするか、join前にカラム名を変更しておくかで対処する。
# join後にselect
sdf_join=(
sdf_left
.join(sdf_right, on="key1")
.select(
sdf_left.key1,
sdf_left.str_col,
sdf_right.int_col
)
)
# 予めカラム名にsuffix(接尾後)を付与してからjoin
sdf_left=sdf_left.select(*(F.col(x).alias(x + '_x') for x in sdf_left.columns))
sdf_right=sdf_right.select(*(F.col(x).alias(x + '_y')
for x in sdf_right.columns))
sdf_join=(
sdf_left
.join(sdf_right, on=(sdf_left.key1_x == sdf_right .key1_y))
)
#集計
###基礎統計量
# count, mean, stddev, min, max,
sdf.describe().show()
###基本的な集計
# 集計後にaliasでカラム名を変更
sdf_agg = (
sdf
.agg(
F.max("col1").alias("col_max"),
F.min("col1").alias("col_min"),
F.avg("col1").alias("col_avg"),
F.expr("percentile_approx(col1, 0.5)").alias("col_median"),
F.count("col1").alias("col_count"),
F.countDistinct("col1").alias("col_countDistinct")
)
)
###group by
# aggの前にgroupby()を加える
sdf_agg = (
sdf
.groupby("col1")
.agg(
F.max("col1").alias("col_max")
)
)
#window関数
###windowの定義
w = (
Window
.partitionBy("col1") # 指定しないとpartitionなし(データ全体)
.orderBy("col2")
.rowsBetween(-np.inf, 0) # 指定しないとpartition全体(-inf ~ inf)
)
###基本的な集計関数
# countDistinctはできない
sdf_agg = (
sdf.withColumn("window_sum", F.sum("int_col").over(w)),
sdf.withColumn("window_avg", F.avg("int_col").over(w))
))
###lag
# lagをleadにすれば、N個後の値を取得可能
# 窓の設定
w = Window.partitionBy("id").orderBy("date_col")
sdf_lag=(
sdf
# デフォルトでは1つ前の値を取得し、前の値がないときはnull
.withColumn("lag", F.lag("reviews_date").over(w))
# offsetで何個前の値を取得するか指定
.withColumn("offset_option", F.lag("reviews_date", offset=3).over(w))
# defaultで値を指定するとnullをその値で補完
.withColumn("default_option", F.lag("reviews_date", default="2001-01-01 00:00:00").over(w))
)
sdf_lag.show(5)
+----+------------+----------+-------------+--------------+
| id| date_col| lag|offset_option|default_option|
+----+------------+----------+-------------+--------------+
| A| 2016-06-04| null| null| 2001-01-01|
| A| 2016-06-24|2016-06-04| null| 2016-06-04|
| A| 2016-07-01|2016-06-24| null| 2016-06-24|
| A| 2016-07-08|2016-07-01| 2016-06-04| 2016-07-01|
| A| 2016-07-11|2016-07-08| 2016-06-24| 2016-07-08|
###row_number、rank、dense_rank
# 降順のランクは、windowでdescを設定
w_asc=Window.partitionBy("id").orderBy("date_col")
w_desc=Window.partitionBy("id").orderBy(F.col("date_col").desc())
# row_number、dense_rankも使い方は同じ
sdf_rank=(
sdf
.withColumn("rank_asc", F.rank().over(w_asc))
.withColumn("rank_desc", F.rank().over(w_desc))
)
#日付型の操作
# 使用するサンプルデータ
q = '''
select timestamp('2020-03-15 12:34:56') as start
'''
date_sdf = spark.sql(q)
date_sdf.show()
+-------------------+
| start|
+-------------------+
|2020-03-15 12:34:56|
+-------------------+
###年月日、時分秒の取り出し
q = '''
select timestamp('2020-03-15 12:34:56') as start
'''
date_sdf = spark.sql(q)
get_elements_sdf = (
date_sdf
.withColumn("year",F.year("start"))
.withColumn("month",F.month("start"))
.withColumn("day",F.dayofmonth("start"))
.withColumn("hour",F.hour("start"))
.withColumn("minute",F.minute("start"))
.withColumn("second",F.second("start"))
)
get_elements_sdf.show()
+-------------------+----+-----+---+----+------+------+
| start|year|month|day|hour|minute|second|
+-------------------+----+-----+---+----+------+------+
|2020-03-15 12:34:56|2020| 3| 15| 12| 34| 56|
+-------------------+----+-----+---+----+------+------+
###日付の足し算
add_days_sdf =(
date_sdf
# 日にち
.withColumn("add_date",F.date_add("start",3))
.withColumn("sub_date",F.date_sub("start",-3))
# 月
.withColumn("add_months",F.add_months("start",3))
.withColumn("sub_months",F.add_months("start",-3))
)
add_days_sdf.show()
+-------------------+----------+----------+----------+----------+
| start| add_date| sub_date|add_months|sub_months|
+-------------------+----------+----------+----------+----------+
|2020-03-15 12:34:56|2020-03-18|2020-03-18|2020-06-15|2019-12-15|
+-------------------+----------+----------+----------+----------+
###時間の足し算
add_times_sdf = (
date_sdf
.withColumn('add_hours', F.col("start") + F.expr('INTERVAL 3 HOURS'))
.withColumn('add_minutes', F.col("start") + F.expr('INTERVAL 3 MINUTES'))
.withColumn('add_seconds', F.col("start") + F.expr('INTERVAL 3 SECONDS'))
)
add_times_sdf.show()
+-------------------+-------------------+-------------------+-------------------+
| start| add_hours| add_minutes| add_seconds|
+-------------------+-------------------+-------------------+-------------------+
|2020-03-15 12:34:56|2020-03-15 15:34:56|2020-03-15 12:37:56|2020-03-15 12:34:59|
+-------------------+-------------------+-------------------+-------------------+
###月の最終日
last_day_of_month_sdf = (
date_sdf
.withColumn("last_day",F.last_day("start"))
)
last_day_of_month_sdf.show()
+-------------------+----------+
| start| last_day|
+-------------------+----------+
|2020-03-15 12:34:56|2020-03-31|
+-------------------+----------+
###差分計算(timediff)
q = '''
select timestamp('2020-03-15 12:34:56') as start,
timestamp('2020-03-16 00:35:15') as start
'''
date_sdf2 = spark.sql(q)
time_diff_sdf = (
date_sdf2
#
.withColumn("diff_day",F.datediff("end","start"))
# 一旦unix_timeにしてから
.withColumn("diff_sec",(F.col("end").cast("long") - F.col("start").cast("long")) )
.withColumn("diff_min",(F.col("end").cast("long") - F.col("start").cast("long")) / 60 )
.withColumn("diff_hour",(F.col("end").cast("long") - F.col("start").cast("long")) / 3600 )
)
time_diff_sdf.show()
+-------------------+-------------------+--------+--------+-----------------+------------------+
| start| end|diff_day|diff_sec| diff_min| diff_hour|
+-------------------+-------------------+--------+--------+-----------------+------------------+
|2020-03-15 23:34:56|2020-03-16 00:35:15| 1| 3619|60.31666666666667|1.0052777777777777|
+-------------------+-------------------+--------+--------+-----------------+------------------+
#UDF
自作関数の前に@F.udf
を追記する。引数にはカラム名もしくは定数を入れて使用するが、定数を入れる場合はF.lit
で定数化する必要あり。
# UDFの定義(カラムの値に対して、引数xを足して2倍して返す)
@F.udf(returnType=IntegerType())
def add_x_and_double(col,x):
return (col + x)*2
# サンプルデータ
sdf = spark.createDataFrame(pd.DataFrame({"int_col": [1, 2, 3]}))
# UDFの使用
sdf_udf = (
sdf
# 定数にはF.litが必要
.withColumn("udf_col", add_x_and_double("int_col", F.lit(3)))
)
sdf.show()
+-------+-------+
|int_col|udf_col|
+-------+-------+
| 1| 8|
| 2| 10|
| 3| 12|
+-------+-------+
#SQLでの記述
createTempView()
でデータをviewに登録すればsqlでの記述も可能。
# viewの登録
sdf.createTempView("sdf")
# クエリ
q = '''
-- コメントも可能
SELECT id
,int(str_col1) AS int_col
,timestamp(str_col2) AS date_col
FROM sdf
'''
parsed_sdf = spark.sql(q)
#(おまけ)Google Colaboratoryでsparkを使用できるようにする
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local")\
.appName("Colab")\
.config('spark.ui.port', '4050')\
.getOrCreate()
参考
[PySparkデータ操作]
(https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0)
[pysparkでデータハンドリングする時によく使うやつメモ]
(https://qiita.com/paulxll/items/1c0833782cd4e1de86e2)