Apache Spark徹底入門 will be published by 翔泳社 (Shoeisha) on April 12, 2024!

This post walks through one of the book's sample notebooks: Python/Chapter04/4-1 Example 4.1.

The repository of the translated notebooks is here.

The notebook itself is here.

from pyspark.sql.types import *
from pyspark.sql.functions import *

Define a UDF that converts the date field into a readable format.

Note
Because the date is a string that does not include the year, it can be difficult to use the SQL year() function on it.
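
If you prepend an assumed year (the 2024 below is purely illustrative), the built-in to_timestamp can parse such a string without a UDF. A minimal sketch:

from pyspark.sql.functions import to_timestamp, concat, lit, col

# Hypothetical one-row DataFrame, just to illustrate the parse
sample = spark.createDataFrame([("02190925",)], ["date"])
sample.select(to_timestamp(concat(lit("2024"), col("date")), "yyyyMMddHHmm").alias("ts")).show()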

def to_date_format_udf(d_str):
  # "MMddHHmm" -> "MM/dd  HH:mm", e.g. "02190925" -> "02/19  09:25"
  return d_str[0:2] + "/" + d_str[2:4] + "  " + d_str[4:6] + ":" + d_str[6:]

to_date_format_udf("02190925")
'02/19  09:25'

Register the user-defined function (UDF: User Defined Function).

spark.udf.register("to_date_format_udf", to_date_format_udf, StringType())
<function __main__.to_date_format_udf(d_str)>
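
Once registered, the UDF can be called from SQL as well. A quick sanity check with the same literal as above:

spark.sql("SELECT to_date_format_udf('02190925') AS date_fm").show()  # shows 02/19  09:25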

Load the data for flights departing from the US.

df = (spark.read.format("csv")
      .schema("date STRING, delay INT, distance INT, origin STRING, destination STRING")
      .option("header", "true")
      .option("path", "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv")
      .load())
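
It is worth confirming that the declared schema was applied as expected:

df.printSchema()
# root
#  |-- date: string (nullable = true)
#  |-- delay: integer (nullable = true)
#  |-- distance: integer (nullable = true)
#  |-- origin: string (nullable = true)
#  |-- destination: string (nullable = true)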


Test the UDF.

df.selectExpr("to_date_format_udf(date) as data_format").show(10, truncate=False)
+------------+
|data_format |
+------------+
|01/01  12:45|
|01/02  06:00|
|01/02  12:45|
|01/02  06:05|
|01/03  12:45|
|01/03  06:05|
|01/04  12:43|
|01/04  06:05|
|01/05  12:45|
|01/05  06:05|
+------------+
only showing top 10 rows

Create a temporary view so that we can run SQL queries against it.

df.createOrReplaceTempView("us_delay_flights_tbl")

Cache the table so that queries run faster.

%sql
CACHE TABLE us_delay_flights_tbl
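
From Python, spark.catalog.isCached can confirm that the table is cached:

spark.catalog.isCached("us_delay_flights_tbl")  # True once CACHE TABLE has run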

For readability, convert every date into date_fm.

Note
The UDF is used to do the conversion on the fly.

spark.sql("SELECT *, date, to_date_format_udf(date) AS date_fm FROM us_delay_flights_tbl").show(10, truncate=False)
+--------+-----+--------+------+-----------+--------+------------+
|date    |delay|distance|origin|destination|date    |date_fm     |
+--------+-----+--------+------+-----------+--------+------------+
|01011245|6    |602     |ABE   |ATL        |01011245|01/01  12:45|
|01020600|-8   |369     |ABE   |DTW        |01020600|01/02  06:00|
|01021245|-2   |602     |ABE   |ATL        |01021245|01/02  12:45|
|01020605|-4   |602     |ABE   |ATL        |01020605|01/02  06:05|
|01031245|-4   |602     |ABE   |ATL        |01031245|01/03  12:45|
|01030605|0    |602     |ABE   |ATL        |01030605|01/03  06:05|
|01041243|10   |602     |ABE   |ATL        |01041243|01/04  12:43|
|01040605|28   |602     |ABE   |ATL        |01040605|01/04  06:05|
|01051245|88   |602     |ABE   |ATL        |01051245|01/05  12:45|
|01050605|9    |602     |ABE   |ATL        |01050605|01/05  06:05|
+--------+-----+--------+------+-----------+--------+------------+
only showing top 10 rows
spark.sql("SELECT COUNT(*) FROM us_delay_flights_tbl").show()
+--------+
|count(1)|
+--------+
| 1391578|
+--------+

Query 1: Find all flights whose distance between origin and destination is greater than 1,000 miles

spark.sql("SELECT distance, origin, destination FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC").show(10, truncate=False)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

The same query using the Spark DataFrame API (three equivalent ways of expressing the filter and sort):

df.select("distance", "origin", "destination").where(col("distance") > 1000).orderBy(desc("distance")).show(10, truncate=False)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows
df.select("distance", "origin", "destination").where("distance > 1000").orderBy("distance", ascending=False).show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows
df.select("distance", "origin", "destination").where("distance > 1000").orderBy(desc("distance")).show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

Query 2: Find all flights between San Francisco (SFO) and Chicago (ORD) with a delay of more than two hours

spark.sql("""
SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC
""").show(10, truncate=False)
+--------+-----+------+-----------+
|date    |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO   |ORD        |
|01031755|396  |SFO   |ORD        |
|01022330|326  |SFO   |ORD        |
|01051205|320  |SFO   |ORD        |
|01190925|297  |SFO   |ORD        |
|02171115|296  |SFO   |ORD        |
|01071040|279  |SFO   |ORD        |
|01051550|274  |SFO   |ORD        |
|03120730|266  |SFO   |ORD        |
|01261104|258  |SFO   |ORD        |
+--------+-----+------+-----------+
only showing top 10 rows
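
The same query with the Spark DataFrame API might look like the following sketch (col and desc come from the imports at the top):

(df.select("date", "delay", "origin", "destination")
   .where((col("delay") > 120) & (col("origin") == "SFO") & (col("destination") == "ORD"))
   .orderBy(desc("delay"))
   .show(10, truncate=False))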

Query 3: A more complex query

As a more complex SQL query, let's label every US flight departing an airport, regardless of destination, with a delay category: Very Long, Long, Short, Tolerable, or No Delays.

spark.sql("""SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay > 120 AND delay < 360 THEN  'Long Delays '
                  WHEN delay > 60 AND delay < 120 THEN  'Short Delays'
                  WHEN delay > 0 and delay < 60  THEN   'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'No Delays'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10, truncate=False)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333  |ABE   |ATL        |Long Delays  |
|305  |ABE   |ATL        |Long Delays  |
|275  |ABE   |ATL        |Long Delays  |
|257  |ABE   |ATL        |Long Delays  |
|247  |ABE   |ATL        |Long Delays  |
|247  |ABE   |DTW        |Long Delays  |
|219  |ABE   |ORD        |Long Delays  |
|211  |ABE   |ATL        |Long Delays  |
|197  |ABE   |DTW        |Long Delays  |
|192  |ABE   |ORD        |Long Delays  |
+-----+------+-----------+-------------+
only showing top 10 rows
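
A DataFrame API equivalent of this labeling, using when/otherwise with the same thresholds, might look like this sketch (when is available via the wildcard import at the top):

(df.select("delay", "origin", "destination",
           when(col("delay") > 360, "Very Long Delays")
           .when((col("delay") > 120) & (col("delay") < 360), "Long Delays")
           .when((col("delay") > 60) & (col("delay") < 120), "Short Delays")
           .when((col("delay") > 0) & (col("delay") < 60), "Tolerable Delays")
           .when(col("delay") == 0, "No Delays")
           .otherwise("No Delays")
           .alias("Flight_Delays"))
   .orderBy("origin", desc("delay"))
   .show(10, truncate=False))
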
df = spark.sql("""SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay > 120 AND delay < 360 THEN  'Long Delays '
                  WHEN delay > 60 AND delay < 120 THEN  'Short Delays'
                  WHEN delay > 0 and delay < 60  THEN   'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'No Delays'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""")
display(df)


Other queries

df1 =  spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df1.createOrReplaceGlobalTempView("us_origin_airport_SFO_tmp_view")
%sql
SELECT * FROM global_temp.us_origin_airport_SFO_tmp_view
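
Because the view is global, it is registered under the global_temp database and can also be read back from Python:

spark.read.table("global_temp.us_origin_airport_SFO_tmp_view").show(5)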


%sql
DROP VIEW IF EXISTS global_temp.us_origin_airport_JFK_tmp_view
df2 = spark.sql("SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = 'JFK'")
df2.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
%sql
SELECT * FROM us_origin_airport_JFK_tmp_view


%sql
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view
spark.catalog.listTables(dbName="global_temp")
[Table(name='us_origin_airport_sfo_tmp_view', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='us_delay_flights_tbl', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
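
Global temporary views can be dropped from Python as well as from SQL:

spark.catalog.dropGlobalTempView("us_origin_airport_SFO_tmp_view")  # returns True if the view existed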
