PySpark: a memo of operations for data analysis

A summary of the operations I use in PySpark for data analysis.
I plan to keep updating it.

#Preparation

import datetime as dt
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import *
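
The snippets below assume an existing SparkSession named spark, as in environments where one is provided (Databricks, or the Colab setup at the end of this article). If your environment does not provide one, a minimal sketch for creating a local session (the app name is arbitrary):

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession; "local[*]" uses all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("pyspark-memo")  # arbitrary application name
    .getOrCreate()
)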

#Reading and writing data
###Reading data

# csv
sdf = spark.read.csv('./file.csv', header=True)
# parquet
sdf = spark.read.parquet('./file.parquet')
# from a pandas DataFrame
sdf = spark.createDataFrame(pdf)

###Writing data

# csv
sdf.write.csv("./file.csv", mode="overwrite", header=True)
# parquet
sdf.write.parquet("./tmp.parquet", mode="overwrite")

###Converting to pandas

pdf = sdf.toPandas()

#Quick checks on the data

###Show the first 5 rows

sdf.show(5)

# When there are too many columns to read easily, convert to pandas first
sdf.limit(5).toPandas()

###Selecting columns

sdf.select("col1", "col2")

###Number of rows and columns

# There is no shape attribute
sdf.count(), len(sdf.columns)

###Column names and types

sdf.printSchema()

###Sorting

sdf_sort = sdf.sort(F.col("col_1"), F.col("col_2").desc())
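
If nulls should go to the end even in a descending sort, there are explicit null orderings; a small sketch using the same hypothetical columns:

# desc_nulls_last puts null values after all non-null values
sdf_sort_nulls = sdf.sort(F.col("col_1"), F.col("col_2").desc_nulls_last())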

#Reshaping data
###Renaming columns

# One at a time
renamed_sdf = (
    sdf
    .withColumnRenamed("col1_before", "col1_after")
    .withColumnRenamed("col2_before", "col2_after")
)

# All at once (the pandas equivalent is shown below)
# pdf.columns = ["col1","col2","col3"]
col_list = ["col1", "col2", "col3"]
renamed_sdf = sdf.toDF(*col_list)

###Casting types

# Main types: "string", "boolean", "int", "double", "float", "date", "timestamp"
parsed_sdf = (
    sdf
    # str → int
    .withColumn("int_col", F.col("str_col1").cast("int"))
    # str → date
    .withColumn("date_col", F.col("str_col2").cast("date"))
)
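
When a string does not follow the default yyyy-MM-dd pattern, cast returns null; F.to_date / F.to_timestamp accept an explicit format instead. A sketch assuming hypothetical columns str_col3 like "15/03/2020" and str_col4 like "15/03/2020 12:34:56":

parsed_fmt_sdf = (
    sdf
    # parse with an explicit pattern instead of the default yyyy-MM-dd
    .withColumn("date_col2", F.to_date("str_col3", "dd/MM/yyyy"))
    .withColumn("timestamp_col2", F.to_timestamp("str_col4", "dd/MM/yyyy HH:mm:ss"))
)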

###Filtering rows

# For OR conditions, use "|" instead of "&"
filtered_sdf = (
    sdf
    .filter(
        (F.col("str_col") != "abc")
        & (F.col("int_col") >= 4)
        & (F.col("float_col").between(0.5, 1.0))
        & (F.col("date_col").between("2020-3-1", "2020-3-2"))
        # Filtering a timestamp with a date string casts the date to 00:00:00 of that day
        # Note: the condition below therefore only includes rows up to 2020-3-2 00:00:00
        & (F.col("timestamp_col").between("2020-3-1", "2020-3-2"))
    )
)
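
A few other conditions that come up often (value lists and null checks); a small sketch with the same hypothetical columns:

filtered_sdf2 = (
    sdf
    # keep rows whose value is in a list
    .filter(F.col("str_col").isin("abc", "def"))
    # drop rows where int_col is null
    .filter(F.col("int_col").isNotNull())
)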

###Dropping duplicates

sdf_drop_duplicate = sdf.dropDuplicates(subset=["col1"])

###Case expressions (when / otherwise)

sdf_case=(
    sdf.withColumn("case_col",
                   F.when(F.col("int_col") >= 80, "A")
                   .when(F.col("int_col") >= 50, "B")
                   .otherwise("C"))
)

###Filling missing values

sdf_fillna = (
    sdf
    .fillna("word", subset=["missing_col1", "missing_col2"])
    # fillna cannot be used on date columns
    .withColumn("date_col",
                F.when(F.col("date_col").isNull(), dt.date(1900, 1, 1))
                .otherwise(F.col("date_col")))
)

Filling with the most recent non-null value (pandas' fillna(method='ffill')) looks like this:

# If the column is numeric, cast it to string first
w = Window.partitionBy("grp").orderBy("row_num").rowsBetween(Window.unboundedPreceding, Window.currentRow)
sdf_fillforward = (
    sdf
    .withColumn("fill_forward", F.last("str_col", ignorenulls=True).over(w))
)
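
For the opposite direction (pandas' fillna(method='bfill')), the same idea works with F.first over a forward-looking window; a sketch under the same assumptions:

w_bfill = (
    Window
    .partitionBy("grp")
    .orderBy("row_num")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
sdf_fillbackward = (
    sdf
    # first non-null value from the current row onwards
    .withColumn("fill_backward", F.first("str_col", ignorenulls=True).over(w_bfill))
)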

#Joins

sdf_join=(
    sdf_left
    .join(sdf_right1, on="key1", how="left")

    # multiple keys
    .join(sdf_right2, on=["key1", "key2"], how="left")

    # key names differ
    .join(sdf_right3,
          on=(sdf_left.key1 == sdf_right3.key_1),
          how="left")

    # key names differ and there are multiple keys
    .join(sdf_right4,
          on=((sdf_left.key1 == sdf_right4.key_1) & (sdf_left.key2 == sdf_right4.key_2)),
          how="left")
)

If any non-key column names are duplicated after the join, errors will occur later on. Deal with this either by selecting columns after the join or by renaming columns before the join.

# select after the join
sdf_join=(
    sdf_left
    .join(sdf_right, on="key1")
    .select(
        sdf_left.key1,
        sdf_left.str_col,
        sdf_right.int_col
    )
)

# Add a suffix to the column names in advance, then join
sdf_left=sdf_left.select(*(F.col(x).alias(x + '_x') for x in sdf_left.columns))
sdf_right=sdf_right.select(*(F.col(x).alias(x + '_y')
                           for x in sdf_right.columns))
sdf_join=(
    sdf_left
    .join(sdf_right, on=(sdf_left.key1_x == sdf_right.key1_y))
)
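
When checking key coverage, left_anti (rows on the left with no match) and left_semi (rows with a match, keeping only left-side columns) joins are also handy; a sketch assuming two hypothetical tables sdf_a and sdf_b that share key1:

# rows of sdf_a whose key1 does not appear in sdf_b
sdf_unmatched = sdf_a.join(sdf_b, on="key1", how="left_anti")

# rows of sdf_a whose key1 does appear in sdf_b (only sdf_a's columns are kept)
sdf_matched = sdf_a.join(sdf_b, on="key1", how="left_semi")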

#Aggregation
###Basic statistics

# count, mean, stddev, min, max
sdf.describe().show()
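
summary() (Spark 2.3+) is similar but also includes approximate quartiles:

# count, mean, stddev, min, 25%, 50%, 75%, max
sdf.summary().show()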

###Basic aggregations

# Rename the columns after aggregating with alias
sdf_agg = (
    sdf
    .agg(
        F.max("col1").alias("col_max"),
        F.min("col1").alias("col_min"),
        F.avg("col1").alias("col_avg"),
        F.expr("percentile_approx(col1, 0.5)").alias("col_median"),
        F.count("col1").alias("col_count"),
        F.countDistinct("col1").alias("col_countDistinct")
    )
)

###group by

# Add groupby() before agg
sdf_agg = (
    sdf
    .groupby("col1")
    .agg(
        F.max("col1").alias("col_max")
    )
)
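
A grouped aggregation can also be pivoted on a second column, producing one output column per distinct value; a small sketch assuming a hypothetical category column col2 and a numeric int_col:

sdf_pivot = (
    sdf
    .groupby("col1")
    .pivot("col2")          # one column per distinct value of col2
    .agg(F.sum("int_col"))
)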

#Window functions
###Defining a window

w = (
    Window
    .partitionBy("col1")  # if omitted, no partitioning (the whole dataset is one partition)
    .orderBy("col2")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)  # if omitted: whole partition, or unbounded preceding to current row when orderBy is set
)

###Basic aggregate functions

# countDistinct cannot be used over a window
sdf_agg = (
    sdf
    .withColumn("window_sum", F.sum("int_col").over(w))
    .withColumn("window_avg", F.avg("int_col").over(w))
)
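
One workaround for the missing countDistinct over a window is to collect the distinct values into a set and take its size; a sketch assuming a hypothetical str_col (F.approx_count_distinct over the window may also be an option when an approximation is acceptable):

sdf_window_distinct = (
    sdf
    # number of distinct str_col values within the window frame
    .withColumn("window_count_distinct", F.size(F.collect_set("str_col").over(w)))
)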

###lag

# Changing lag to lead gets the value N rows ahead instead

# Window definition
w = Window.partitionBy("id").orderBy("date_col")

sdf_lag = (
    sdf
    # By default, takes the previous row's value; null when there is no previous row
    .withColumn("lag", F.lag("date_col").over(w))
    # offset specifies how many rows back to look
    .withColumn("offset_option", F.lag("date_col", offset=3).over(w))
    # Specifying default fills those nulls with that value instead
    .withColumn("default_option", F.lag("date_col", default="2001-01-01 00:00:00").over(w))
)
sdf_lag.show(5)
+----+------------+----------+-------------+--------------+
|  id|    date_col|       lag|offset_option|default_option|
+----+------------+----------+-------------+--------------+
|   A|  2016-06-04|      null|         null|    2001-01-01|
|   A|  2016-06-24|2016-06-04|         null|    2016-06-04|
|   A|  2016-07-01|2016-06-24|         null|    2016-06-24|
|   A|  2016-07-08|2016-07-01|   2016-06-04|    2016-07-01|
|   A|  2016-07-11|2016-07-08|   2016-06-24|    2016-07-08|
+----+------------+----------+-------------+--------------+

###row_number, rank, dense_rank

# For a descending rank, set desc in the window
w_asc=Window.partitionBy("id").orderBy("date_col")
w_desc=Window.partitionBy("id").orderBy(F.col("date_col").desc())

# row_number and dense_rank are used in the same way
sdf_rank=(
    sdf
    .withColumn("rank_asc", F.rank().over(w_asc))
    .withColumn("rank_desc", F.rank().over(w_desc))
)
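
A common use of row_number is keeping only the latest row per group; a sketch that keeps the most recent date_col per id:

w_latest = Window.partitionBy("id").orderBy(F.col("date_col").desc())

sdf_latest = (
    sdf
    .withColumn("rn", F.row_number().over(w_latest))
    # rn == 1 is the most recent row within each id
    .filter(F.col("rn") == 1)
    .drop("rn")
)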

#Working with dates and timestamps

# Sample data
q = '''
select timestamp('2020-03-15 12:34:56') as start
'''
date_sdf = spark.sql(q)
date_sdf.show()
+-------------------+
|              start|
+-------------------+
|2020-03-15 12:34:56|
+-------------------+

###Extracting year, month, day, hour, minute and second

get_elements_sdf = (
    date_sdf
    .withColumn("year",F.year("start"))
    .withColumn("month",F.month("start"))
    .withColumn("day",F.dayofmonth("start"))
    .withColumn("hour",F.hour("start"))
    .withColumn("minute",F.minute("start"))
    .withColumn("second",F.second("start"))
)
get_elements_sdf.show()
+-------------------+----+-----+---+----+------+------+
|              start|year|month|day|hour|minute|second|
+-------------------+----+-----+---+----+------+------+
|2020-03-15 12:34:56|2020|    3| 15|  12|    34|    56|
+-------------------+----+-----+---+----+------+------+
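
Formatting the timestamp as a string, and getting the day of week, can be done with date_format and dayofweek; a small sketch:

formatted_sdf = (
    date_sdf
    .withColumn("year_month", F.date_format("start", "yyyy-MM"))  # e.g. "2020-03"
    .withColumn("day_of_week", F.dayofweek("start"))              # 1 = Sunday ... 7 = Saturday
)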

###Adding and subtracting days and months

add_days_sdf = (
    date_sdf
    # days
    .withColumn("add_date", F.date_add("start", 3))
    .withColumn("sub_date", F.date_sub("start", 3))

    # months
    .withColumn("add_months", F.add_months("start", 3))
    .withColumn("sub_months", F.add_months("start", -3))
)
add_days_sdf.show()

+-------------------+----------+----------+----------+----------+
|              start|  add_date|  sub_date|add_months|sub_months|
+-------------------+----------+----------+----------+----------+
|2020-03-15 12:34:56|2020-03-18|2020-03-12|2020-06-15|2019-12-15|
+-------------------+----------+----------+----------+----------+

###Adding hours, minutes and seconds

add_times_sdf = (
    date_sdf
    .withColumn('add_hours', F.col("start") + F.expr('INTERVAL 3 HOURS'))
    .withColumn('add_minutes', F.col("start") + F.expr('INTERVAL 3 MINUTES'))
    .withColumn('add_seconds', F.col("start") + F.expr('INTERVAL 3 SECONDS'))
)
add_times_sdf.show()

+-------------------+-------------------+-------------------+-------------------+
|              start|          add_hours|        add_minutes|        add_seconds|
+-------------------+-------------------+-------------------+-------------------+
|2020-03-15 12:34:56|2020-03-15 15:34:56|2020-03-15 12:37:56|2020-03-15 12:34:59|
+-------------------+-------------------+-------------------+-------------------+

###Last day of the month

last_day_of_month_sdf = (
    date_sdf
    .withColumn("last_day",F.last_day("start"))
)
last_day_of_month_sdf.show()

+-------------------+----------+
|              start|  last_day|
+-------------------+----------+
|2020-03-15 12:34:56|2020-03-31|
+-------------------+----------+
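
The first day of the month (or the start of any other unit) can be obtained with date_trunc, which returns a timestamp truncated to the given unit:

first_day_of_month_sdf = (
    date_sdf
    # truncate to the start of the month: 2020-03-01 00:00:00
    .withColumn("first_day", F.date_trunc("month", "start"))
)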

###Computing time differences

q = '''
select timestamp('2020-03-15 23:34:56') as start,
       timestamp('2020-03-16 00:35:15') as end
'''
date_sdf2 = spark.sql(q)

time_diff_sdf = (
    date_sdf2
    # difference in days
    .withColumn("diff_day",F.datediff("end","start"))
    # Convert to unix time (seconds) first, then subtract
    .withColumn("diff_sec",(F.col("end").cast("long") - F.col("start").cast("long")) )
    .withColumn("diff_min",(F.col("end").cast("long") - F.col("start").cast("long")) / 60 )
    .withColumn("diff_hour",(F.col("end").cast("long") - F.col("start").cast("long")) / 3600 )
)
time_diff_sdf.show()

+-------------------+-------------------+--------+--------+-----------------+------------------+
|              start|                end|diff_day|diff_sec|         diff_min|         diff_hour|
+-------------------+-------------------+--------+--------+-----------------+------------------+
|2020-03-15 23:34:56|2020-03-16 00:35:15|       1|    3619|60.31666666666667|1.0052777777777777|
+-------------------+-------------------+--------+--------+-----------------+------------------+
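
For differences measured in months there is also months_between, which returns a fractional number of months:

month_diff_sdf = (
    date_sdf2
    # number of months from start to end (fractional)
    .withColumn("diff_month", F.months_between("end", "start"))
)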

#UDF
Prepend @F.udf to your own function. The arguments can be column names or constants; when passing a constant it must be wrapped with F.lit.

# UDF definition (adds the argument x to the column value and returns twice the sum)
@F.udf(returnType=IntegerType())
def add_x_and_double(col,x):
    return (col + x)*2

# Sample data
sdf = spark.createDataFrame(pd.DataFrame({"int_col": [1, 2, 3]}))

# Using the UDF
sdf_udf = (
    sdf
    # F.lit is required for constants
    .withColumn("udf_col", add_x_and_double("int_col", F.lit(3)))
)
sdf_udf.show()

+-------+-------+
|int_col|udf_col|
+-------+-------+
|      1|      8|
|      2|     10|
|      3|     12|
+-------+-------+
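
For larger data a vectorized pandas UDF is usually much faster than a plain Python UDF, since it operates on whole pandas Series at once. A sketch of the same calculation with the constant fixed at 3, assuming Spark 3.0+ and pyarrow installed:

from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def add_3_and_double(col: pd.Series) -> pd.Series:
    # same calculation as above, vectorized over a pandas Series
    return (col + 3) * 2

sdf_pandas_udf = sdf.withColumn("pandas_udf_col", add_3_and_double("int_col"))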

#Writing in SQL
If you register the DataFrame as a view with createTempView(), you can also write queries in SQL.

# Register the view
sdf.createTempView("sdf")

# Query
q = '''
 -- comments are allowed too
SELECT  id 
       ,int(str_col1)       AS int_col 
       ,timestamp(str_col2) AS date_col
FROM sdf
'''
parsed_sdf = spark.sql(q)
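
Note that createTempView raises an error if a view with the same name already exists; createOrReplaceTempView avoids that:

# overwrite the view if it already exists
sdf.createOrReplaceTempView("sdf")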

#(Bonus) Getting Spark running on Google Colaboratory

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

References

[PySparkデータ操作](https://qiita.com/gsy0911/items/a4cb8b2d54d6341558e0)
[pysparkでデータハンドリングする時によく使うやつメモ](https://qiita.com/paulxll/items/1c0833782cd4e1de86e2)
