0
0

SparkデータフレームとSpark SQLの一般的なリレーショナルオペレーター

Last updated at Posted at 2024-03-27

2024/4/12に翔泳社よりApache Spark徹底入門を出版します!

書籍のサンプルノートブックをウォークスルーしていきます。Python/Chapter05/5-1 Spark SQL & UDFsの後半となります。

翻訳ノートブックのリポジトリはこちら。

ノートブックはこちら

データフレームとSpark SQLの一般的なリレーショナルオペレーター

Spark SQLのパワーは、数多くのデータフレームオペレーションを持っているということです(型なしデータセットオペレーションとも呼ばれます)。

完全なリストについては、Spark SQL, Built-in Functionsをご覧下さい。

次のセクションでは、以下の一般的なリレーショナルオペレーターにフォーカスします:

  • UnionとJoin
  • ウィンドウ処理
  • 変更
from pyspark.sql.functions import expr

# ファイルパスの設定
delays_path = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airports_path = "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

# airportsデータセットの取得
airports = spark.read.options(header="true", inferSchema="true", sep="\t").csv(airports_path)
airports.createOrReplaceTempView("airports_na")

# 出発遅延データの取得
delays = spark.read.options(header="true").csv(delays_path)
delays = (delays
          .withColumn("delay", expr("CAST(delay as INT) as delay"))
          .withColumn("distance", expr("CAST(distance as INT) as distance")))

delays.createOrReplaceTempView("departureDelays")

# 一時的な小さいテーブルを作成
foo = delays.filter(expr("""
            origin == 'SEA' AND 
            destination == 'SFO' AND 
            date like '01010%' AND 
            delay > 0"""))

foo.createOrReplaceTempView("foo")
spark.sql("SELECT * FROM airports_na LIMIT 10").show()
+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
spark.sql("SELECT * FROM foo LIMIT 10").show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

Union

# 二つのテーブルをUnion: delaysとbarを結合してbarを作成
bar = delays.union(foo)
bar.createOrReplaceTempView("bar")
# barに含まれるfooのデータを表示: 元々fooはdelaysから作っているのでレコードが重複している
bar.filter(expr("origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0")).show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+
# Spark SQLでも確認
spark.sql("""
SELECT * 
FROM bar 
WHERE origin = 'SEA' 
   AND destination = 'SFO' 
   AND date LIKE '01010%' 
   AND delay > 0
""").show()
+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

Join

デフォルトはinner joinです。他のオプションinner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_antiがあります。

詳細はこちら:

