LoginSignup
2
1

More than 3 years have passed since last update.

pysparkでDataFrameの欠損値(null)を前後の値で埋める

Last updated at Posted at 2020-08-12

概要

pyspark(Spark SQL)において、pandasにおけるffill(forward fill)やbfill(backward fill)に該当するものはデフォルトでは存在しない。
そのため、近しい処理が必要な場合は自前で工夫する必要がある。(自分用メモ)

参考文献(答え)

上記リンク先のような感じでやれば良いらしい

実践例

参考文献で問題はほぼ解決しているが、一応自分なりに試してみる。

検証環境:

  • Ubuntu18.04(WSL)
  • Python3.8.3
  • pyspark: 3.0.0
  • Java: 1.8.0_192 (Azul Systems, Inc.)

前準備

# 必要ライブラリのimport
import sys
from typing import (
    Union,
    List,
)

import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    first,
    last,
)
from pyspark.sql.window import Window

# spark sessionの生成
spark = SparkSession.builder.getOrCreate()

numpy, pandasは主にテストデータ作成用に入れています

関数でffill, bfillを実装

def ffill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    forward fill

    Args:
        target: null値をforward fillする対象のカラム
        partition: レコードをグループ分けするカラム(複数の場合はList)
        sort_key: 順序を決めるためのカラム
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(-sys.maxsize, 0)

    filled_column = last(col(target), ignorenulls=True).over(window)

    return filled_column


def bfill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    backward fill

    Args:
        target: null値をback fillする対象のカラム
        partition: レコードをグループ分けするカラム(複数の場合はList)
        sort_key: 順序を決めるためのカラム
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(0, sys.maxsize)

    filled_column = first(col(target), ignorenulls=True).over(window)

    return filled_column

docstringで概要は書いていますが、簡単に補足

  • window
    • partitionで指定したカラムでDataFrameを区切り、sort_keyで指定したカラムでそれぞれのブロックをsort
    • sys.maxsizeは事実上無限大のような意味
    • rowsBetween(start, end)は処理対象のレコードを基準にして、start個前からend個後までのレコードを含む範囲を作る
      • sys.maxsizeを使っているので、ffillの場合は"partitionByで区切った範囲で最初のレコードから処理対象のレコードまで"、bfillの場合は"partitionByで区切った範囲で処理対象のレコードから最後のレコードまで"の範囲を指す
  • filled_column
    • 上記windowによる窓関数を使った処理を全レコードに適用
    • ffill/bfillでlast/firstを使うことにより、windowの範囲で最後/最初のnullでない値を返す
      • nullでないレコードは自身そのものを返す
      • nullであるようなレコードは、windowで作った範囲において最も近い非nullの値が入る

例1

