Pythonの for ループ(行ごとの処理)を、Databricks (PySpark) の SQL組み込み関数(列ごとの処理) に置き換える代表的なパターンを4つ紹介します。
1. 条件分岐 (if - elif - else)
Pythonの if-elif-else ループを置き換えます。
- 使う関数: F.when().otherwise()
- 考え方: 全行に対して条件を評価し、真ならA、偽ならBというフラグを一気に立てます。
シナリオ: 売上金額 (amount) に応じてランク付けをする。
❌ 変更前(Pythonの Forループ。dfはPandasのDataFrame)
# 行ごとにPythonのif文で判定している
data = df.collect() # 全データをドライバに集める(危険!)
results = []
for row in data:
amt = row['amount']
if amt >= 10000:
rank = "Gold"
elif amt >= 5000:
rank = "Silver"
else:
rank = "Bronze"
results.append((row['id'], rank))
# 再びDataFrameに戻す
df_result = spark.createDataFrame(results, ["id", "rank"])
✅ 変更後(Spark SQL when / otherwise:dfはSparkのDataFrame)
from pyspark.sql import functions as F
# 全データに対して並列で一気にフラグを立てる
df_result = df.withColumn("rank",
F.when(F.col("amount") >= 10000, "Gold")
.when(F.col("amount") >= 5000, "Silver")
.otherwise("Bronze")
)
2. 文字列の加工・抽出
シナリオ: 「ID_2023_JP」のような文字列から、真ん中の年号「2023」を取り出す。
❌ 変更前(Pythonの Forループ)
# Pythonのsplit関数を使ってループ処理
def extract_year(text):
parts = text.split('_')
return parts[1] if len(parts) > 1 else None
# UDF(ユーザー定義関数)経由で実行(シリアライズ負荷が高い)
from pyspark.sql.functions import udf
extract_year_udf = udf(extract_year)
df_result = df.withColumn("year", extract_year_udf(df["code"]))
✅ 変更後(Spark SQL split)
from pyspark.sql import functions as F
# JVM内部で高速に分割・抽出される
# splitした配列の 2番目(インデックス1) を取得
df_result = df.withColumn("year", F.split(F.col("code"), "_").getItem(1))
3. 配列(リスト)の中身を処理
Pythonの [f(x) for x in list](リスト内包表記)を置き換えます。
- 使う関数: F.transform()
- 考え方: カラムの中に「配列(Array)」が入っている場合、ループを回さずに配列内の全要素を変換できます。
シナリオ: カラムの中に「数値のリスト」が入っていて、そのすべての数字を2倍にしたい。 例: [1, 2, 3] → [2, 4, 6]
❌ 変更前(Pythonの Forループ)
# リスト内包表記を使ったUDF(各行でPythonインタプリタが起動)
@udf("array<int>")
def double_list_values(num_list):
return [x * 2 for x in num_list]
df_result = df.withColumn("doubled_list", double_list_values(df["numbers"]))
✅ 変更後(Spark SQL transform)
高階関数 (Higher-Order Functions) と呼ばれる強力な機能です。配列内のループ処理すらSQL関数だけで完結します。
from pyspark.sql import functions as F
# SQL式の中でラムダ関数 (x -> x * 2) を適用
df_result = df.withColumn("doubled_list", F.transform("numbers", lambda x: x * 2))
4. 前の行との比較(時系列データ)
シナリオ: 日付順に並べて、「前日との差分」を計算する。
- 使う関数: F.lag(), F.lead() (Window関数)
- 考え方: データの「窓(Window)」を定義し、現在の行から見て「1つ前の行」を隣の列に持ってくるイメージで計算します。
❌ 変更前(Pythonの Forループ)
# 配列のインデックス操作(分散処理できないため、データを1箇所に集める必要がある)
local_data = df.sort("date").collect()
diffs = []
for i in range(1, len(local_data)):
diff = local_data[i]['val'] - local_data[i-1]['val']
diffs.append(diff)
✅ 変更後(Spark SQL lag Window関数)
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 「日付順に並べた窓」を定義
window_spec = Window.orderBy("date")
# 現在の行 - 1つ前の行(lag)
df_result = df.withColumn("diff",
F.col("val") - F.lag("val", 1).over(window_spec)
)
5. 「バラバラのデータを1つにまとめたい(集計)」とき
Pythonの「total = 0; for x in list: total += x」を置き換えます。
- 使う関数: F.sum(), F.collect_list(), F.aggregate()
- 考え方: グループごとにデータを「畳み込む」処理をします。
# グループごとに合計を出す
df.groupBy("user_id").agg(F.sum("amount"))
# 配列内の要素を複雑に計算しながら1つの値にする(reduce的な処理)
df.withColumn("total", F.aggregate("my_array", F.lit(0), lambda acc, x: acc + x))
6. 「特定の条件に合うものだけ取り出したい」とき
Pythonの if condition: results.append(x) を置き換えます。
- 使う関数: F.filter()(または F.array_filter())
- 考え方: 条件に合わない「行」または配列内の「要素」を削ぎ落とします。
# 行を絞り込む
df_filtered = df.filter(F.col("status") == "active")
# 配列の中身を絞り込む(例:100以上の値だけ残す)
df.withColumn("large_values", F.array_filter("my_array", lambda x: x >= 100))
まとめのヒント
- F.when:if 文の代わりに使う。
- F.split / F.regexp_extract:文字列操作のループの代わりに使う。
- F.transform:リスト内包表記の代わりに使う。
- Window関数:前後の行を参照するループの代わりに使う。
- F.sum / F.collect_list / F.aggregate:データを集計したいときに使う
- F.filter / F.array_filter:特定の条件に合うものだけ取り出したいときに使う