# 到着遅延データ(foo)とフライト情報をjoin: キーは出発地の空港
foo.join(
  airports, 
  airports.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()
+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination 
  FROM foo f
  JOIN airports_na a
    ON a.IATA = f.origin
""").show()
+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

ウィンドウ関数

素晴らしいリファレンス: Introduction Windowing Functions in Spark SQL

そのコアでは、ウィンドウ関数はフレームと呼ばれる行のグループをベースにして、テーブルのすべての入力行の値の戻り値を計算します。すべての入力行は関連づけられるユニークなフレームを持ちます。このウィンドウ関数の特性によって、他の関数よりもパワフルなものとなっており、ユーザーはウィンドウ関数なしには簡潔な方法で表現が困難(あるいは不可能)なさまざまなデータ処理タスクを表現することができます。

spark.sql("DROP TABLE IF EXISTS departureDelaysWindow")
spark.sql("""
CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, sum(delay) as TotalDelays 
  FROM departureDelays 
 WHERE origin IN ('SEA', 'SFO', 'JFK') 
   AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL') 
 GROUP BY origin, destination
""")

spark.sql("""SELECT * FROM departureDelaysWindow""").show()
+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   JFK|        SFO|      35619|
|   JFK|        DEN|       4315|
|   JFK|        ATL|      12141|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SEA|        LAX|       9359|
|   SFO|        ORD|      27412|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+

SEA、SFO、JFKが出発地の合計遅延がトップスリーの目的地はどこでしょうか?

spark.sql("""
SELECT origin, destination, sum(TotalDelays) as TotalDelays
 FROM departureDelaysWindow
WHERE origin = 'SEA'
GROUP BY origin, destination
ORDER BY TotalDelays DESC
LIMIT 3
""").show()
+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   SEA|        SFO|      22293|
|   SEA|        DEN|      13645|
|   SEA|        ORD|      10041|
+------+-----------+-----------+
spark.sql("""
SELECT origin, destination, TotalDelays, rank 
  FROM ( 
     SELECT origin, destination, TotalDelays, dense_rank() 
       OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank 
       FROM departureDelaysWindow
  ) t 
 WHERE rank <= 3
""").show()
+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+

変更

この他の一般的なデータフレームのオペレーションは、データフレームに対して変更を加えることです。背後にあるRDDは、Sparkオペレーションのデータリネージを保持できるように不変である(つまり、変更不可)ことを思い出してください。このため、データフレーム自体は不変ですが、例えば、異なる列を持つ別のデータフレームを新たに作成するオペレーションを通じて変更を行うことができます。

新規列の追加

foo2 = foo.withColumn("status", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
foo2.show()
+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+
spark.sql("""SELECT *, CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END AS status FROM foo""").show()
+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+

列の削除

foo3 = foo2.drop("delay")
foo3.show()
+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+

列名の変更

foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()
+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+

ピボット

素晴らしいリファレンス SQL Pivot: Converting Rows to Columns

spark.sql("""SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay FROM departureDelays WHERE origin = 'SEA'""").show(10)
+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        ORD|    1|   92|
|        JFK|    1|   -7|
|        DFW|    1|   -5|
|        MIA|    1|   -3|
|        DFW|    1|   -3|
|        DFW|    1|    1|
|        ORD|    1|  -10|
|        DFW|    1|   -6|
|        DFW|    1|   -2|
|        ORD|    1|   -3|
+-----------+-----+-----+
only showing top 10 rows
spark.sql("""
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay 
  FROM departureDelays WHERE origin = 'SEA' 
) 
PIVOT (
  CAST(AVG(delay) AS DECIMAL(4, 2)) as AvgDelay, MAX(delay) as MaxDelay
  FOR month IN (1 JAN, 2 FEB, 3 MAR)
)
ORDER BY destination
""").show()
+-----------+------------+------------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|MAR_AvgDelay|MAR_MaxDelay|
+-----------+------------+------------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|       11.47|          74|
|        ANC|        4.44|         149|        7.90|         141|        5.10|         187|
|        ATL|       11.98|         397|        7.73|         145|        6.53|         109|
|        AUS|        3.48|          50|       -0.21|          18|        4.03|          61|
|        BOS|        7.84|         110|       14.58|         152|        7.78|         119|
|        BUR|       -2.03|          56|       -1.89|          78|        2.01|         108|
|        CLE|       16.00|          27|        NULL|        NULL|        NULL|        NULL|
|        CLT|        2.53|          41|       12.96|         228|        5.16|         110|
|        COS|        5.32|          82|       12.18|         203|        9.74|         205|
|        CVG|       -0.50|           4|        NULL|        NULL|        NULL|        NULL|
|        DCA|       -1.15|          50|        0.07|          34|        5.73|         199|
|        DEN|       13.13|         425|       12.95|         625|        7.48|         231|
|        DFW|        7.95|         247|       12.57|         356|        6.71|         277|
|        DTW|        9.18|         107|        3.47|          77|        2.47|          72|
|        EWR|        9.63|         236|        5.20|         212|       10.59|         181|
|        FAI|        1.84|         160|        4.21|          60|        5.32|          98|
|        FAT|        1.36|         119|        5.22|         232|        1.67|          92|
|        FLL|        2.94|          54|        3.50|          40|        3.06|          52|
|        GEG|        2.28|          63|        2.87|          60|        4.49|          89|
|        HDN|       -0.44|          27|       -6.50|           0|       -3.44|          15|
+-----------+------------+------------+------------+------------+------------+------------+
only showing top 20 rows
spark.sql("""
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay 
  FROM departureDelays WHERE origin = 'SEA' 
) 
PIVOT (
  CAST(AVG(delay) AS DECIMAL(4, 2)) as AvgDelay, MAX(delay) as MaxDelay
  FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
""").show()
+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        NULL|        NULL|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        NULL|        NULL|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        DFW|        7.95|         247|       12.57|         356|
|        DTW|        9.18|         107|        3.47|          77|
|        EWR|        9.63|         236|        5.20|         212|
|        FAI|        1.84|         160|        4.21|          60|
|        FAT|        1.36|         119|        5.22|         232|
|        FLL|        2.94|          54|        3.50|          40|
|        GEG|        2.28|          63|        2.87|          60|
|        HDN|       -0.44|          27|       -6.50|           0|
+-----------+------------+------------+------------+------------+
only showing top 20 rows

Rollup

What is the difference between cube, rollup and groupBy operators?をご覧ください。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0