2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spark のJoinと実行計画を確認してみた

Last updated at Posted at 2023-05-30

背景・目的

こちらのJoin Strategies in Apache Spark— Deep Diveや、Apache Spark Join Strategiesによると、Sparkでは、内部結合、外部結合、左結合、右結合、または半結合などジョインを行う際に全く異なる方法で処理される。結合時にさざまな戦略を採用しているとのこと。
本書では、Sparkのジョインにはどのようなものがあるのか。実行計画と時間がどの様に変わるのかを確認します。

まとめ

Sparkのジョインは、下記の違いがあります。

  • Broadcast joinは、片方のデータセット全体を各Executorにばらまきジョインする
  • shuffle sort merge joinは、両方のデータセットをキーごとにシャッフルし、ソート後にジョインする。
  • shuffle hash joinは、両方のデータセットをキーごとにシャッフルし、ハッシュテーブルを作成しジョインする
  • shuffle-and-replicate nested loop joinは、両方のデータセットをキーごとにシャッフルし、Nested Loopしジョインする。

概要

Sparkにおけるジョイン

Sparkでは、内部結合、外部結合、左結合、右結合、または半結合などジョインを行う際に全く異なる方法で処理される。
結合時にさざまな戦略を採用している。これらの戦略の内部について知ることは、Sparkのアプリケーションのパフォーマンスを向上させることに役立つ。

結合操作に影響を与える要因

  • Dataset Size
    • 結合に使用するデータセットのサイズは、結合操作のパフォーマンスに直接影響します。
  • Condition of Join
    • 結合条件は、データセット内のフィールドの論理比較から派生。 これらの条件は、等価条件 (=) または非等価条件 (>、<、≧、≤、<>) に分類できる。
  • Type of Join
    • 内部結合、外部結合、セミ結合、またはクロス結合に分類される結合のタイプがある

結合のための戦略

Sparkでは、上記の要素に基づいて、結合実行のための以下の戦略を作成した。

  • Broadcast Hash Join
  • Shuffle Hash Join
  • Shuffle Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Broadcast Hash Join

ブロードキャスト ハッシュ結合は、より小さいデータセットをワーカーノードにブロードキャストするという単純な戦略を採用している。これによりシャッフル コストが節約される。
このタイプの結合は、データセットの 1 つが巨大で、もう 1 つが通常 10 MB (デフォルト) 未満の小さい場合に非常に役立つとのこと。

ブロードキャストされたデータセットは、ドライバーノードとエグゼキューターノードが利用される。挙動は下記の通りです。

  1. ドライバーはまずエグゼキュータ側からデータセットを取得
  2. 次に、より大きなデータセットのパーティションが存在するすべてのワーカー ノードにデータセットをブロードキャストする。
  • Full Outer Joinを除く等価結合のタイプがサポートされています。
  • spark.sql.autoBroadcastJoinThreshold (デフォルト値 10485760(10 MB)は最大は 8GB まで
  • テーブルはドライバーノードだけでなくエグゼキューターノードにもキャッシュされる。
  • 大規模なテーブルがブロードキャストされると、ネットワークに負荷がかかる操作となり、パフォーマンスの低下につながる。

Shuffle Hash Join

シャッフル ハッシュ結合には、シャッフル フェーズとハッシュ結合フェーズという 2 フェーズのプロセスが含まれる。 同じ結合キーを持つデータセットは同じ実行ノードに移動され、その後実行ノード上で小さい方のテーブルのハッシュ テーブルを作成し、ハッシュ結合を適用する。

  • 結合キーはソート可能である必要はない。
  • Full Outer Joinを除く等価結合条件と結合タイプがサポートされる
  • 結合操作に参加するデータセット上でシャッフルとハッシュ テーブルの作成を伴うため、コストがかかる結合
  • この結合を有効にするには、Shuffle Sort Merge Join を false に設定する必要がある。
    • spark.sql.join.preferSortMergeJoin=false (Default value is true)

Shuffle Sort Merge Join

シャッフル ソート マージ ジョインは、シャッフルとソートマージのフェーズが含まれる。同じ結合キーを持つデータセットは同じ実行ノードに移動され、実行ノード上でノード上のデータセットパーティションが結合キーによって並べ替えられ、結合キーに基づいてマージされる。

  • Spark 2.3 以降の Apache Spark のデフォルトの結合戦略。
  • これは、spark.sql.join.preferSortMergeJoin=false を使用して無効にできる。
  • 結合キーは並べ替え可能である必要がある。
  • すべての結合タイプがサポートされている。

Cartesian Join(デカルト結合)

参加しているデータセットが結合キー (条件付き) を指定していない場合は、デカルト積戦略が選択される。

  • 内部結合タイプのみがサポート
  • 等価結合条件と非等価結合条件をサポート

Broadcast Nested Loop Join

この結合戦略は、適切な結合メカニズムを選択できない場合に選択される。 つまり、結合条件とヒント タイプが指定されていない場合、この結合が選択される。

  • 等価結合条件と非等価結合条件をサポート
  • すべての結合タイプがサポート
  • 非常にコストのかかる結合、Spark はブロードキャストできる適切なデータセットを探して結合を自動的に最適化する。

Spark Join Selection Strategy

Case-I) Equivalence Join Condition

