概要
pyspark(Spark SQL)において、pandas
におけるffill(forward fill)やbfill(backward fill)に該当するものはデフォルトでは存在しない。
そのため、近しい処理が必要な場合は自前で工夫する必要がある。(自分用メモ)
参考文献(答え)
- https://stackoverflow.com/questions/38131982/forward-fill-missing-values-in-spark-python/50422240
- https://johnpaton.net/posts/forward-fill-spark/
- https://towardsdatascience.com/end-to-end-time-series-interpolation-in-pyspark-filling-the-gap-5ccefc6b7fc9
上記リンク先のような感じでやれば良いらしい
実践例
参考文献で問題はほぼ解決しているが、一応自分なりに試してみる。
検証環境:
- Ubuntu18.04(WSL)
- Python3.8.3
- pyspark: 3.0.0
- Java: 1.8.0_192 (Azul Systems, Inc.)
前準備
# 必要ライブラリのimport
import sys
from typing import (
Union,
List,
)
import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col,
first,
last,
)
from pyspark.sql.window import Window
# spark sessionの生成
spark = SparkSession.builder.getOrCreate()
numpy
, pandas
は主にテストデータ作成用に入れています
関数でffill, bfillを実装
def ffill(
target: str,
partition: Union[str, List[str]],
sort_key: str,
) -> pyspark.sql.Column:
"""
forward fill
Args:
target: null値をforward fillする対象のカラム
partition: レコードをグループ分けするカラム(複数の場合はList)
sort_key: 順序を決めるためのカラム
"""
window = Window.partitionBy(partition) \
.orderBy(sort_key) \
.rowsBetween(-sys.maxsize, 0)
filled_column = last(col(target), ignorenulls=True).over(window)
return filled_column
def bfill(
target: str,
partition: Union[str, List[str]],
sort_key: str,
) -> pyspark.sql.Column:
"""
backward fill
Args:
target: null値をback fillする対象のカラム
partition: レコードをグループ分けするカラム(複数の場合はList)
sort_key: 順序を決めるためのカラム
"""
window = Window.partitionBy(partition) \
.orderBy(sort_key) \
.rowsBetween(0, sys.maxsize)
filled_column = first(col(target), ignorenulls=True).over(window)
return filled_column
docstringで概要は書いていますが、簡単に補足
-
window
:-
partition
で指定したカラムでDataFrameを区切り、sort_key
で指定したカラムでそれぞれのブロックをsort -
sys.maxsize
は事実上無限大のような意味 -
rowsBetween(start, end)
は処理対象のレコードを基準にして、start
個前からend
個後までのレコードを含む範囲を作る-
sys.maxsize
を使っているので、ffillの場合は"partitionByで区切った範囲で最初のレコードから処理対象のレコードまで"、bfillの場合は"partitionByで区切った範囲で処理対象のレコードから最後のレコードまで"の範囲を指す
-
-
-
filled_column
例1
# テストデータ用意
test = pd.DataFrame({
"id": ['A']*10 + ['B']*10,
"timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
"value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
.replace(np.nan, None) # 数値型だと`NaN`が格納されてしまうため、nullに置換
df_test.show()
# +---+-------------------+-----+
# | id| timestamp|value|
# +---+-------------------+-----+
# | A|2020-08-12 15:30:00| 0.0|
# | A|2020-08-12 15:30:01| null|
# | A|2020-08-12 15:30:02| null|
# | A|2020-08-12 15:30:03| 3.0|
# | A|2020-08-12 15:30:04| null|
# | A|2020-08-12 15:30:05| 5.0|
# | A|2020-08-12 15:30:06| 3.0|
# | A|2020-08-12 15:30:07| null|
# | A|2020-08-12 15:30:08| null|
# | A|2020-08-12 15:30:09| 2.0|
# | B|2020-08-12 15:30:10| null|
# | B|2020-08-12 15:30:11| 4.0|
# | B|2020-08-12 15:30:12| 2.0|
# | B|2020-08-12 15:30:13| null|
# | B|2020-08-12 15:30:14| null|
# | B|2020-08-12 15:30:15| 9.0|
# | B|2020-08-12 15:30:16| 2.0|
# | B|2020-08-12 15:30:17| 8.0|
# | B|2020-08-12 15:30:18| null|
# | B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+
# 先程作った関数を使ってnull補完を実践
df_test \
.withColumn(
"ffill",
ffill(target="value", partition="id", sort_key="timestamp")
) \
.withColumn(
"bfill",
bfill(target="value", partition="id", sort_key="timestamp")
) \
.show()
# +---+-------------------+-----+------------+-------------+
# | id| timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# | B|2020-08-12 15:30:10| null| null| 4.0|
# | B|2020-08-12 15:30:11| 4.0| 4.0| 4.0|
# | B|2020-08-12 15:30:12| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:13| null| 2.0| 9.0|
# | B|2020-08-12 15:30:14| null| 2.0| 9.0|
# | B|2020-08-12 15:30:15| 9.0| 9.0| 9.0|
# | B|2020-08-12 15:30:16| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:17| 8.0| 8.0| 8.0|
# | B|2020-08-12 15:30:18| null| 8.0| null|
# | B|2020-08-12 15:30:19| null| 8.0| null|
# | A|2020-08-12 15:30:00| 0.0| 0.0| 0.0|
# | A|2020-08-12 15:30:01| null| 0.0| 3.0|
# | A|2020-08-12 15:30:02| null| 0.0| 3.0|
# | A|2020-08-12 15:30:03| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:04| null| 3.0| 5.0|
# | A|2020-08-12 15:30:05| 5.0| 5.0| 5.0|
# | A|2020-08-12 15:30:06| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:07| null| 3.0| 2.0|
# | A|2020-08-12 15:30:08| null| 3.0| 2.0|
# | A|2020-08-12 15:30:09| 2.0| 2.0| 2.0|
# +---+-------------------+-----+------------+-------------+
- 若干見づらいが、
id
: A, Bで区切りになっている - 各
id
の区切りの中でtimestamp
でsortしてvalue
の欠損値に対してffillとbfillを行っている-
value
とforward fill
および、backward fill
を見比べて下さい
-
例2
- 先程の例は一つのカラムでpartitionByしていたので、複数カラムを指定してみる
- 補完対象のカラムは先程は数値だったが、今度は文字列でもうまくいくことをみてみる
# テストデータ用意(その2)
test2 = pd.DataFrame({
"key1": ['A']*10 + ['B']*10,
"key2": [1, 2]*10,
"timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
"value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)
df_test2.show()
# +----+----+-------------------+------+
# |key1|key2| timestamp| value|
# +----+----+-------------------+------+
# | A| 1|2020-08-12 15:30:00| foo|
# | A| 2|2020-08-12 15:30:01| null|
# | A| 1|2020-08-12 15:30:02| null|
# | A| 2|2020-08-12 15:30:03| bar|
# | A| 1|2020-08-12 15:30:04| null|
# | A| 2|2020-08-12 15:30:05| hoge|
# | A| 1|2020-08-12 15:30:06|foofoo|
# | A| 2|2020-08-12 15:30:07| null|
# | A| 1|2020-08-12 15:30:08| null|
# | A| 2|2020-08-12 15:30:09|foobar|
# | B| 1|2020-08-12 15:30:10| null|
# | B| 2|2020-08-12 15:30:11| aaa|
# | B| 1|2020-08-12 15:30:12| bbb|
# | B| 2|2020-08-12 15:30:13| null|
# | B| 1|2020-08-12 15:30:14| null|
# | B| 2|2020-08-12 15:30:15| ccc|
# | B| 1|2020-08-12 15:30:16| xxx|
# | B| 2|2020-08-12 15:30:17| zzz|
# | B| 1|2020-08-12 15:30:18| null|
# | B| 2|2020-08-12 15:30:19| null|
# +----+----+-------------------+------+
# 先程作った関数を使ってnull補完を実践
df_test2 \
.withColumn(
"forward fill",
ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
.withColumn(
"backward fill",
bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
.show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2| timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# | B| 1|2020-08-12 15:30:10| null| null| bbb|
# | B| 1|2020-08-12 15:30:12| bbb| bbb| bbb|
# | B| 1|2020-08-12 15:30:14| null| bbb| xxx|
# | B| 1|2020-08-12 15:30:16| xxx| xxx| xxx|
# | B| 1|2020-08-12 15:30:18| null| xxx| null|
# | A| 2|2020-08-12 15:30:01| null| null| bar|
# | A| 2|2020-08-12 15:30:03| bar| bar| bar|
# | A| 2|2020-08-12 15:30:05| hoge| hoge| hoge|
# | A| 2|2020-08-12 15:30:07| null| hoge| foobar|
# | A| 2|2020-08-12 15:30:09|foobar| foobar| foobar|
# | A| 1|2020-08-12 15:30:00| foo| foo| foo|
# | A| 1|2020-08-12 15:30:02| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:04| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:06|foofoo| foofoo| foofoo|
# | A| 1|2020-08-12 15:30:08| null| foofoo| null|
# | B| 2|2020-08-12 15:30:11| aaa| aaa| aaa|
# | B| 2|2020-08-12 15:30:13| null| aaa| ccc|
# | B| 2|2020-08-12 15:30:15| ccc| ccc| ccc|
# | B| 2|2020-08-12 15:30:17| zzz| zzz| zzz|
# | B| 2|2020-08-12 15:30:19| null| zzz| null|
# +----+----+-------------------+------+------------+-------------+
- この例の場合
key1
とkey2
の組み合わせで区切りが出来ている - この場合も
timestamp
で各区切りがsortされており、value
とforward fill
およびbackward fill
を見比べるとnullがそれらしく前後の値で補完されている様子が見れる
補足
上の例で作った関数の返り値に関して
display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>
display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>
display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
-
pyspark.sql
の機能を使って、引数を元に適切なSQL文を生成するような感じ - SQLにおける
FROM <table>
の<table>
に対応するものとしてDataFrameを与えると実際の値を返す- DataFrameの
select
やwithColumn
のメソッドと組み合わせて使う
- DataFrameの