下記のようなWindow関数に対して、Pysparkで書く場合の備忘メモを残します。
SQL.sql
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num
FROM employees;
pysparkにすると下記のような形になります。
これはつまり、Window関数を呼び出して、ナンバリングしてるってわけ。
python.py
# 3. ウィンドウ定義(部署ごとに分けて、給料で降順に並べる)
window_spec = Window.partitionBy("department").orderBy(df["salary"].desc())
# 4. row_numberを追加
df_with_rownum = df.withColumn("row_num", row_number().over(window_spec))
ここでoverはこの範囲でWindow関数を適用するという意味で、row_numberを適用する範囲を決めています。
ちなみにこのような処理の裏ではSparkの分散処理によって複数のノードで分散処理が実施されています。