動機
SparkSQLでは同じ結果が得られてもクエリの書き方によって遅かったり速かったりする。
どういうクエリを書けば速いのか。
それぞれの要求される仕様によってどういうクエリを書けば速いのか変わってくる。しかし、一定の方針がほしい。
方針
ノード間通信を減らすようなクエリを書く。
Sparkではノード間通信が連発するようなクエリを書くと計算時間に時間がかかる。
以下のただ単に最小値を探すクエリを考えてみる。
SELECT
min(_id)
FROM
dataframe
SQLServerと比較して考えてみる。SQLServerでは普通1台のマシンで動作が完結する。1人で何でもやらなければならない。
方やSparkでは複数のマシンが協力して動作する。例えば4人で_idの最小値を頑張って探そうとしよう。。最後に4人各自が探し出した最小値を照らし合わせて最小値を探し出す。
つまり、group by, distinct, except など、レコード同士の比較が必要なSQL文ではノード間通信が発生することになる。ノード間通信を連発すると計算の遅延につながる。
例題
例えば次のような売上データがあるとしよう。次の3つのカラムがあるデータがある。
- id: 通し番号。1つのレコードにユニークな値が振られている。ダブリはない。
- customer_id: お客様番号。お客様ごとにユニークな値が振られている。
- price: お客様が買ったものの値段。
このデータに新しくmax_priceというカラムをつけてほしい。max_priceはお客様が買った商品の中で最も高額なものに1をつけてほしい。
実行環境の作成
AzureのHDinsightを使用した。
サンプルデータの作成
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("sample").getOrCreate()
# 適当な量のデータを生成
df = spark.range(10000000000)
df = df.withColumn("customer_id", F.expr("id % 1000000"))
df = df.withColumn("price", F.expr("id % 100000"))
df.write.orc("path/to/file")
解法1: JOINを使う方法
- レコードからcustomer_idでグループ分けして一番priceが大きなレコードだけを抽出
- もとのレコードにjoin
コード
df.write.orc("path/to/file")
df.createOrReplaceTempView("df")
maxprice = spark.sql("""
SELECT
max(id) as id,
max(price) as tmp_maxprice
FROM
df
GROUP BY customer_id
""")
maxprice.createOrReplaceTempView("maxpricedf")
final_df = spark.sql("""
SELECT
a.*,
CASE
WHEN a.price = b.tmp_maxprice and a.id = b.id THEN 1
ELSE 0
END as maxprice
FROM
df as a
LEFT JOIN maxpricedf as b
ON
a.id = b.id and a.price = b.tmp_maxprice
""")
final_df.write.orc("wasbs:///jointest")
DAG SCHEDULER の観察
実行結果
36分かかった。
解法2:
- window関数を用いてpriceが一番高いものの列を作成
- priceが一番高いものにフラグを付ける
コード
df = spark.read.orc("path/to/file")
df.createOrReplaceTempView("df")
max_df = spark.sql("""
SELECT
*,
max( price ) over(partition by customer_id ) as tmp_max_price
from
df
""")
max_df.createOrReplaceTempView("max_df")
final_df2 = spark.sql("""
SELECT
*,
CASE
WHEN price = tmp_max_price then 1
else 0
END as max_price
FROM
max_df
""").drop("tmp_max_price")
final_df2.write.orc("wasbs:///jointest2")
DAG SCHEDULER の観察
実行結果
25分かかった。
まとめ
解法1はgrpup by と left join の2回suffleが実行されるコマンドがあるので、DAGのstageが4つある。(group byで1ステージ、left joinで2ステージ実施)
解法2は max ... over の1回suffleが実行されるコマンドがあるので、DAGのStageが2つで済む。
結果、解法2のほうが11分ほど短縮された。
DAGのstageが多ければ多いほどsuffleが多く発生したことを表している。同じ結果が得られるならばsuffleが少ないクエリのほうが速い。
参考文献
High Performance Spark のChapeter 5