等価結合条件の時には、次の順序でヒントを確認する。

  1. Broadcast Hash Join ヒント — 結合タイプがサポートされている場合は、ブロードキャスト ハッシュ結合を選択します。
  2. Sort Merge のヒント — 結合キーが並べ替え可能な場合は、並べ替えと結合を選択して結合します。
  3. Shuffle Hash Join ヒント — 結合タイプがサポートされている場合は、シャッフル ハッシュ結合を選択します。
  4. Shuffle Replicate NL Hint — 結合タイプが inner like の場合はデカルト積を選択します。

該当するヒントがない場合、次の順序でヒントを確認する。

  1. データセットの 1 つがブロードキャストできるほど小さく、結合タイプがサポートされている場合は、ブロードキャスト ハッシュ結合を選択します。
  2. 結合キーがソート可能な場合は、「Sort Merge Join」を選択します。
  3. データセットの 1 つがハッシュ テーブルを構築できるほど小さく、spark.sql.join.preferSortMergeJoin=false の場合は、[シャッフル ハッシュ結合] を選択します。
  4. 結合タイプが内部タイプの場合は、デカルト積結合を選択します。
  5. どのオプションも満たさない場合は、ブロードキャスト ネスト ループ結合を呼び出します。

Case-II) Non-Equivalence Join Conditions

次の順序で結合ヒントを確認してください。

  1. ブロードキャスト ヒント — ブロードキャストのネストされたループ結合を選択します。
  2. Shuffle Replicate NL Hint — 結合タイプが内部の場合はデカルト積を選択します。

実践

事前準備

サンプルデータベースのデータセットを使用します。

category

# category
categoryScehama = T.StructType([
    T.StructField('catid', T.LongType()),
    T.StructField('catgroup', T.StringType()),
    T.StructField('catname', T.StringType()),
    T.StructField('catdesc', T.StringType())
])
categoryDF = sparkSession.read.schema(categoryScehama).csv("s3://{0}/input/join/category/".format(bucketName))
categoryDF.show()
categoryDF.printSchema()

===
+-----+--------+---------+--------------------+
|catid|catgroup|  catname|             catdesc|
+-----+--------+---------+--------------------+
|    1|  Sports|      MLB|Major League Base...|
|    2|  Sports|      NHL|National Hockey L...|
|    3|  Sports|      NFL|National Football...|
|    4|  Sports|      NBA|National Basketba...|
|    5|  Sports|      MLS| Major League Soccer|
|    6|   Shows| Musicals|     Musical theatre|
|    7|   Shows|    Plays|All non-musical t...|
|    8|   Shows|    Opera|All opera and lig...|
|    9|Concerts|      Pop|All rock and pop ...|
|   10|Concerts|     Jazz|All jazz singers ...|
|   11|Concerts|Classical|        All symphony|
+-----+--------+---------+--------------------+

root
 |-- catid: long (nullable = true)
 |-- catgroup: string (nullable = true)
 |-- catname: string (nullable = true)
 |-- catdesc: string (nullable = true)

date

# date
dateScehama = T.StructType([
    T.StructField('dateid', T.LongType()),
    T.StructField('caldate', T.StringType()),
    T.StructField('day', T.StringType()),
    T.StructField('week', T.LongType()),
    T.StructField('month', T.StringType()),
    T.StructField('qtr', T.StringType()),
    T.StructField('year', T.LongType()),
    T.StructField('holiday', T.BooleanType())
])
dateDF = sparkSession.read.schema(dateScehama).csv("s3://{0}/input/join/date/".format(bucketName))
dateDF.show()
dateDF.printSchema()
===
record count[365]
+------+----------+---+----+-----+---+----+-------+
|dateid|   caldate|day|week|month|qtr|year|holiday|
+------+----------+---+----+-----+---+----+-------+
|  1827|2008-01-01| WE|   1|  JAN|  1|2008|   true|
|  1828|2008-01-02| TH|   1|  JAN|  1|2008|  false|
|  1829|2008-01-03| FR|   1|  JAN|  1|2008|  false|
|  1830|2008-01-04| SA|   2|  JAN|  1|2008|  false|
|  1831|2008-01-05| SU|   2|  JAN|  1|2008|  false|
|  1832|2008-01-06| MO|   2|  JAN|  1|2008|  false|
|  1833|2008-01-07| TU|   2|  JAN|  1|2008|  false|
|  1834|2008-01-08| WE|   2|  JAN|  1|2008|  false|
|  1835|2008-01-09| TH|   2|  JAN|  1|2008|  false|
|  1836|2008-01-10| FR|   2|  JAN|  1|2008|  false|
|  1837|2008-01-11| SA|   3|  JAN|  1|2008|  false|
|  1838|2008-01-12| SU|   3|  JAN|  1|2008|  false|
|  1839|2008-01-13| MO|   3|  JAN|  1|2008|  false|
|  1840|2008-01-14| TU|   3|  JAN|  1|2008|  false|
|  1841|2008-01-15| WE|   3|  JAN|  1|2008|  false|
|  1842|2008-01-16| TH|   3|  JAN|  1|2008|  false|
|  1843|2008-01-17| FR|   3|  JAN|  1|2008|  false|
|  1844|2008-01-18| SA|   4|  JAN|  1|2008|  false|
|  1845|2008-01-19| SU|   4|  JAN|  1|2008|  false|
|  1846|2008-01-20| MO|   4|  JAN|  1|2008|  false|
+------+----------+---+----+-----+---+----+-------+
only showing top 20 rows

