レコード抽出②"最新日抽出"
データフレームにymd項目があるとき、しばしば最新日を抽出したい場面があります。
その際に使用するコードとロジックの組み立てを紹介します。
狙ったレコードを抽出するために、なぜその処理を実行するのかを意識することは重要です。
■ AsIs
- 現状
- 日時情報を持ったデータがあります。
- そのデータから最新日の情報が欲しいです。
- 課題
- 最新日が本日(today)とは限りません。
-
filter(fn.concat("y","m","d") == datetime.today())
のような抽出処理が使えません。
-
- 最新日が本日(today)とは限りません。
■ ToBe
- 論点
- 日時情報は単調増加
- 最新日レコードにおけるymd項目の値は、列内で最大の整数値を取る。
- 日時情報は単調増加
- ネクストアクション
- ymd項目を作成
- Window関数を使用し、ymd項目を降順でランク付け
- ランクが1のレコードを抽出
■ コード
要求仕様
- 最新日のレコードを抽出
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
# 最新日レコードの抽出
latest_df = (
df
# ymd列の生成
.withColumn("ymd", fn.concat(fn.col("y"), fn.col("m"), fn.col("d")))
# ランク付け
.withColumn("rank", fn.row_number().over(Window.orderBy(fn.col("ymd").desc())))
# 最新日のみを抽出
.filter(fn.col("rank") == 1)
)
■ 結論
最新日や最古日を参照する場合、『日時情報は単調増加である』という性質を使用します。
他にも、日時情報には『最新日が本日(today)とは限らない』、『最古日が固定』や『周期性』など、例外はありますが様々な性質が存在します。
狙ったレコードを抽出するために、なぜその処理を実行するのかを意識することは重要です。
そして、なぜその処理を実行するのかを考えるロジカルシンキングがデータエンジニアリング力の向上につながります。