Pythonの for ループ(行ごとの処理)を、Databricks (PySpark) の SQL組み込み関数(列ごとの処理) に置き換える代表的なパターンを6つ紹介します。
① 行ごとの計算(for ループ → DataFrame演算)
❌ forループ(非推奨)
Python
result = []
for row in df.collect(): # ← driverに全データが来る
result.append(row.a + row.b)
✅ 組み込み関数(推奨)(dfはSparkのDataFrame)
Python
from pyspark.sql.functions import col
df2 = df.withColumn("sum", col("a") + col("b"))
✔ 分散処理
✔ collect 不要
✔ 数百万行でも安全
② 条件分岐(if/for → when)
❌ for + if
Python
labels = []
for row in df.collect():
if row.score >= 80:
labels.append("A")
else:
labels.append("B")
✅ when(SQL組み込み関数)
Python
from pyspark.sql.functions import when
df2 = df.withColumn(
"label",
when(col("score") >= 80, "A").otherwise("B")
)
③ 集計処理(forで足し算 → groupBy)
❌ forループ集計
Python
total = 0
for row in df.collect():
total += row.sales
✅ 集約関数
Python
from pyspark.sql.functions import sum
df.select(sum("sales")).show()
④ 配列に対する for → 高階関数(超重要)
Databricks では array / map / struct に対して
transform, filter, aggregate が使えます。
④-1 配列要素を加工(map相当)
❌ for
Python
new_arr = []
for x in arr:
new_arr.append(x * 2)
✅ transform
Python
from pyspark.sql.functions import transform
df2 = df.withColumn(
"arr2",
transform("arr", lambda x: x * 2)
)
④-2 条件フィルタ(for + if)
❌ for
Python
filtered = []
for x in arr:
if x > 10:
filtered.append(x)
✅ filter
Python
from pyspark.sql.functions import filter
df2 = df.withColumn(
"filtered",
filter("arr", lambda x: x > 10)
)
④-3 配列の合計(forで累積 → aggregate)
❌ for
Python
s = 0
for x in arr:
s += x
✅ aggregate
Python
from pyspark.sql.functions import aggregate, lit
df2 = df.withColumn(
"sum_arr",
aggregate("arr", lit(0), lambda acc, x: acc + x)
)
⑤ SQLでfor相当を書く(Databricks向き)
Python
df.createOrReplaceTempView("t")
spark.sql("""
SELECT
id,
aggregate(arr, 0, (acc, x) -> acc + x) AS sum_arr
FROM t
""")
✔ Spark SQLは最適化(Catalyst)される
✔ Databricksでは Python for より圧倒的に速い
⑥ for が許されるケース(例外)
以下は OK な for ループです:
- カラム定義を動的生成
- 小さな設定リスト処理
- DataFrame 操作の「外側」
Python
for c in cols:
df = df.withColumn(c, col(c) * 100)
❌ 行データに対する for
⭕ DataFrame操作を組み立てる for
まとめ(重要)
| 処理 | forループ | Databricks推奨 |
|---|---|---|
| 行処理 | ❌ | withColumn |
| 条件分岐 | ❌ | when |
| 集計 | ❌ | groupBy / agg |
| 配列処理 | ❌ | transform / filter / aggregate |
| SQL的処理 | ❌ | Spark SQL |