背景・目的
こちらのJoin Strategies in Apache Spark— Deep Diveや、Apache Spark Join Strategiesによると、Sparkでは、内部結合、外部結合、左結合、右結合、または半結合などジョインを行う際に全く異なる方法で処理される。結合時にさざまな戦略を採用しているとのこと。
本書では、Sparkのジョインにはどのようなものがあるのか。実行計画と時間がどの様に変わるのかを確認します。
まとめ
Sparkのジョインは、下記の違いがあります。
- Broadcast joinは、片方のデータセット全体を各Executorにばらまきジョインする
- shuffle sort merge joinは、両方のデータセットをキーごとにシャッフルし、ソート後にジョインする。
- shuffle hash joinは、両方のデータセットをキーごとにシャッフルし、ハッシュテーブルを作成しジョインする
- shuffle-and-replicate nested loop joinは、両方のデータセットをキーごとにシャッフルし、Nested Loopしジョインする。
概要
Sparkにおけるジョイン
Sparkでは、内部結合、外部結合、左結合、右結合、または半結合などジョインを行う際に全く異なる方法で処理される。
結合時にさざまな戦略を採用している。これらの戦略の内部について知ることは、Sparkのアプリケーションのパフォーマンスを向上させることに役立つ。
結合操作に影響を与える要因
- Dataset Size
- 結合に使用するデータセットのサイズは、結合操作のパフォーマンスに直接影響します。
- Condition of Join
- 結合条件は、データセット内のフィールドの論理比較から派生。 これらの条件は、等価条件 (=) または非等価条件 (>、<、≧、≤、<>) に分類できる。
- Type of Join
- 内部結合、外部結合、セミ結合、またはクロス結合に分類される結合のタイプがある
結合のための戦略
Sparkでは、上記の要素に基づいて、結合実行のための以下の戦略を作成した。
- Broadcast Hash Join
- Shuffle Hash Join
- Shuffle Sort Merge Join
- Cartesian Join
- Broadcast Nested Loop Join
Broadcast Hash Join
ブロードキャスト ハッシュ結合は、より小さいデータセットをワーカーノードにブロードキャストするという単純な戦略を採用している。これによりシャッフル コストが節約される。
このタイプの結合は、データセットの 1 つが巨大で、もう 1 つが通常 10 MB (デフォルト) 未満の小さい場合に非常に役立つとのこと。
ブロードキャストされたデータセットは、ドライバーノードとエグゼキューターノードが利用される。挙動は下記の通りです。
- ドライバーはまずエグゼキュータ側からデータセットを取得
- 次に、より大きなデータセットのパーティションが存在するすべてのワーカー ノードにデータセットをブロードキャストする。
- Full Outer Joinを除く等価結合のタイプがサポートされています。
-
spark.sql.autoBroadcastJoinThreshold (デフォルト値 10485760(10 MB)
は最大は 8GB まで - テーブルはドライバーノードだけでなくエグゼキューターノードにもキャッシュされる。
- 大規模なテーブルがブロードキャストされると、ネットワークに負荷がかかる操作となり、パフォーマンスの低下につながる。
Shuffle Hash Join
シャッフル ハッシュ結合には、シャッフル フェーズとハッシュ結合フェーズという 2 フェーズのプロセスが含まれる。 同じ結合キーを持つデータセットは同じ実行ノードに移動され、その後実行ノード上で小さい方のテーブルのハッシュ テーブルを作成し、ハッシュ結合を適用する。
- 結合キーはソート可能である必要はない。
- Full Outer Joinを除く等価結合条件と結合タイプがサポートされる
- 結合操作に参加するデータセット上でシャッフルとハッシュ テーブルの作成を伴うため、コストがかかる結合
- この結合を有効にするには、Shuffle Sort Merge Join を false に設定する必要がある。
spark.sql.join.preferSortMergeJoin=false (Default value is true)
Shuffle Sort Merge Join
シャッフル ソート マージ ジョインは、シャッフルとソートマージのフェーズが含まれる。同じ結合キーを持つデータセットは同じ実行ノードに移動され、実行ノード上でノード上のデータセットパーティションが結合キーによって並べ替えられ、結合キーに基づいてマージされる。
- Spark 2.3 以降の Apache Spark のデフォルトの結合戦略。
- これは、
spark.sql.join.preferSortMergeJoin=false
を使用して無効にできる。 - 結合キーは並べ替え可能である必要がある。
- すべての結合タイプがサポートされている。
Cartesian Join(デカルト結合)
参加しているデータセットが結合キー (条件付き) を指定していない場合は、デカルト積戦略が選択される。
- 内部結合タイプのみがサポート
- 等価結合条件と非等価結合条件をサポート
Broadcast Nested Loop Join
この結合戦略は、適切な結合メカニズムを選択できない場合に選択される。 つまり、結合条件とヒント タイプが指定されていない場合、この結合が選択される。
- 等価結合条件と非等価結合条件をサポート
- すべての結合タイプがサポート
- 非常にコストのかかる結合、Spark はブロードキャストできる適切なデータセットを探して結合を自動的に最適化する。
Spark Join Selection Strategy
Case-I) Equivalence Join Condition
等価結合条件の時には、次の順序でヒントを確認する。
- Broadcast Hash Join ヒント — 結合タイプがサポートされている場合は、ブロードキャスト ハッシュ結合を選択します。
- Sort Merge のヒント — 結合キーが並べ替え可能な場合は、並べ替えと結合を選択して結合します。
- Shuffle Hash Join ヒント — 結合タイプがサポートされている場合は、シャッフル ハッシュ結合を選択します。
- Shuffle Replicate NL Hint — 結合タイプが inner like の場合はデカルト積を選択します。
該当するヒントがない場合、次の順序でヒントを確認する。
- データセットの 1 つがブロードキャストできるほど小さく、結合タイプがサポートされている場合は、ブロードキャスト ハッシュ結合を選択します。
- 結合キーがソート可能な場合は、「Sort Merge Join」を選択します。
- データセットの 1 つがハッシュ テーブルを構築できるほど小さく、spark.sql.join.preferSortMergeJoin=false の場合は、[シャッフル ハッシュ結合] を選択します。
- 結合タイプが内部タイプの場合は、デカルト積結合を選択します。
- どのオプションも満たさない場合は、ブロードキャスト ネスト ループ結合を呼び出します。
Case-II) Non-Equivalence Join Conditions
次の順序で結合ヒントを確認してください。
- ブロードキャスト ヒント — ブロードキャストのネストされたループ結合を選択します。
- Shuffle Replicate NL Hint — 結合タイプが内部の場合はデカルト積を選択します。
実践
事前準備
サンプルデータベースのデータセットを使用します。
category
# category
categoryScehama = T.StructType([
T.StructField('catid', T.LongType()),
T.StructField('catgroup', T.StringType()),
T.StructField('catname', T.StringType()),
T.StructField('catdesc', T.StringType())
])
categoryDF = sparkSession.read.schema(categoryScehama).csv("s3://{0}/input/join/category/".format(bucketName))
categoryDF.show()
categoryDF.printSchema()
===
+-----+--------+---------+--------------------+
|catid|catgroup| catname| catdesc|
+-----+--------+---------+--------------------+
| 1| Sports| MLB|Major League Base...|
| 2| Sports| NHL|National Hockey L...|
| 3| Sports| NFL|National Football...|
| 4| Sports| NBA|National Basketba...|
| 5| Sports| MLS| Major League Soccer|
| 6| Shows| Musicals| Musical theatre|
| 7| Shows| Plays|All non-musical t...|
| 8| Shows| Opera|All opera and lig...|
| 9|Concerts| Pop|All rock and pop ...|
| 10|Concerts| Jazz|All jazz singers ...|
| 11|Concerts|Classical| All symphony|
+-----+--------+---------+--------------------+
root
|-- catid: long (nullable = true)
|-- catgroup: string (nullable = true)
|-- catname: string (nullable = true)
|-- catdesc: string (nullable = true)
date
# date
dateScehama = T.StructType([
T.StructField('dateid', T.LongType()),
T.StructField('caldate', T.StringType()),
T.StructField('day', T.StringType()),
T.StructField('week', T.LongType()),
T.StructField('month', T.StringType()),
T.StructField('qtr', T.StringType()),
T.StructField('year', T.LongType()),
T.StructField('holiday', T.BooleanType())
])
dateDF = sparkSession.read.schema(dateScehama).csv("s3://{0}/input/join/date/".format(bucketName))
dateDF.show()
dateDF.printSchema()
===
record count[365]
+------+----------+---+----+-----+---+----+-------+
|dateid| caldate|day|week|month|qtr|year|holiday|
+------+----------+---+----+-----+---+----+-------+
| 1827|2008-01-01| WE| 1| JAN| 1|2008| true|
| 1828|2008-01-02| TH| 1| JAN| 1|2008| false|
| 1829|2008-01-03| FR| 1| JAN| 1|2008| false|
| 1830|2008-01-04| SA| 2| JAN| 1|2008| false|
| 1831|2008-01-05| SU| 2| JAN| 1|2008| false|
| 1832|2008-01-06| MO| 2| JAN| 1|2008| false|
| 1833|2008-01-07| TU| 2| JAN| 1|2008| false|
| 1834|2008-01-08| WE| 2| JAN| 1|2008| false|
| 1835|2008-01-09| TH| 2| JAN| 1|2008| false|
| 1836|2008-01-10| FR| 2| JAN| 1|2008| false|
| 1837|2008-01-11| SA| 3| JAN| 1|2008| false|
| 1838|2008-01-12| SU| 3| JAN| 1|2008| false|
| 1839|2008-01-13| MO| 3| JAN| 1|2008| false|
| 1840|2008-01-14| TU| 3| JAN| 1|2008| false|
| 1841|2008-01-15| WE| 3| JAN| 1|2008| false|
| 1842|2008-01-16| TH| 3| JAN| 1|2008| false|
| 1843|2008-01-17| FR| 3| JAN| 1|2008| false|
| 1844|2008-01-18| SA| 4| JAN| 1|2008| false|
| 1845|2008-01-19| SU| 4| JAN| 1|2008| false|
| 1846|2008-01-20| MO| 4| JAN| 1|2008| false|
+------+----------+---+----+-----+---+----+-------+
only showing top 20 rows
root
|-- dateid: long (nullable = true)
|-- caldate: string (nullable = true)
|-- day: string (nullable = true)
|-- week: long (nullable = true)
|-- month: string (nullable = true)
|-- qtr: string (nullable = true)
|-- year: long (nullable = true)
|-- holiday: boolean (nullable = true)
event
# event
eventScehama = T.StructType([
T.StructField('eventid', T.LongType()),
T.StructField('venueid', T.LongType()),
T.StructField('catid', T.LongType()),
T.StructField('dateid', T.LongType()),
T.StructField('eventname', T.StringType()),
T.StructField('starttime', T.TimestampType())
])
eventDF = sparkSession.read.schema(eventScehama).csv("s3://{0}/input/join/event/".format(bucketName))
eventDF.show()
eventDF.printSchema()
===
+-------+-------+-----+------+--------------------+-------------------+
|eventid|venueid|catid|dateid| eventname| starttime|
+-------+-------+-----+------+--------------------+-------------------+
| 1| 305| 8| 1851| Gotterdammerung|2008-01-25 14:30:00|
| 2| 306| 8| 2114| Boris Godunov|2008-10-15 20:00:00|
| 3| 302| 8| 1935| Salome|2008-04-19 14:30:00|
| 4| 309| 8| 2090|La Cenerentola (C...|2008-09-21 14:30:00|
| 5| 302| 8| 1982| Il Trovatore|2008-06-05 19:00:00|
| 6| 308| 8| 2109| L Elisir d Amore|2008-10-10 19:30:00|
| 7| 309| 8| 1891| Doctor Atomic|2008-03-06 14:00:00|
| 8| 302| 8| 1832| The Magic Flute|2008-01-06 20:00:00|
| 9| 308| 8| 2087| The Fly|2008-09-18 19:30:00|
| 10| 305| 8| 2079| Rigoletto|2008-09-10 15:00:00|
| 11| 302| 8| 1952| Doctor Atomic|2008-05-01 19:30:00|
| 12| 303| 8| 2111| Ring Cycle|2008-10-12 15:00:00|
| 13| 301| 8| 2154| Lucia di Lammermoor|2008-11-24 15:00:00|
| 14| 303| 8| 2149| La Rondine|2008-11-19 19:30:00|
| 15| 307| 8| 2038| Die Walkure|2008-07-31 19:30:00|
| 16| 304| 8| 1987| La Gioconda|2008-06-10 14:00:00|
| 17| 301| 8| 1834| La Gioconda|2008-01-08 19:00:00|
| 18| 306| 8| 2152| Gotterdammerung|2008-11-22 19:00:00|
| 19| 302| 8| 2088| Boris Godunov|2008-09-19 14:30:00|
| 20| 303| 8| 2051| Boris Godunov|2008-08-13 19:30:00|
+-------+-------+-----+------+--------------------+-------------------+
only showing top 20 rows
root
|-- eventid: long (nullable = true)
|-- venueid: long (nullable = true)
|-- catid: long (nullable = true)
|-- dateid: long (nullable = true)
|-- eventname: string (nullable = true)
|-- starttime: timestamp (nullable = true)
listing
# listing
listingScehama = T.StructType([
T.StructField('listid', T.LongType()),
T.StructField('sellerid', T.LongType()),
T.StructField('eventid', T.LongType()),
T.StructField('dateid', T.LongType()),
T.StructField('numtickets', T.LongType()),
T.StructField('priceperticket', T.DecimalType()),
T.StructField('totalprice', T.DecimalType()),
T.StructField('listtime', T.TimestampType())
])
listingDF = sparkSession.read.schema(listingScehama).csv("s3://{0}/input/join/listing/".format(bucketName))
listingDF.show()
listingDF.printSchema()
===
record count[192497]
+------+--------+-------+------+----------+--------------+----------+-------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice| listtime|
+------+--------+-------+------+----------+--------------+----------+-------------------+
| 1| 36861| 7872| 1850| 10| 182| 1820|2008-01-24 06:43:29|
| 2| 16002| 4806| 1890| 7| 233| 1631|2008-03-05 12:25:29|
| 3| 21461| 4256| 2131| 2| 182| 364|2008-11-01 07:35:33|
| 4| 8117| 4337| 1970| 8| 38| 304|2008-05-24 01:18:37|
| 5| 1616| 8647| 1963| 4| 175| 700|2008-05-17 02:29:11|
| 6| 47402| 8240| 2053| 18| 77| 1386|2008-08-15 02:08:13|
| 7| 36551| 7801| 2145| 5| 30| 150|2008-11-15 09:38:15|
| 8| 11891| 8036| 2139| 6| 82| 492|2008-11-09 05:07:30|
| 9| 691| 554| 2078| 6| 135| 810|2008-09-09 08:03:36|
| 10| 24858| 3375| 1994| 16| 197| 3152|2008-06-17 09:44:54|
| 11| 41053| 3877| 2146| 5| 24| 120|2008-11-16 11:59:10|
| 12| 45635| 4769| 2032| 26| 65| 1690|2008-07-25 01:45:49|
| 13| 30606| 2147| 1883| 3| 172| 516|2008-02-26 05:04:06|
| 14| 12651| 2762| 2058| 8| 122| 976|2008-08-20 01:45:26|
| 15| 46833| 7910| 1947| 24| 75| 1800|2008-05-01 04:26:30|
| 16| 41902| 4571| 2062| 6| 231| 1386|2008-08-24 09:36:12|
| 17| 20228| 7837| 1865| 9| 167| 1503|2008-02-08 06:27:54|
| 18| 13013| 7085| 2049| 14| 219| 3066|2008-08-11 06:33:03|
| 19| 12339| 3423| 1986| 2| 53| 106|2008-06-09 08:39:56|
| 20| 6300| 2075| 1876| 7| 66| 462|2008-02-19 12:28:01|
+------+--------+-------+------+----------+--------------+----------+-------------------+
only showing top 20 rows
root
|-- listid: long (nullable = true)
|-- sellerid: long (nullable = true)
|-- eventid: long (nullable = true)
|-- dateid: long (nullable = true)
|-- numtickets: long (nullable = true)
|-- priceperticket: decimal(10,0) (nullable = true)
|-- totalprice: decimal(10,0) (nullable = true)
|-- listtime: timestamp (nullable = true)
sales
# sales
salesScehama = T.StructType([
T.StructField('salesid', T.LongType()),
T.StructField('listid', T.LongType()),
T.StructField('sellerid', T.LongType()),
T.StructField('buyerid', T.LongType()),
T.StructField('eventid', T.LongType()),
T.StructField('dateid', T.LongType()),
T.StructField('qtysold', T.LongType()),
T.StructField('pricepaid', T.DecimalType()),
T.StructField('commission', T.DecimalType()),
T.StructField('saletime', T.StringType()),
])
salesDF = sparkSession.read.schema(salesScehama).csv("s3://{0}/input/join/sales/".format(bucketName))
print("record count[{0}]".format(salesDF.count()))
salesDF.show()
salesDF.printSchema()
===
record count[172456]
+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 1| 1| 36861| 21191| 7872| 1875| 4| 728| 109|2/18/2008 02:36:48|
| 2| 4| 8117| 11498| 4337| 1983| 2| 76| 11| 6/6/2008 05:00:16|
| 3| 5| 1616| 17433| 8647| 1983| 2| 350| 53| 6/6/2008 08:26:17|
| 4| 5| 1616| 19715| 8647| 1986| 1| 175| 26| 6/9/2008 08:38:52|
| 5| 6| 47402| 14115| 8240| 2069| 2| 154| 23|8/31/2008 09:17:02|
| 6| 10| 24858| 24888| 3375| 2023| 2| 394| 59|7/16/2008 11:59:24|
| 7| 10| 24858| 7952| 3375| 2003| 4| 788| 118|6/26/2008 12:56:06|
| 8| 10| 24858| 19715| 3375| 2017| 1| 197| 30|7/10/2008 02:12:36|
| 9| 10| 24858| 29891| 3375| 2029| 3| 591| 89|7/22/2008 02:23:17|
| 10| 12| 45635| 10542| 4769| 2044| 1| 65| 10| 8/6/2008 02:51:55|
| 11| 12| 45635| 8435| 4769| 2042| 2| 130| 20| 8/4/2008 03:06:36|
| 12| 13| 30606| 9633| 2147| 1894| 2| 344| 52| 3/9/2008 03:18:56|
| 13| 14| 12651| 1664| 2762| 2059| 2| 244| 37|8/21/2008 03:39:06|
| 14| 15| 46833| 2912| 7910| 1950| 4| 300| 45| 5/4/2008 05:39:08|
| 15| 15| 46833| 19715| 7910| 1970| 1| 75| 11|5/24/2008 06:21:47|
| 16| 17| 20228| 36376| 7837| 1908| 2| 334| 50|3/23/2008 06:27:34|
| 17| 17| 20228| 39913| 7837| 1912| 1| 167| 25|3/27/2008 06:40:29|
| 18| 20| 6300| 23690| 2075| 1904| 1| 66| 10|3/19/2008 06:46:46|
| 19| 20| 6300| 32200| 2075| 1914| 4| 264| 40|3/29/2008 06:46:57|
| 20| 20| 6300| 25316| 2075| 1906| 2| 132| 20|3/21/2008 07:01:03|
+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 20 rows
root
|-- salesid: long (nullable = true)
|-- listid: long (nullable = true)
|-- sellerid: long (nullable = true)
|-- buyerid: long (nullable = true)
|-- eventid: long (nullable = true)
|-- dateid: long (nullable = true)
|-- qtysold: long (nullable = true)
|-- pricepaid: decimal(10,0) (nullable = true)
|-- commission: decimal(10,0) (nullable = true)
|-- saletime: string (nullable = true)
users
# users
usersScehama = T.StructType([
T.StructField('userid', T.LongType()),
T.StructField('username', T.StringType()),
T.StructField('firstname', T.StringType()),
T.StructField('lastname', T.StringType()),
T.StructField('city', T.StringType()),
T.StructField('state', T.StringType()),
T.StructField('email', T.StringType()),
T.StructField('phone', T.StringType()),
T.StructField('likesports', T.BooleanType()),
T.StructField('liketheatre', T.BooleanType()),
T.StructField('likeconcerts', T.BooleanType()),
T.StructField('likejazz', T.BooleanType()),
T.StructField('likeclassical', T.BooleanType()),
T.StructField('likeopera', T.BooleanType()),
T.StructField('likerock', T.BooleanType()),
T.StructField('likevegas', T.BooleanType()),
T.StructField('likebroadway', T.BooleanType()),
T.StructField('likemusicals', T.BooleanType()),
])
usersDF = sparkSession.read.schema(usersScehama).csv("s3://{0}/input/join/users/".format(bucketName))
print("record count[{0}]".format(usersDF.count()))
usersDF.show()
usersDF.printSchema()
===
record count[49990]
+------+--------+---------+---------+------------+-----+--------------------+--------------+----------+-----------+------------+--------+-------------+---------+--------+---------+------------+------------+
|userid|username|firstname| lastname| city|state| email| phone|likesports|liketheatre|likeconcerts|likejazz|likeclassical|likeopera|likerock|likevegas|likebroadway|likemusicals|
+------+--------+---------+---------+------------+-----+--------------------+--------------+----------+-----------+------------+--------+-------------+---------+--------+---------+------------+------------+
| 1|JSG99FHE| Rafael| Taylor| Kent| WA|Etiam.laoreet.lib...|(664) 602-4412| true| true| null| false| true| null| null| true| false| true|
| 2|PGL08LJI| Vladimir| Humphrey|Murfreesboro| SK|Suspendisse.trist...|(783) 492-1886| null| null| null| true| true| null| null| true| false| true|
| 3|IFT66TXU| Lars| Ratliff| High Point| ME|amet.faucibus.ut@...|(624) 767-2465| true| false| null| false| null| false| true| null| null| true|
| 4|XDZ38RDD| Barry| Roy| Omaha| AB| sed@lacusUtnec.ca|(355) 452-8168| false| true| null| false| null| null| null| null| null| false|
| 5|AEB55QTM| Reagan| Hodge| Forest Lake| NS| Cum@accumsan.com|(476) 519-9131| null| null| true| false| null| null| true| true| false| true|
| 6|NDQ15VBM| Victor|Hernandez| Naperville| GA|turpis@accumsanla...|(818) 765-4255| false| null| null| true| null| true| true| true| true| true|
| 7|OWY35QYB| Tamekah| Juarez| Moultrie| WV|elementum@semperp...|(297) 875-7247| null| null| null| true| true| false| null| null| false| false|
| 8|AZG78YIP| Colton| Roy| Guayama| AK|ullamcorper.nisl@...|(998) 934-9210| null| null| true| true| null| true| false| null| false| false|
| 9|MSD36KVR| Mufutau| Watkins| Port Orford| MD|Integer.mollis.In...|(725) 719-7670| true| false| null| false| true| null| null| null| false| true|
| 10|WKW41AIW| Naida| Calderon| Waterbury| MB|Donec.fringilla@s...|(197) 726-8249| false| false| false| null| false| true| null| true| null| null|
| 11|MFN29TYU| Anika| Huff| Rawlins| MT|arcu.Curabitur@se...|(419) 147-8207| null| null| null| true| null| true| null| null| null| null|
| 12|FVK28WAS| Bruce| Beck| Kona| OH| ac@velit.ca|(617) 527-9908| null| null| false| null| null| false| null| null| null| null|
| 13|QTF33MCG| Henry| Cochran|Bossier City| QC|Aliquam.vulputate...|(783) 105-0989| null| true| null| null| null| null| true| true| true| null|
| 14|OVQ88RKY| Mallory| Farrell| Villa Park| ID|vel.est@veliteges...|(711) 160-7386| null| null| null| null| null| true| null| null| false| null|
| 15|OWU78MTR| Scarlett| Mayer| Gadsden| GA|lorem.ipsum@Vesti...|(189) 882-8412| true| false| true| null| null| true| null| null| true| null|
| 16|ZMG93CDD| Kieran| Drake| Hot Springs| BC|molestie.tellus@d...|(192) 914-0016| null| true| true| null| false| null| true| true| null| false|
| 17|WWZ18EOX| Cody| Moss| Mobile| ON|dolor.nonummy@ips...|(412) 488-2896| false| false| null| null| null| null| false| null| false| false|
| 18|VDP05MXU| Germaine| Valdez| Kokomo| WY|cursus.Integer@ar...|(998) 879-8668| null| true| true| null| true| true| null| false| true| true|
| 19|CXQ97IWP| Amal| Landry| Lomita| NT| euismod@turpis.org|(891) 526-1468| null| false| true| null| true| null| null| false| false| true|
| 20|VJT98ZQY| Jane| Dyer| Sharon| NH| et@Nunclaoreet.ca|(411) 638-5867| null| true| null| false| null| false| null| null| true| false|
+------+--------+---------+---------+------------+-----+--------------------+--------------+----------+-----------+------------+--------+-------------+---------+--------+---------+------------+------------+
only showing top 20 rows
root
|-- userid: long (nullable = true)
|-- username: string (nullable = true)
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- email: string (nullable = true)
|-- phone: string (nullable = true)
|-- likesports: boolean (nullable = true)
|-- liketheatre: boolean (nullable = true)
|-- likeconcerts: boolean (nullable = true)
|-- likejazz: boolean (nullable = true)
|-- likeclassical: boolean (nullable = true)
|-- likeopera: boolean (nullable = true)
|-- likerock: boolean (nullable = true)
|-- likevegas: boolean (nullable = true)
|-- likebroadway: boolean (nullable = true)
|-- likemusicals: boolean (nullable = true)
venue
# venue
venueScehama = T.StructType([
T.StructField('venueid', T.LongType()),
T.StructField('venuename', T.StringType()),
T.StructField('venuecity', T.StringType()),
T.StructField('venuestate', T.StringType()),
T.StructField('venueseats', T.LongType())
])
venueDF = sparkSession.read.schema(venueScehama).csv("s3://{0}/input/join/venue/".format(bucketName))
print("record count[{0}]".format(venueDF.count()))
venueDF.show()
venueDF.printSchema()
===
record count[202]
+-------+--------------------+---------------+----------+----------+
|venueid| venuename| venuecity|venuestate|venueseats|
+-------+--------------------+---------------+----------+----------+
| 1| Toyota Park| Bridgeview| IL| 0|
| 2|Columbus Crew Sta...| Columbus| OH| 0|
| 3| RFK Stadium| Washington| DC| 0|
| 4|CommunityAmerica ...| Kansas City| KS| 0|
| 5| Gillette Stadium| Foxborough| MA| 68756|
| 6|New York Giants S...|East Rutherford| NJ| 80242|
| 7| BMO Field| Toronto| ON| 0|
| 8|The Home Depot Ce...| Carson| CA| 0|
| 9|Dick's Sporting G...| Commerce City| CO| 0|
| 10| Pizza Hut Park| Frisco| TX| 0|
| 11| Robertson Stadium| Houston| TX| 0|
| 13| Rice-Eccles Stadium| Salt Lake City| UT| 0|
| 14| Buck Shaw Stadium| Santa Clara| CA| 0|
| 15| McAfee Coliseum| Oakland| CA| 63026|
| 16| TD Banknorth Garden| Boston| MA| 0|
| 17| Izod Center|East Rutherford| NJ| 0|
| 18|Madison Square Ga...| New York City| NY| 20000|
| 19| Wachovia Center| Philadelphia| PA| 0|
| 20| Air Canada Centre| Toronto| ON| 0|
| 21| United Center| Chicago| IL| 0|
+-------+--------------------+---------------+----------+----------+
only showing top 20 rows
root
|-- venueid: long (nullable = true)
|-- venuename: string (nullable = true)
|-- venuecity: string (nullable = true)
|-- venuestate: string (nullable = true)
|-- venueseats: long (nullable = true)
シナリオ
それぞれのデータ量とリレーションは、下記のとおりです。
データセット名 | データ件数 | データサイズ |
---|---|---|
category | 11 | 465B |
date | 365 | 14.2KB |
event | 8,798 | 435.4KB |
listing | 192,497 | 11MB |
sales | 172,456 | 10.7MB |
users | 49,990 | 5.6MB |
venue | 202 | 7.8KB |
※ サンプルデータベースから引用。
- 最もデータ量が多い、salesとlistingを使用して検証します。LISTIDでジョインします。
- 比較的小さいDATEと、大きいSALESを使用して検証します。dateidでジョインします。
- SALES
- SALESIDがプライマリキー。各行の一意の ID 値。各行は、特定のイベントの 1 枚以上のチケットの各販売を表します (特定のリストで提供)。
- LISTIDがLISTINGの外部キー
- LISTING
- LISTIDがプライマリキー。各行の一意の ID 値。各行は、特定のイベントの一連のチケットをリストで表します。
- DATE
- プライマリキー。各行の一意の ID 値。各行は、1 暦年の中の各日を表します。
各ジョインで、ヒントを与えて下記について確認します。
- 実行計画(Physical Plan)
- 実行時間
両方ともデータが大きい場合
ヒントなし
start_time = time.time()
listingDF.createOrReplaceTempView("listing2")
salesDF.createOrReplaceTempView("sales2")
joinDF = spark.sql("SELECT * FROM listing2 , sales2 WHERE listing2.listid = sales2.listid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [listid#1905L], [listid#1977L], Inner
:- Sort [listid#1905L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(listid#1905L, 1000), ENSURE_REQUIREMENTS, [plan_id=1934]
: +- Filter isnotnull(listid#1905L)
: +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
+- Sort [listid#1977L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(listid#1977L, 1000), ENSURE_REQUIREMENTS, [plan_id=1935]
+- Filter isnotnull(listid#1977L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice| listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 94| 23652| 6117| 2047| 3| 37| 111|2008-08-09 04:49:27| 75| 94| 23652| 28081| 6117| 2080| 2| 74| 11|9/11/2008 08:42:27|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 11.872079372406006
BROADCAST
start_time = time.time()
listingDF.createOrReplaceTempView("listing3")
salesDF.createOrReplaceTempView("sales3")
joinDF = spark.sql("SELECT /*+ BROADCAST(listing3) */ * FROM listing3 , sales3 WHERE listing3.listid = sales3.listid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [listid#1905L], [listid#1977L], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2151]
: +- Filter isnotnull(listid#1905L)
: +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
+- Filter isnotnull(listid#1977L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice| listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 1| 36861| 7872| 1850| 10| 182| 1820|2008-01-24 06:43:29| 1| 1| 36861| 21191| 7872| 1875| 4| 728| 109|2/18/2008 02:36:48|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 7.517160415649414
MERGE(shuffle sort merge join)
start_time = time.time()
listingDF.createOrReplaceTempView("listing4")
salesDF.createOrReplaceTempView("sales4")
joinDF = spark.sql("SELECT /*+ SHUFFLE_MERGE(listing4) */ * FROM listing4 , sales4 WHERE listing4.listid = sales4.listid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [listid#1905L], [listid#1977L], Inner
:- Sort [listid#1905L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(listid#1905L, 1000), ENSURE_REQUIREMENTS, [plan_id=2282]
: +- Filter isnotnull(listid#1905L)
: +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
+- Sort [listid#1977L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(listid#1977L, 1000), ENSURE_REQUIREMENTS, [plan_id=2283]
+- Filter isnotnull(listid#1977L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice| listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 94| 23652| 6117| 2047| 3| 37| 111|2008-08-09 04:49:27| 75| 94| 23652| 28081| 6117| 2080| 2| 74| 11|9/11/2008 08:42:27|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 11.739749908447266
SHUFFLE_HASH(shuffle hash join)
start_time = time.time()
listingDF.createOrReplaceTempView("listing5")
salesDF.createOrReplaceTempView("sales5")
joinDF = spark.sql("SELECT /*+ SHUFFLE_HASH(listing5) */ * FROM listing5 , sales5 WHERE listing5.listid = sales5.listid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ShuffledHashJoin [listid#1905L], [listid#1977L], Inner, BuildLeft
:- Exchange hashpartitioning(listid#1905L, 1000), ENSURE_REQUIREMENTS, [plan_id=2500]
: +- Filter isnotnull(listid#1905L)
: +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
+- Exchange hashpartitioning(listid#1977L, 1000), ENSURE_REQUIREMENTS, [plan_id=2501]
+- Filter isnotnull(listid#1977L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+-----------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice| listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+-----------------+
| 964| 22048| 6058| 1946| 10| 189| 1890|2008-04-30 09:46:24| 960| 964| 22048| 1816| 6058| 1948| 1| 189| 28|5/2/2008 05:23:09|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+-----------------+
only showing top 1 row
execution time 7.968986511230469
SHUFFLE_REPLICATE_NL(shuffle-and-replicate nested loop join)
start_time = time.time()
listingDF.createOrReplaceTempView("listing6")
salesDF.createOrReplaceTempView("sales6")
joinDF = spark.sql("SELECT /*+ SHUFFLE_REPLICATE_NL(listing6) */ * FROM listing6 , sales6 WHERE listing6.listid = sales6.listid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
CartesianProduct (listid#1905L = listid#1977L)
:- *(1) Filter isnotnull(listid#1905L)
: +- FileScan csv [listid#1905L,sellerid#1906L,eventid#1907L,dateid#1908L,numtickets#1909L,priceperticket#1910,totalprice#1911,listtime#1912] Batched: false, DataFilters: [isnotnull(listid#1905L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/listing], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<listid:bigint,sellerid:bigint,eventid:bigint,dateid:bigint,numtickets:bigint,priceperticke...
+- *(2) Filter isnotnull(listid#1977L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(listid#1977L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(listid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|listid|sellerid|eventid|dateid|numtickets|priceperticket|totalprice| listtime|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 1| 36861| 7872| 1850| 10| 182| 1820|2008-01-24 06:43:29| 1| 1| 36861| 21191| 7872| 1875| 4| 728| 109|2/18/2008 02:36:48|
+------+--------+-------+------+----------+--------------+----------+-------------------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 7.117904901504517
片方が小さく、片方が大きい場合
ヒントなし
start_time = time.time()
dateDF.createOrReplaceTempView("date7")
salesDF.createOrReplaceTempView("sales7")
joinDF = spark.sql("SELECT * FROM date7 , sales7 WHERE date7.dateid = sales7.dateid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [dateid#1779L], [dateid#1981L], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2755]
: +- Filter isnotnull(dateid#1779L)
: +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
+- Filter isnotnull(dateid#1981L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|dateid| caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 1875|2008-02-18| TU| 8| FEB| 1|2008| false| 1| 1| 36861| 21191| 7872| 1875| 4| 728| 109|2/18/2008 02:36:48|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 6.243089914321899
BROADCAST
start_time = time.time()
dateDF.createOrReplaceTempView("date8")
salesDF.createOrReplaceTempView("sales8")
joinDF = spark.sql("SELECT /*+ BROADCAST(date8) */ * FROM date8 , sales8 WHERE date8.dateid = sales8.dateid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [dateid#1779L], [dateid#1981L], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2885]
: +- Filter isnotnull(dateid#1779L)
: +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
+- Filter isnotnull(dateid#1981L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|dateid| caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 1875|2008-02-18| TU| 8| FEB| 1|2008| false| 1| 1| 36861| 21191| 7872| 1875| 4| 728| 109|2/18/2008 02:36:48|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 6.2336790561676025
MERGE(shuffle sort merge join)
start_time = time.time()
dateDF.createOrReplaceTempView("date9")
salesDF.createOrReplaceTempView("sales9")
joinDF = spark.sql("SELECT /*+ SHUFFLE_MERGE(date9) */ * FROM date9 , sales9 WHERE date9.dateid = sales9.dateid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [dateid#1779L], [dateid#1981L], Inner
:- Sort [dateid#1779L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(dateid#1779L, 1000), ENSURE_REQUIREMENTS, [plan_id=3016]
: +- Filter isnotnull(dateid#1779L)
: +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
+- Sort [dateid#1981L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(dateid#1981L, 1000), ENSURE_REQUIREMENTS, [plan_id=3017]
+- Filter isnotnull(dateid#1981L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
|dateid| caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
| 1882|2008-02-25| TU| 9| FEB| 1|2008| false| 119824|137202| 26796| 38210| 6825| 1882| 2| 798| 120|2/25/2008 01:44:23|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+------------------+
only showing top 1 row
execution time 7.939592599868774
SHUFFLE_HASH(shuffle hash join)
start_time = time.time()
dateDF.createOrReplaceTempView("date10")
salesDF.createOrReplaceTempView("sales10")
joinDF = spark.sql("SELECT /*+ SHUFFLE_HASH(date10) */ * FROM date10 , sales10 WHERE date10.dateid = sales10.dateid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ShuffledHashJoin [dateid#1779L], [dateid#1981L], Inner, BuildLeft
:- Exchange hashpartitioning(dateid#1779L, 1000), ENSURE_REQUIREMENTS, [plan_id=3234]
: +- Filter isnotnull(dateid#1779L)
: +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
+- Exchange hashpartitioning(dateid#1981L, 1000), ENSURE_REQUIREMENTS, [plan_id=3235]
+- Filter isnotnull(dateid#1981L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+-------------------+
|dateid| caldate|day|week|month|qtr|year|holiday|salesid|listid|sellerid|buyerid|eventid|dateid|qtysold|pricepaid|commission| saletime|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+-------------------+
| 2184|2008-12-24| WE| 52| DEC| 4|2008| false| 492| 493| 29029| 7877| 8059| 2184| 3| 678| 102|12/24/2008 06:12:06|
+------+----------+---+----+-----+---+----+-------+-------+------+--------+-------+-------+------+-------+---------+----------+-------------------+
only showing top 1 row
execution time 7.8767030239105225
SHUFFLE_REPLICATE_NL(shuffle-and-replicate nested loop join)
start_time = time.time()
dateDF.createOrReplaceTempView("date11")
salesDF.createOrReplaceTempView("sales11")
joinDF = spark.sql("SELECT /*+ SHUFFLE_REPLICATE_NL(date11) */ * FROM date11 , sales11 WHERE date11.dateid = sales11.dateid")
joinDF.explain()
joinDF.show(1)
end_time = time.time()
print("execution time {0}".format(end_time-start_time))
== Physical Plan ==
CartesianProduct (dateid#1779L = dateid#1981L)
:- *(1) Filter isnotnull(dateid#1779L)
: +- FileScan csv [dateid#1779L,caldate#1780,day#1781,week#1782L,month#1783,qtr#1784,year#1785L,holiday#1786] Batched: false, DataFilters: [isnotnull(dateid#1779L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/date], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<dateid:bigint,caldate:string,day:string,week:bigint,month:string,qtr:string,year:bigint,ho...
+- *(2) Filter isnotnull(dateid#1981L)
+- FileScan csv [salesid#1976L,listid#1977L,sellerid#1978L,buyerid#1979L,eventid#1980L,dateid#1981L,qtysold#1982L,pricepaid#1983,commission#1984,saletime#1985] Batched: false, DataFilters: [isnotnull(dateid#1981L)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://XXXXX/input/join/sales], PartitionFilters: [], PushedFilters: [IsNotNull(dateid)], ReadSchema: struct<salesid:bigint,listid:bigint,sellerid:bigint,buyerid:bigint,eventid:bigint,dateid:bigint,q...
考察
今回、下記のデータセットに対してヒント句を変えて、ジョインを試してみましたが、実行計画の変化は見られたものの、実行時間は大差ありませんでした。
- やや大きめのデータセット(10MB)同士の結合
- やや大きめのデータセット(10MB)と、小さめのデータセット(14KB)同士の結合
もう少し大きいデータを使用する。複雑なクエリを実行することで、違いが出るかもしれません。今後試して見たいと思います。
参考