# テストデータ用意
test = pd.DataFrame({
    "id": ['A']*10 + ['B']*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
    .replace(np.nan, None) # 数値型だと`NaN`が格納されてしまうため、nullに置換

df_test.show()
# +---+-------------------+-----+
# | id|          timestamp|value|
# +---+-------------------+-----+
# |  A|2020-08-12 15:30:00|  0.0|
# |  A|2020-08-12 15:30:01| null|
# |  A|2020-08-12 15:30:02| null|
# |  A|2020-08-12 15:30:03|  3.0|
# |  A|2020-08-12 15:30:04| null|
# |  A|2020-08-12 15:30:05|  5.0|
# |  A|2020-08-12 15:30:06|  3.0|
# |  A|2020-08-12 15:30:07| null|
# |  A|2020-08-12 15:30:08| null|
# |  A|2020-08-12 15:30:09|  2.0|
# |  B|2020-08-12 15:30:10| null|
# |  B|2020-08-12 15:30:11|  4.0|
# |  B|2020-08-12 15:30:12|  2.0|
# |  B|2020-08-12 15:30:13| null|
# |  B|2020-08-12 15:30:14| null|
# |  B|2020-08-12 15:30:15|  9.0|
# |  B|2020-08-12 15:30:16|  2.0|
# |  B|2020-08-12 15:30:17|  8.0|
# |  B|2020-08-12 15:30:18| null|
# |  B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+

# 先程作った関数を使ってnull補完を実践
df_test \
    .withColumn(
    "ffill",
    ffill(target="value", partition="id", sort_key="timestamp")
) \
    .withColumn(
    "bfill",
    bfill(target="value", partition="id", sort_key="timestamp")
) \
    .show()
# +---+-------------------+-----+------------+-------------+
# | id|          timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# |  B|2020-08-12 15:30:10| null|        null|          4.0|
# |  B|2020-08-12 15:30:11|  4.0|         4.0|          4.0|
# |  B|2020-08-12 15:30:12|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:13| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:14| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:15|  9.0|         9.0|          9.0|
# |  B|2020-08-12 15:30:16|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:17|  8.0|         8.0|          8.0|
# |  B|2020-08-12 15:30:18| null|         8.0|         null|
# |  B|2020-08-12 15:30:19| null|         8.0|         null|
# |  A|2020-08-12 15:30:00|  0.0|         0.0|          0.0|
# |  A|2020-08-12 15:30:01| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:02| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:03|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:04| null|         3.0|          5.0|
# |  A|2020-08-12 15:30:05|  5.0|         5.0|          5.0|
# |  A|2020-08-12 15:30:06|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:07| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:08| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:09|  2.0|         2.0|          2.0|
# +---+-------------------+-----+------------+-------------+
  • 若干見づらいが、id: A, Bで区切りになっている
  • idの区切りの中でtimestampでsortしてvalueの欠損値に対してffillとbfillを行っている
    • valueforward fillおよび、backward fillを見比べて下さい

例2

  • 先程の例は一つのカラムでpartitionByしていたので、複数カラムを指定してみる
  • 補完対象のカラムは先程は数値だったが、今度は文字列でもうまくいくことをみてみる
# テストデータ用意(その2)
test2 = pd.DataFrame({
    "key1": ['A']*10 + ['B']*10,
    "key2": [1, 2]*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)

df_test2.show()
# +----+----+-------------------+------+
# |key1|key2|          timestamp| value|
# +----+----+-------------------+------+
# |   A|   1|2020-08-12 15:30:00|   foo|
# |   A|   2|2020-08-12 15:30:01|  null|
# |   A|   1|2020-08-12 15:30:02|  null|
# |   A|   2|2020-08-12 15:30:03|   bar|
# |   A|   1|2020-08-12 15:30:04|  null|
# |   A|   2|2020-08-12 15:30:05|  hoge|
# |   A|   1|2020-08-12 15:30:06|foofoo|
# |   A|   2|2020-08-12 15:30:07|  null|
# |   A|   1|2020-08-12 15:30:08|  null|
# |   A|   2|2020-08-12 15:30:09|foobar|
# |   B|   1|2020-08-12 15:30:10|  null|
# |   B|   2|2020-08-12 15:30:11|   aaa|
# |   B|   1|2020-08-12 15:30:12|   bbb|
# |   B|   2|2020-08-12 15:30:13|  null|
# |   B|   1|2020-08-12 15:30:14|  null|
# |   B|   2|2020-08-12 15:30:15|   ccc|
# |   B|   1|2020-08-12 15:30:16|   xxx|
# |   B|   2|2020-08-12 15:30:17|   zzz|
# |   B|   1|2020-08-12 15:30:18|  null|
# |   B|   2|2020-08-12 15:30:19|  null|
# +----+----+-------------------+------+

# 先程作った関数を使ってnull補完を実践
df_test2 \
    .withColumn(
    "forward fill",
    ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .withColumn(
    "backward fill",
    bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2|          timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# |   B|   1|2020-08-12 15:30:10|  null|        null|          bbb|
# |   B|   1|2020-08-12 15:30:12|   bbb|         bbb|          bbb|
# |   B|   1|2020-08-12 15:30:14|  null|         bbb|          xxx|
# |   B|   1|2020-08-12 15:30:16|   xxx|         xxx|          xxx|
# |   B|   1|2020-08-12 15:30:18|  null|         xxx|         null|
# |   A|   2|2020-08-12 15:30:01|  null|        null|          bar|
# |   A|   2|2020-08-12 15:30:03|   bar|         bar|          bar|
# |   A|   2|2020-08-12 15:30:05|  hoge|        hoge|         hoge|
# |   A|   2|2020-08-12 15:30:07|  null|        hoge|       foobar|
# |   A|   2|2020-08-12 15:30:09|foobar|      foobar|       foobar|
# |   A|   1|2020-08-12 15:30:00|   foo|         foo|          foo|
# |   A|   1|2020-08-12 15:30:02|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:04|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:06|foofoo|      foofoo|       foofoo|
# |   A|   1|2020-08-12 15:30:08|  null|      foofoo|         null|
# |   B|   2|2020-08-12 15:30:11|   aaa|         aaa|          aaa|
# |   B|   2|2020-08-12 15:30:13|  null|         aaa|          ccc|
# |   B|   2|2020-08-12 15:30:15|   ccc|         ccc|          ccc|
# |   B|   2|2020-08-12 15:30:17|   zzz|         zzz|          zzz|
# |   B|   2|2020-08-12 15:30:19|  null|         zzz|         null|
# +----+----+-------------------+------+------------+-------------+
  • この例の場合key1key2の組み合わせで区切りが出来ている
  • この場合もtimestampで各区切りがsortされており、valueforward fillおよびbackward fillを見比べるとnullがそれらしく前後の値で補完されている様子が見れる

補足

上の例で作った関数の返り値に関して

display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>

display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
  • pyspark.sqlの機能を使って、引数を元に適切なSQL文を生成するような感じ
  • SQLにおけるFROM <table><table>に対応するものとしてDataFrameを与えると実際の値を返す
    • DataFrameのselectwithColumnのメソッドと組み合わせて使う
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1