root
 |-- dateid: long (nullable = true)
 |-- caldate: string (nullable = true)
 |-- day: string (nullable = true)
 |-- week: long (nullable = true)
 |-- month: string (nullable = true)
 |-- qtr: string (nullable = true)
 |-- year: long (nullable = true)
 |-- holiday: boolean (nullable = true)


event

# event
eventScehama = T.StructType([
    T.StructField('eventid', T.LongType()),
    T.StructField('venueid', T.LongType()),
    T.StructField('catid', T.LongType()),
    T.StructField('dateid', T.LongType()),
    T.StructField('eventname', T.StringType()),
    T.StructField('starttime', T.TimestampType())
])
eventDF = sparkSession.read.schema(eventScehama).csv("s3://{0}/input/join/event/".format(bucketName))
eventDF.show()
eventDF.printSchema()
===
+-------+-------+-----+------+--------------------+-------------------+
|eventid|venueid|catid|dateid|           eventname|          starttime|
+-------+-------+-----+------+--------------------+-------------------+
|      1|    305|    8|  1851|     Gotterdammerung|2008-01-25 14:30:00|
|      2|    306|    8|  2114|       Boris Godunov|2008-10-15 20:00:00|
|      3|    302|    8|  1935|              Salome|2008-04-19 14:30:00|
|      4|    309|    8|  2090|La Cenerentola (C...|2008-09-21 14:30:00|
|      5|    302|    8|  1982|        Il Trovatore|2008-06-05 19:00:00|
|      6|    308|    8|  2109|    L Elisir d Amore|2008-10-10 19:30:00|
|      7|    309|    8|  1891|       Doctor Atomic|2008-03-06 14:00:00|
|      8|    302|    8|  1832|     The Magic Flute|2008-01-06 20:00:00|
|      9|    308|    8|  2087|             The Fly|2008-09-18 19:30:00|
|     10|    305|    8|  2079|           Rigoletto|2008-09-10 15:00:00|
|     11|    302|    8|  1952|       Doctor Atomic|2008-05-01 19:30:00|
|     12|    303|    8|  2111|          Ring Cycle|2008-10-12 15:00:00|
|     13|    301|    8|  2154| Lucia di Lammermoor|2008-11-24 15:00:00|
|     14|    303|    8|  2149|          La Rondine|2008-11-19 19:30:00|
|     15|    307|    8|  2038|         Die Walkure|2008-07-31 19:30:00|
|     16|    304|    8|  1987|         La Gioconda|2008-06-10 14:00:00|
|     17|    301|    8|  1834|         La Gioconda|2008-01-08 19:00:00|
|     18|    306|    8|  2152|     Gotterdammerung|2008-11-22 19:00:00|
|     19|    302|    8|  2088|       Boris Godunov|2008-09-19 14:30:00|
|     20|    303|    8|  2051|       Boris Godunov|2008-08-13 19:30:00|
+-------+-------+-----+------+--------------------+-------------------+
only showing top 20 rows

root
 |-- eventid: long (nullable = true)
 |-- venueid: long (nullable = true)
 |-- catid: long (nullable = true)
 |-- dateid: long (nullable = true)
 |-- eventname: string (nullable = true)
 |-- starttime: timestamp (nullable = true)

listing

# listing
listingScehama = T.StructType([
    T.StructField('listid', T.LongType()),
    T.StructField('sellerid', T.LongType()),
    T.StructField('eventid', T.LongType()),
    T.StructField('dateid', T.LongType()),
    T.StructField('numtickets', T.LongType()),
    T.StructField('priceperticket', T.DecimalType()),
    T.StructField('totalprice', T.DecimalType()),
    T.StructField('listtime', T.TimestampType())
])

listingDF = sparkSession.read.schema(listingScehama).csv("s3://{0}/input/join/listing/".format(bucketName))
listingDF.show()
listingDF.printSchema()
===

record count[192497]
+------+--------+-------+------+----------+--------------+----------+-------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice|           listtime|
+------+--------+-------+------+----------+--------------+----------+-------------------+
|     1|   36861|   7872|  1850|        10|           182|      1820|2008-01-24 06:43:29|
|     2|   16002|   4806|  1890|         7|           233|      1631|2008-03-05 12:25:29|
|     3|   21461|   4256|  2131|         2|           182|       364|2008-11-01 07:35:33|
|     4|    8117|   4337|  1970|         8|            38|       304|2008-05-24 01:18:37|
|     5|    1616|   8647|  1963|         4|           175|       700|2008-05-17 02:29:11|
|     6|   47402|   8240|  2053|        18|            77|      1386|2008-08-15 02:08:13|
|     7|   36551|   7801|  2145|         5|            30|       150|2008-11-15 09:38:15|
|     8|   11891|   8036|  2139|         6|            82|       492|2008-11-09 05:07:30|
|     9|     691|    554|  2078|         6|           135|       810|2008-09-09 08:03:36|
|    10|   24858|   3375|  1994|        16|           197|      3152|2008-06-17 09:44:54|
|    11|   41053|   3877|  2146|         5|            24|       120|2008-11-16 11:59:10|
|    12|   45635|   4769|  2032|        26|            65|      1690|2008-07-25 01:45:49|
|    13|   30606|   2147|  1883|         3|           172|       516|2008-02-26 05:04:06|
|    14|   12651|   2762|  2058|         8|           122|       976|2008-08-20 01:45:26|
|    15|   46833|   7910|  1947|        24|            75|      1800|2008-05-01 04:26:30|
|    16|   41902|   4571|  2062|         6|           231|      1386|2008-08-24 09:36:12|
|    17|   20228|   7837|  1865|         9|           167|      1503|2008-02-08 06:27:54|
|    18|   13013|   7085|  2049|        14|           219|      3066|2008-08-11 06:33:03|
|    19|   12339|   3423|  1986|         2|            53|       106|2008-06-09 08:39:56|
|    20|    6300|   2075|  1876|         7|            66|       462|2008-02-19 12:28:01|
+------+--------+-------+------+----------+--------------+----------+-------------------+
only showing top 20 rows

root
 |-- listid: long (nullable = true)
 |-- sellerid: long (nullable = true)
 |-- eventid: long (nullable = true)
 |-- dateid: long (nullable = true)
 |-- numtickets: long (nullable = true)
 |-- priceperticket: decimal(10,0) (nullable = true)
 |-- totalprice: decimal(10,0) (nullable = true)
 |-- listtime: timestamp (nullable = true)


sales

# sales
salesScehama = T.StructType([
    T.StructField('salesid', T.LongType()),
    T.StructField('listid', T.LongType()),
    T.StructField('sellerid', T.LongType()),
    T.StructField('buyerid', T.LongType()),
    T.StructField('eventid', T.LongType()),
    T.StructField('dateid', T.LongType()),
    T.StructField('qtysold', T.LongType()),
    T.StructField('pricepaid', T.DecimalType()),
    T.StructField('commission', T.DecimalType()),
    T.StructField('saletime', T.StringType()),
])

salesDF = sparkSession.read.schema(salesScehama).csv("s3://{0}/input/join/sales/".format(bucketName))
print("record count[{0}]".format(salesDF.count()))
salesDF.show()
salesDF.printSchema()
===
record count[172456]
+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|      1|     1|   36861|  21191|   7872|  1875|      4|      728|       109|2/18/2008 02:36:48|
|      2|     4|    8117|  11498|   4337|  1983|      2|       76|        11| 6/6/2008 05:00:16|
|      3|     5|    1616|  17433|   8647|  1983|      2|      350|        53| 6/6/2008 08:26:17|
|      4|     5|    1616|  19715|   8647|  1986|      1|      175|        26| 6/9/2008 08:38:52|
|      5|     6|   47402|  14115|   8240|  2069|      2|      154|        23|8/31/2008 09:17:02|
|      6|    10|   24858|  24888|   3375|  2023|      2|      394|        59|7/16/2008 11:59:24|
|      7|    10|   24858|   7952|   3375|  2003|      4|      788|       118|6/26/2008 12:56:06|
|      8|    10|   24858|  19715|   3375|  2017|      1|      197|        30|7/10/2008 02:12:36|
|      9|    10|   24858|  29891|   3375|  2029|      3|      591|        89|7/22/2008 02:23:17|
|     10|    12|   45635|  10542|   4769|  2044|      1|       65|        10| 8/6/2008 02:51:55|
|     11|    12|   45635|   8435|   4769|  2042|      2|      130|        20| 8/4/2008 03:06:36|
|     12|    13|   30606|   9633|   2147|  1894|      2|      344|        52| 3/9/2008 03:18:56|
|     13|    14|   12651|   1664|   2762|  2059|      2|      244|        37|8/21/2008 03:39:06|
|     14|    15|   46833|   2912|   7910|  1950|      4|      300|        45| 5/4/2008 05:39:08|
|     15|    15|   46833|  19715|   7910|  1970|      1|       75|        11|5/24/2008 06:21:47|
|     16|    17|   20228|  36376|   7837|  1908|      2|      334|        50|3/23/2008 06:27:34|
|     17|    17|   20228|  39913|   7837|  1912|      1|      167|        25|3/27/2008 06:40:29|
|     18|    20|    6300|  23690|   2075|  1904|      1|       66|        10|3/19/2008 06:46:46|
|     19|    20|    6300|  32200|   2075|  1914|      4|      264|        40|3/29/2008 06:46:57|
|     20|    20|    6300|  25316|   2075|  1906|      2|      132|        20|3/21/2008 07:01:03|
+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 20 rows

root
 |-- salesid: long (nullable = true)
 |-- listid: long (nullable = true)
 |-- sellerid: long (nullable = true)
 |-- buyerid: long (nullable = true)
 |-- eventid: long (nullable = true)
 |-- dateid: long (nullable = true)
 |-- qtysold: long (nullable = true)
 |-- pricepaid: decimal(10,0) (nullable = true)
 |-- commission: decimal(10,0) (nullable = true)
 |-- saletime: string (nullable = true)

users

# users
usersScehama = T.StructType([
    T.StructField('userid', T.LongType()),
    T.StructField('username', T.StringType()),
    T.StructField('firstname', T.StringType()),
    T.StructField('lastname', T.StringType()),
    T.StructField('city', T.StringType()),
    T.StructField('state', T.StringType()),
    T.StructField('email', T.StringType()),
    T.StructField('phone', T.StringType()),
    T.StructField('likesports', T.BooleanType()),
    T.StructField('liketheatre', T.BooleanType()),
    T.StructField('likeconcerts', T.BooleanType()),
    T.StructField('likejazz', T.BooleanType()),
    T.StructField('likeclassical', T.BooleanType()),
    T.StructField('likeopera', T.BooleanType()),
    T.StructField('likerock', T.BooleanType()),
    T.StructField('likevegas', T.BooleanType()),
    T.StructField('likebroadway', T.BooleanType()),
    T.StructField('likemusicals', T.BooleanType()),
])

usersDF = sparkSession.read.schema(usersScehama).csv("s3://{0}/input/join/users/".format(bucketName))
print("record count[{0}]".format(usersDF.count()))
usersDF.show()
usersDF.printSchema()
===
record count[49990]
+------+--------+---------+---------+------------+-----+--------------------+--------------+----------+-----------+------------+--------+-------------+---------+--------+---------+------------+------------+
|userid|username|firstname| lastname|        city|state|               email|         phone|likesports|liketheatre|likeconcerts|likejazz|likeclassical|likeopera|likerock|likevegas|likebroadway|likemusicals|
+------+--------+---------+---------+------------+-----+--------------------+--------------+----------+-----------+------------+--------+-------------+---------+--------+---------+------------+------------+
|     1|JSG99FHE|   Rafael|   Taylor|        Kent|   WA|Etiam.laoreet.lib...|(664) 602-4412|      true|       true|        null|   false|         true|     null|    null|     true|       false|        true|
|     2|PGL08LJI| Vladimir| Humphrey|Murfreesboro|   SK|Suspendisse.trist...|(783) 492-1886|      null|       null|        null|    true|         true|     null|    null|     true|       false|        true|
|     3|IFT66TXU|     Lars|  Ratliff|  High Point|   ME|amet.faucibus.ut@...|(624) 767-2465|      true|      false|        null|   false|         null|    false|    true|     null|        null|        true|
|     4|XDZ38RDD|    Barry|      Roy|       Omaha|   AB|   sed@lacusUtnec.ca|(355) 452-8168|     false|       true|        null|   false|         null|     null|    null|     null|        null|       false|
|     5|AEB55QTM|   Reagan|    Hodge| Forest Lake|   NS|    Cum@accumsan.com|(476) 519-9131|      null|       null|        true|   false|         null|     null|    true|     true|       false|        true|
|     6|NDQ15VBM|   Victor|Hernandez|  Naperville|   GA|turpis@accumsanla...|(818) 765-4255|     false|       null|        null|    true|         null|     true|    true|     true|        true|        true|
|     7|OWY35QYB|  Tamekah|   Juarez|    Moultrie|   WV|elementum@semperp...|(297) 875-7247|      null|       null|        null|    true|         true|    false|    null|     null|       false|       false|
|     8|AZG78YIP|   Colton|      Roy|     Guayama|   AK|ullamcorper.nisl@...|(998) 934-9210|      null|       null|        true|    true|         null|     true|   false|     null|       false|       false|
|     9|MSD36KVR|  Mufutau|  Watkins| Port Orford|   MD|Integer.mollis.In...|(725) 719-7670|      true|      false|        null|   false|         true|     null|    null|     null|       false|        true|
|    10|WKW41AIW|    Naida| Calderon|   Waterbury|   MB|Donec.fringilla@s...|(197) 726-8249|     false|      false|       false|    null|        false|     true|    null|     true|        null|        null|
|    11|MFN29TYU|    Anika|     Huff|     Rawlins|   MT|arcu.Curabitur@se...|(419) 147-8207|      null|       null|        null|    true|         null|     true|    null|     null|        null|        null|
|    12|FVK28WAS|    Bruce|     Beck|        Kona|   OH|         ac@velit.ca|(617) 527-9908|      null|       null|       false|    null|         null|    false|    null|     null|        null|        null|
|    13|QTF33MCG|    Henry|  Cochran|Bossier City|   QC|Aliquam.vulputate...|(783) 105-0989|      null|       true|        null|    null|         null|     null|    true|     true|        true|        null|
|    14|OVQ88RKY|  Mallory|  Farrell|  Villa Park|   ID|vel.est@veliteges...|(711) 160-7386|      null|       null|        null|    null|         null|     true|    null|     null|       false|        null|
|    15|OWU78MTR| Scarlett|    Mayer|     Gadsden|   GA|lorem.ipsum@Vesti...|(189) 882-8412|      true|      false|        true|    null|         null|     true|    null|     null|        true|        null|
|    16|ZMG93CDD|   Kieran|    Drake| Hot Springs|   BC|molestie.tellus@d...|(192) 914-0016|      null|       true|        true|    null|        false|     null|    true|     true|        null|       false|
|    17|WWZ18EOX|     Cody|     Moss|      Mobile|   ON|dolor.nonummy@ips...|(412) 488-2896|     false|      false|        null|    null|         null|     null|   false|     null|       false|       false|
|    18|VDP05MXU| Germaine|   Valdez|      Kokomo|   WY|cursus.Integer@ar...|(998) 879-8668|      null|       true|        true|    null|         true|     true|    null|    false|        true|        true|
|    19|CXQ97IWP|     Amal|   Landry|      Lomita|   NT|  euismod@turpis.org|(891) 526-1468|      null|      false|        true|    null|         true|     null|    null|    false|       false|        true|
|    20|VJT98ZQY|     Jane|     Dyer|      Sharon|   NH|   et@Nunclaoreet.ca|(411) 638-5867|      null|       true|        null|   false|         null|    false|    null|     null|        true|       false|
+------+--------+---------+---------+------------+-----+--------------------+--------------+----------+-----------+------------+--------+-------------+---------+--------+---------+------------+------------+
only showing top 20 rows

root
 |-- userid: long (nullable = true)
 |-- username: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- likesports: boolean (nullable = true)
 |-- liketheatre: boolean (nullable = true)
 |-- likeconcerts: boolean (nullable = true)
 |-- likejazz: boolean (nullable = true)
 |-- likeclassical: boolean (nullable = true)
 |-- likeopera: boolean (nullable = true)
 |-- likerock: boolean (nullable = true)
 |-- likevegas: boolean (nullable = true)
 |-- likebroadway: boolean (nullable = true)
 |-- likemusicals: boolean (nullable = true)

venue

# venue
venueScehama = T.StructType([
    T.StructField('venueid', T.LongType()),
    T.StructField('venuename', T.StringType()),
    T.StructField('venuecity', T.StringType()),
    T.StructField('venuestate', T.StringType()),
    T.StructField('venueseats', T.LongType())
])

venueDF = sparkSession.read.schema(venueScehama).csv("s3://{0}/input/join/venue/".format(bucketName))
print("record count[{0}]".format(venueDF.count()))
venueDF.show()
venueDF.printSchema()
===
record count[202]
+-------+--------------------+---------------+----------+----------+
|venueid|           venuename|      venuecity|venuestate|venueseats|
+-------+--------------------+---------------+----------+----------+
|      1|         Toyota Park|     Bridgeview|        IL|         0|
|      2|Columbus Crew Sta...|       Columbus|        OH|         0|
|      3|         RFK Stadium|     Washington|        DC|         0|
|      4|CommunityAmerica ...|    Kansas City|        KS|         0|
|      5|    Gillette Stadium|     Foxborough|        MA|     68756|
|      6|New York Giants S...|East Rutherford|        NJ|     80242|
|      7|           BMO Field|        Toronto|        ON|         0|
|      8|The Home Depot Ce...|         Carson|        CA|         0|
|      9|Dick's Sporting G...|  Commerce City|        CO|         0|
|     10|      Pizza Hut Park|         Frisco|        TX|         0|
|     11|   Robertson Stadium|        Houston|        TX|         0|
|     13| Rice-Eccles Stadium| Salt Lake City|        UT|         0|
|     14|   Buck Shaw Stadium|    Santa Clara|        CA|         0|
|     15|     McAfee Coliseum|        Oakland|        CA|     63026|
|     16| TD Banknorth Garden|         Boston|        MA|         0|
|     17|         Izod Center|East Rutherford|        NJ|         0|
|     18|Madison Square Ga...|  New York City|        NY|     20000|
|     19|     Wachovia Center|   Philadelphia|        PA|         0|
|     20|   Air Canada Centre|        Toronto|        ON|         0|
|     21|       United Center|        Chicago|        IL|         0|
+-------+--------------------+---------------+----------+----------+
only showing top 20 rows

root
 |-- venueid: long (nullable = true)
 |-- venuename: string (nullable = true)
 |-- venuecity: string (nullable = true)
 |-- venuestate: string (nullable = true)
 |-- venueseats: long (nullable = true)

シナリオ

それぞれのデータ量とリレーションは、下記のとおりです。

データセット名 データ件数 データサイズ
category 11 465B
date 365 14.2KB
event 8,798 435.4KB
listing 192,497 11MB
sales 172,456 10.7MB
users 49,990 5.6MB
venue 202 7.8KB

image.png
サンプルデータベースから引用。

  1. 最もデータ量が多い、salesとlistingを使用して検証します。LISTIDでジョインします。
  2. 比較的小さいDATEと、大きいSALESを使用して検証します。dateidでジョインします。
  • SALES
    • SALESIDがプライマリキー。各行の一意の ID 値。各行は、特定のイベントの 1 枚以上のチケットの各販売を表します (特定のリストで提供)。
    • LISTIDがLISTINGの外部キー
  • LISTING
    • LISTIDがプライマリキー。各行の一意の ID 値。各行は、特定のイベントの一連のチケットをリストで表します。
  • DATE
    • プライマリキー。各行の一意の ID 値。各行は、1 暦年の中の各日を表します。

各ジョインで、ヒントを与えて下記について確認します。

  • 実行計画(Physical Plan)
  • 実行時間

両方ともデータが大きい場合

ヒントなし

start_time = time.time()


listingDF.createOrReplaceTempView("listing2")
salesDF.createOrReplaceTempView("sales2")

joinDF = spark.sql("SELECT * FROM listing2 , sales2 WHERE listing2.listid = sales2.listid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [listid#1905L], [listid#1977L], Inner
   :- Sort [listid#1905L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(listid#1905L, 1000), ENSURE_REQUIREMENTS, [plan_id=1934]
   :     +- Filter isnotnull(listid#1905L)
   :        +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
   +- Sort [listid#1977L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(listid#1977L, 1000), ENSURE_REQUIREMENTS, [plan_id=1935]
         +- Filter isnotnull(listid#1977L)
            +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice|           listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|    94|   23652|   6117|  2047|         3|            37|       111|2008-08-09 04:49:27|     75|    94|   23652|  28081|   6117|  2080|      2|       74|        11|9/11/2008 08:42:27|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 11.872079372406006

BROADCAST

start_time = time.time()


listingDF.createOrReplaceTempView("listing3")
salesDF.createOrReplaceTempView("sales3")

joinDF = spark.sql("SELECT /*+ BROADCAST(listing3) */ * FROM listing3 , sales3 WHERE listing3.listid = sales3.listid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [listid#1905L], [listid#1977L], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2151]
   :  +- Filter isnotnull(listid#1905L)
   :     +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
   +- Filter isnotnull(listid#1977L)
      +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice|           listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|     1|   36861|   7872|  1850|        10|           182|      1820|2008-01-24 06:43:29|      1|     1|   36861|  21191|   7872|  1875|      4|      728|       109|2/18/2008 02:36:48|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 7.517160415649414

MERGE(shuffle sort merge join)

start_time = time.time()


listingDF.createOrReplaceTempView("listing4")
salesDF.createOrReplaceTempView("sales4")

joinDF = spark.sql("SELECT /*+ SHUFFLE_MERGE(listing4) */ * FROM listing4 , sales4 WHERE listing4.listid = sales4.listid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [listid#1905L], [listid#1977L], Inner
   :- Sort [listid#1905L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(listid#1905L, 1000), ENSURE_REQUIREMENTS, [plan_id=2282]
   :     +- Filter isnotnull(listid#1905L)
   :        +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
   +- Sort [listid#1977L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(listid#1977L, 1000), ENSURE_REQUIREMENTS, [plan_id=2283]
         +- Filter isnotnull(listid#1977L)
            +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice|           listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|    94|   23652|   6117|  2047|         3|            37|       111|2008-08-09 04:49:27|     75|    94|   23652|  28081|   6117|  2080|      2|       74|        11|9/11/2008 08:42:27|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 11.739749908447266

SHUFFLE_HASH(shuffle hash join)

start_time = time.time()

listingDF.createOrReplaceTempView("listing5")
salesDF.createOrReplaceTempView("sales5")

joinDF = spark.sql("SELECT /*+ SHUFFLE_HASH(listing5) */ * FROM listing5 , sales5 WHERE listing5.listid = sales5.listid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ShuffledHashJoin [listid#1905L], [listid#1977L], Inner, BuildLeft
   :- Exchange hashpartitioning(listid#1905L, 1000), ENSURE_REQUIREMENTS, [plan_id=2500]
   :  +- Filter isnotnull(listid#1905L)
   :     +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
   +- Exchange hashpartitioning(listid#1977L, 1000), ENSURE_REQUIREMENTS, [plan_id=2501]
      +- Filter isnotnull(listid#1977L)
         +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+-----------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice|           listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|         saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+-----------------+
|   964|   22048|   6058|  1946|        10|           189|      1890|2008-04-30 09:46:24|    960|   964|   22048|   1816|   6058|  1948|      1|      189|        28|5/2/2008 05:23:09|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+-----------------+
only showing top 1 row

execution time 7.968986511230469

SHUFFLE_REPLICATE_NL(shuffle-and-replicate nested loop join)

start_time = time.time()


listingDF.createOrReplaceTempView("listing6")
salesDF.createOrReplaceTempView("sales6")

joinDF = spark.sql("SELECT /*+ SHUFFLE_REPLICATE_NL(listing6) */ * FROM listing6 , sales6 WHERE listing6.listid = sales6.listid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
CartesianProduct (listid#1905L = listid#1977L)
:- *(1) Filter isnotnull(listid#1905L)
:  +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
+- *(2) Filter isnotnull(listid#1977L)
   +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice|           listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|     1|   36861|   7872|  1850|        10|           182|      1820|2008-01-24 06:43:29|      1|     1|   36861|  21191|   7872|  1875|      4|      728|       109|2/18/2008 02:36:48|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 7.117904901504517

片方が小さく、片方が大きい場合

ヒントなし

start_time = time.time()

dateDF.createOrReplaceTempView("date7")
salesDF.createOrReplaceTempView("sales7")

joinDF = spark.sql("SELECT * FROM date7 , sales7 WHERE date7.dateid = sales7.dateid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [dateid#1779L], [dateid#1981L], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2755]
   :  +- Filter isnotnull(dateid#1779L)
   :     +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
   +- Filter isnotnull(dateid#1981L)
      +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|dateid|   caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|  1875|2008-02-18| TU|   8|  FEB|  1|2008|  false|      1|     1|   36861|  21191|   7872|  1875|      4|      728|       109|2/18/2008 02:36:48|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 6.243089914321899

BROADCAST

start_time = time.time()

dateDF.createOrReplaceTempView("date8")
salesDF.createOrReplaceTempView("sales8")

joinDF = spark.sql("SELECT /*+ BROADCAST(date8) */ * FROM date8 , sales8 WHERE date8.dateid = sales8.dateid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [dateid#1779L], [dateid#1981L], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2885]
   :  +- Filter isnotnull(dateid#1779L)
   :     +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
   +- Filter isnotnull(dateid#1981L)
      +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|dateid|   caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|  1875|2008-02-18| TU|   8|  FEB|  1|2008|  false|      1|     1|   36861|  21191|   7872|  1875|      4|      728|       109|2/18/2008 02:36:48|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 6.2336790561676025

MERGE(shuffle sort merge join)

start_time = time.time()

dateDF.createOrReplaceTempView("date9")
salesDF.createOrReplaceTempView("sales9")

joinDF = spark.sql("SELECT /*+ SHUFFLE_MERGE(date9) */ * FROM date9 , sales9 WHERE date9.dateid = sales9.dateid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [dateid#1779L], [dateid#1981L], Inner
   :- Sort [dateid#1779L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(dateid#1779L, 1000), ENSURE_REQUIREMENTS, [plan_id=3016]
   :     +- Filter isnotnull(dateid#1779L)
   :        +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
   +- Sort [dateid#1981L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(dateid#1981L, 1000), ENSURE_REQUIREMENTS, [plan_id=3017]
         +- Filter isnotnull(dateid#1981L)
            +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|dateid|   caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|          saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|  1882|2008-02-25| TU|   9|  FEB|  1|2008|  false| 119824|137202|   26796|  38210|   6825|  1882|      2|      798|       120|2/25/2008 01:44:23|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row

execution time 7.939592599868774

SHUFFLE_HASH(shuffle hash join)

start_time = time.time()

dateDF.createOrReplaceTempView("date10")
salesDF.createOrReplaceTempView("sales10")

joinDF = spark.sql("SELECT /*+ SHUFFLE_HASH(date10) */ * FROM date10 , sales10 WHERE date10.dateid = sales10.dateid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ShuffledHashJoin [dateid#1779L], [dateid#1981L], Inner, BuildLeft
   :- Exchange hashpartitioning(dateid#1779L, 1000), ENSURE_REQUIREMENTS, [plan_id=3234]
   :  +- Filter isnotnull(dateid#1779L)
   :     +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
   +- Exchange hashpartitioning(dateid#1981L, 1000), ENSURE_REQUIREMENTS, [plan_id=3235]
      +- Filter isnotnull(dateid#1981L)
         +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...


+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+-------------------+
|dateid|   caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission|           saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+-------------------+
|  2184|2008-12-24| WE|  52|  DEC|  4|2008|  false|    492|   493|   29029|   7877|   8059|  2184|      3|      678|       102|12/24/2008 06:12:06|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+-------------------+
only showing top 1 row

execution time 7.8767030239105225

SHUFFLE_REPLICATE_NL(shuffle-and-replicate nested loop join)

start_time = time.time()

dateDF.createOrReplaceTempView("date11")
salesDF.createOrReplaceTempView("sales11")

joinDF = spark.sql("SELECT /*+ SHUFFLE_REPLICATE_NL(date11) */ * FROM date11 , sales11 WHERE date11.dateid = sales11.dateid")
joinDF.explain()
joinDF.show(1)

end_time = time.time()

print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
CartesianProduct (dateid#1779L = dateid#1981L)
:- *(1) Filter isnotnull(dateid#1779L)
:  +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
+- *(2) Filter isnotnull(dateid#1981L)
   +- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...

考察

今回、下記のデータセットに対してヒント句を変えて、ジョインを試してみましたが、実行計画の変化は見られたものの、実行時間は大差ありませんでした。

  • やや大きめのデータセット(10MB)同士の結合
  • やや大きめのデータセット(10MB)と、小さめのデータセット(14KB)同士の結合

もう少し大きいデータを使用する。複雑なクエリを実行することで、違いが出るかもしれません。今後試して見たいと思います。

参考

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?