2024/4/12に翔泳社よりApache Spark徹底入門を出版します!
書籍のサンプルノートブックをウォークスルーしていきます。Python/Chapter05/5-1 Spark SQL & UDFs
の後半となります。
翻訳ノートブックのリポジトリはこちら。
ノートブックはこちら
データフレームとSpark SQLの一般的なリレーショナルオペレーター
Spark SQLのパワーは、数多くのデータフレームオペレーションを持っているということです(型なしデータセットオペレーションとも呼ばれます)。
完全なリストについては、Spark SQL, Built-in Functionsをご覧下さい。
次のセクションでは、以下の一般的なリレーショナルオペレーターにフォーカスします:
- UnionとJoin
- ウィンドウ処理
- 変更
from pyspark.sql.functions import expr
# ファイルパスの設定
delays_path = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airports_path = "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
# airportsデータセットの取得
airports = spark.read.options(header="true", inferSchema="true", sep="\t").csv(airports_path)
airports.createOrReplaceTempView("airports_na")
# 出発遅延データの取得
delays = spark.read.options(header="true").csv(delays_path)
delays = (delays
.withColumn("delay", expr("CAST(delay as INT) as delay"))
.withColumn("distance", expr("CAST(distance as INT) as distance")))
delays.createOrReplaceTempView("departureDelays")
# 一時的な小さいテーブルを作成
foo = delays.filter(expr("""
origin == 'SEA' AND
destination == 'SFO' AND
date like '01010%' AND
delay > 0"""))
foo.createOrReplaceTempView("foo")
spark.sql("SELECT * FROM airports_na LIMIT 10").show()
+-----------+-----+-------+----+
| City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford| BC| Canada| YXX|
| Aberdeen| SD| USA| ABR|
| Abilene| TX| USA| ABI|
| Akron| OH| USA| CAK|
| Alamosa| CO| USA| ALS|
| Albany| GA| USA| ABY|
| Albany| NY| USA| ALB|
|Albuquerque| NM| USA| ABQ|
| Alexandria| LA| USA| AEX|
| Allentown| PA| USA| ABE|
+-----------+-----+-------+----+
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
+--------+-----+--------+------+-----------+
spark.sql("SELECT * FROM foo LIMIT 10").show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710| 31| 590| SEA| SFO|
|01010955| 104| 590| SEA| SFO|
|01010730| 5| 590| SEA| SFO|
+--------+-----+--------+------+-----------+
Union
# 二つのテーブルをUnion: delaysとbarを結合してbarを作成
bar = delays.union(foo)
bar.createOrReplaceTempView("bar")
# barに含まれるfooのデータを表示: 元々fooはdelaysから作っているのでレコードが重複している
bar.filter(expr("origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0")).show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710| 31| 590| SEA| SFO|
|01010955| 104| 590| SEA| SFO|
|01010730| 5| 590| SEA| SFO|
|01010710| 31| 590| SEA| SFO|
|01010955| 104| 590| SEA| SFO|
|01010730| 5| 590| SEA| SFO|
+--------+-----+--------+------+-----------+
# Spark SQLでも確認
spark.sql("""
SELECT *
FROM bar
WHERE origin = 'SEA'
AND destination = 'SFO'
AND date LIKE '01010%'
AND delay > 0
""").show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710| 31| 590| SEA| SFO|
|01010955| 104| 590| SEA| SFO|
|01010730| 5| 590| SEA| SFO|
|01010710| 31| 590| SEA| SFO|
|01010955| 104| 590| SEA| SFO|
|01010730| 5| 590| SEA| SFO|
+--------+-----+--------+------+-----------+
Join
デフォルトはinner join
です。他のオプションinner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti
があります。
詳細はこちら:
# 到着遅延データ(foo)とフライト情報をjoin: キーは出発地の空港
foo.join(
airports,
airports.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()
+-------+-----+--------+-----+--------+-----------+
| City|State| date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle| WA|01010710| 31| 590| SFO|
|Seattle| WA|01010955| 104| 590| SFO|
|Seattle| WA|01010730| 5| 590| SFO|
+-------+-----+--------+-----+--------+-----------+
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
FROM foo f
JOIN airports_na a
ON a.IATA = f.origin
""").show()
+-------+-----+--------+-----+--------+-----------+
| City|State| date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle| WA|01010710| 31| 590| SFO|
|Seattle| WA|01010955| 104| 590| SFO|
|Seattle| WA|01010730| 5| 590| SFO|
+-------+-----+--------+-----+--------+-----------+
ウィンドウ関数
素晴らしいリファレンス: Introduction Windowing Functions in Spark SQL
そのコアでは、ウィンドウ関数はフレームと呼ばれる行のグループをベースにして、テーブルのすべての入力行の値の戻り値を計算します。すべての入力行は関連づけられるユニークなフレームを持ちます。このウィンドウ関数の特性によって、他の関数よりもパワフルなものとなっており、ユーザーはウィンドウ関数なしには簡潔な方法で表現が困難(あるいは不可能)なさまざまなデータ処理タスクを表現することができます。
spark.sql("DROP TABLE IF EXISTS departureDelaysWindow")
spark.sql("""
CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, sum(delay) as TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination
""")
spark.sql("""SELECT * FROM departureDelaysWindow""").show()
+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
| JFK| ORD| 5608|
| JFK| SFO| 35619|
| JFK| DEN| 4315|
| JFK| ATL| 12141|
| JFK| SEA| 7856|
| JFK| LAX| 35755|
| SEA| LAX| 9359|
| SFO| ORD| 27412|
| SFO| DEN| 18688|
| SFO| SEA| 17080|
| SEA| SFO| 22293|
| SFO| ATL| 5091|
| SEA| DEN| 13645|
| SEA| ATL| 4535|
| SEA| ORD| 10041|
| SFO| JFK| 24100|
| SFO| LAX| 40798|
| SEA| JFK| 4667|
+------+-----------+-----------+
SEA、SFO、JFKが出発地の合計遅延がトップスリーの目的地はどこでしょうか?
spark.sql("""
SELECT origin, destination, sum(TotalDelays) as TotalDelays
FROM departureDelaysWindow
WHERE origin = 'SEA'
GROUP BY origin, destination
ORDER BY TotalDelays DESC
LIMIT 3
""").show()
+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
| SEA| SFO| 22293|
| SEA| DEN| 13645|
| SEA| ORD| 10041|
+------+-----------+-----------+
spark.sql("""
SELECT origin, destination, TotalDelays, rank
FROM (
SELECT origin, destination, TotalDelays, dense_rank()
OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
FROM departureDelaysWindow
) t
WHERE rank <= 3
""").show()
+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
| JFK| LAX| 35755| 1|
| JFK| SFO| 35619| 2|
| JFK| ATL| 12141| 3|
| SEA| SFO| 22293| 1|
| SEA| DEN| 13645| 2|
| SEA| ORD| 10041| 3|
| SFO| LAX| 40798| 1|
| SFO| ORD| 27412| 2|
| SFO| JFK| 24100| 3|
+------+-----------+-----------+----+
変更
この他の一般的なデータフレームのオペレーションは、データフレームに対して変更を加えることです。背後にあるRDDは、Sparkオペレーションのデータリネージを保持できるように不変である(つまり、変更不可)ことを思い出してください。このため、データフレーム自体は不変ですが、例えば、異なる列を持つ別のデータフレームを新たに作成するオペレーションを通じて変更を行うことができます。
新規列の追加
foo2 = foo.withColumn("status", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
foo2.show()
+--------+-----+--------+------+-----------+-------+
| date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710| 31| 590| SEA| SFO|Delayed|
|01010955| 104| 590| SEA| SFO|Delayed|
|01010730| 5| 590| SEA| SFO|On-time|
+--------+-----+--------+------+-----------+-------+
spark.sql("""SELECT *, CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END AS status FROM foo""").show()
+--------+-----+--------+------+-----------+-------+
| date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710| 31| 590| SEA| SFO|Delayed|
|01010955| 104| 590| SEA| SFO|Delayed|
|01010730| 5| 590| SEA| SFO|On-time|
+--------+-----+--------+------+-----------+-------+
列の削除
foo3 = foo2.drop("delay")
foo3.show()
+--------+--------+------+-----------+-------+
| date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710| 590| SEA| SFO|Delayed|
|01010955| 590| SEA| SFO|Delayed|
|01010730| 590| SEA| SFO|On-time|
+--------+--------+------+-----------+-------+
列名の変更
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()
+--------+--------+------+-----------+-------------+
| date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710| 590| SEA| SFO| Delayed|
|01010955| 590| SEA| SFO| Delayed|
|01010730| 590| SEA| SFO| On-time|
+--------+--------+------+-----------+-------------+
ピボット
素晴らしいリファレンス SQL Pivot: Converting Rows to Columns
spark.sql("""SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay FROM departureDelays WHERE origin = 'SEA'""").show(10)
+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
| ORD| 1| 92|
| JFK| 1| -7|
| DFW| 1| -5|
| MIA| 1| -3|
| DFW| 1| -3|
| DFW| 1| 1|
| ORD| 1| -10|
| DFW| 1| -6|
| DFW| 1| -2|
| ORD| 1| -3|
+-----------+-----+-----+
only showing top 10 rows
spark.sql("""
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
FROM departureDelays WHERE origin = 'SEA'
)
PIVOT (
CAST(AVG(delay) AS DECIMAL(4, 2)) as AvgDelay, MAX(delay) as MaxDelay
FOR month IN (1 JAN, 2 FEB, 3 MAR)
)
ORDER BY destination
""").show()
+-----------+------------+------------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|MAR_AvgDelay|MAR_MaxDelay|
+-----------+------------+------------+------------+------------+------------+------------+
| ABQ| 19.86| 316| 11.42| 69| 11.47| 74|
| ANC| 4.44| 149| 7.90| 141| 5.10| 187|
| ATL| 11.98| 397| 7.73| 145| 6.53| 109|
| AUS| 3.48| 50| -0.21| 18| 4.03| 61|
| BOS| 7.84| 110| 14.58| 152| 7.78| 119|
| BUR| -2.03| 56| -1.89| 78| 2.01| 108|
| CLE| 16.00| 27| NULL| NULL| NULL| NULL|
| CLT| 2.53| 41| 12.96| 228| 5.16| 110|
| COS| 5.32| 82| 12.18| 203| 9.74| 205|
| CVG| -0.50| 4| NULL| NULL| NULL| NULL|
| DCA| -1.15| 50| 0.07| 34| 5.73| 199|
| DEN| 13.13| 425| 12.95| 625| 7.48| 231|
| DFW| 7.95| 247| 12.57| 356| 6.71| 277|
| DTW| 9.18| 107| 3.47| 77| 2.47| 72|
| EWR| 9.63| 236| 5.20| 212| 10.59| 181|
| FAI| 1.84| 160| 4.21| 60| 5.32| 98|
| FAT| 1.36| 119| 5.22| 232| 1.67| 92|
| FLL| 2.94| 54| 3.50| 40| 3.06| 52|
| GEG| 2.28| 63| 2.87| 60| 4.49| 89|
| HDN| -0.44| 27| -6.50| 0| -3.44| 15|
+-----------+------------+------------+------------+------------+------------+------------+
only showing top 20 rows
spark.sql("""
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
FROM departureDelays WHERE origin = 'SEA'
)
PIVOT (
CAST(AVG(delay) AS DECIMAL(4, 2)) as AvgDelay, MAX(delay) as MaxDelay
FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
""").show()
+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
| ABQ| 19.86| 316| 11.42| 69|
| ANC| 4.44| 149| 7.90| 141|
| ATL| 11.98| 397| 7.73| 145|
| AUS| 3.48| 50| -0.21| 18|
| BOS| 7.84| 110| 14.58| 152|
| BUR| -2.03| 56| -1.89| 78|
| CLE| 16.00| 27| NULL| NULL|
| CLT| 2.53| 41| 12.96| 228|
| COS| 5.32| 82| 12.18| 203|
| CVG| -0.50| 4| NULL| NULL|
| DCA| -1.15| 50| 0.07| 34|
| DEN| 13.13| 425| 12.95| 625|
| DFW| 7.95| 247| 12.57| 356|
| DTW| 9.18| 107| 3.47| 77|
| EWR| 9.63| 236| 5.20| 212|
| FAI| 1.84| 160| 4.21| 60|
| FAT| 1.36| 119| 5.22| 232|
| FLL| 2.94| 54| 3.50| 40|
| GEG| 2.28| 63| 2.87| 60|
| HDN| -0.44| 27| -6.50| 0|
+-----------+------------+------------+------------+------------+
only showing top 20 rows
Rollup
What is the difference between cube, rollup and groupBy operators?をご覧ください。