1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Sparkで実行が速いクエリを書くにはsuffleを減らせ

Last updated at Posted at 2020-01-01

動機

SparkSQLでは同じ結果が得られてもクエリの書き方によって遅かったり速かったりする。
どういうクエリを書けば速いのか。

それぞれの要求される仕様によってどういうクエリを書けば速いのか変わってくる。しかし、一定の方針がほしい。

方針

ノード間通信を減らすようなクエリを書く。
Sparkではノード間通信が連発するようなクエリを書くと計算時間に時間がかかる。

以下のただ単に最小値を探すクエリを考えてみる。

SELECT 
min(_id)
FROM
dataframe

SQLServerと比較して考えてみる。SQLServerでは普通1台のマシンで動作が完結する。1人で何でもやらなければならない。
方やSparkでは複数のマシンが協力して動作する。例えば4人で_idの最小値を頑張って探そうとしよう。。最後に4人各自が探し出した最小値を照らし合わせて最小値を探し出す。

Sparkmin.png

つまり、group by, distinct, except など、レコード同士の比較が必要なSQL文ではノード間通信が発生することになる。ノード間通信を連発すると計算の遅延につながる。

例題

例えば次のような売上データがあるとしよう。次の3つのカラムがあるデータがある。

  • id: 通し番号。1つのレコードにユニークな値が振られている。ダブリはない。
  • customer_id: お客様番号。お客様ごとにユニークな値が振られている。
  • price: お客様が買ったものの値段。

このデータに新しくmax_priceというカラムをつけてほしい。max_priceはお客様が買った商品の中で最も高額なものに1をつけてほしい。

スクリーンショット 2019-12-01 22.13.30.png

実行環境の作成

AzureのHDinsightを使用した。

サンプルデータの作成


from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sample").getOrCreate()
# 適当な量のデータを生成
df = spark.range(10000000000)
df = df.withColumn("customer_id", F.expr("id % 1000000"))
df = df.withColumn("price", F.expr("id % 100000"))
df.write.orc("path/to/file")

解法1: JOINを使う方法

  1. レコードからcustomer_idでグループ分けして一番priceが大きなレコードだけを抽出
  2. もとのレコードにjoin

コード

df.write.orc("path/to/file")
df.createOrReplaceTempView("df")

maxprice = spark.sql("""
SELECT
max(id) as id,
max(price) as tmp_maxprice
FROM
df
GROUP BY customer_id
""")

maxprice.createOrReplaceTempView("maxpricedf")

final_df = spark.sql("""
SELECT 
a.*,
CASE 
    WHEN a.price = b.tmp_maxprice and a.id = b.id THEN 1
    ELSE 0
END as maxprice
FROM
df as a
LEFT JOIN maxpricedf as b
ON 
a.id = b.id and a.price = b.tmp_maxprice
""")

final_df.write.orc("wasbs:///jointest")

DAG SCHEDULER の観察

スクリーンショット 2019-11-17 21.58.30.png

実行結果

スクリーンショット 2019-11-17 22.36.12.png

36分かかった。

解法2:

  1. window関数を用いてpriceが一番高いものの列を作成
  2. priceが一番高いものにフラグを付ける

コード

df = spark.read.orc("path/to/file")
df.createOrReplaceTempView("df")

max_df = spark.sql("""
SELECT 
*,
max( price ) over(partition by customer_id ) as tmp_max_price
from 
df
""")

max_df.createOrReplaceTempView("max_df")

final_df2 = spark.sql("""
SELECT 
*,
CASE 
    WHEN price = tmp_max_price then 1 
    else 0
END as max_price
FROM 
max_df
""").drop("tmp_max_price")

final_df2.write.orc("wasbs:///jointest2")

DAG SCHEDULER の観察

スクリーンショット 2019-11-17 22.38.54.png

実行結果

スクリーンショット 2019-11-17 23.02.40.png

25分かかった。

まとめ

解法1はgrpup by と left join の2回suffleが実行されるコマンドがあるので、DAGのstageが4つある。(group byで1ステージ、left joinで2ステージ実施)

解法2は max ... over の1回suffleが実行されるコマンドがあるので、DAGのStageが2つで済む。

結果、解法2のほうが11分ほど短縮された。

DAGのstageが多ければ多いほどsuffleが多く発生したことを表している。同じ結果が得られるならばsuffleが少ないクエリのほうが速い。

参考文献

High Performance Spark のChapeter 5

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?