16
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Spark SQLによるビッグデータ集計をチューニングするために分散処理の基礎から駈けぬける速習ガイド

Last updated at Posted at 2020-07-22

「NetflixはSparkでペタバイト級の集計をやってるらしいですね〜」(2015年の発表
「AWSのEMRとかGCPのDataprocあたりでSpark環境を用意してみました。ふふん」
「とりまクラスタ立ててみましたけど、全然パフォーマンスでないんですけど?」
「っていうか何が起きてるかよく分からないし……分散処理とか馴染みないし……」

という感じの方が、とりあえずSpark SQLの実行計画やWeb UIの情報をざっくり読みとってチューニングできるようになるための道筋を示すことが本記事の目的です。
Sparkの良い解説記事は沢山あるものの話題が多岐に渡り初心者は迷子になりやすいので、それらの主要図を引用しながら駆けぬけていく感じでいきます><

Cluster Manager

Sparkはクラスターの上でデータを処理しますが、その管理はクラスターマネージャー(リソースマネージャー)に依存しています。クラスタマネージャーとして選択できるのは次の4つです。

  • Standalone
  • Apache Mesos
  • Hadoop YARN
  • Kubernetes

Cluster Mode Overview - Spark 3.0.0 Documentation

「Standalone」はその名の通りSparkに内蔵されているクラスターマネージャーです。
「Apache Mesos」「Hadoop YARN」はどちらもApacheのOSSです。Sparkもそうですね。
YARNのアーキテクチャ図を引用するので雰囲気を感じてください。詳しくは『YARN の紹介 – IBM Developer』がお勧めです。

Hadoop YARN
Apache Hadoop YARN – Apache Hadoop

Sparkはこういったクラスターマネージャを通じて、ジョブを実行します。

Spark
Cluster Mode Overview - Spark 3.0.0 Documentation

Worker Nodeというのは実際にデータを扱うマシンだったりインスタンスのことです。Driver ProgramやCluster Managerを実行するものは、この図には描かれてないですがMaster Nodeと呼ばれたりします。

一般的にMaster Nodeは単一障害点になりますが、EMRやDataprocでは複数Node用意して高可用性を保つ機能も用意されていたりします。
複数のマスターノードを持つ EMR クラスターの起動 - Amazon EMR
高可用性モード - Dataproc ドキュメント - Google Cloud
また実行中にWorker Nodeを追加する自動スケーリングも用意されています。
カスタムポリシーによる自動スケーリングをインスタンスグループに使用する - Amazon EMR
クラスタの自動スケーリング - Dataproc ドキュメント - Google Cloud

クラスターマネージャーは spark.master で設定・確認できます。よくある初歩的なミスは、クラスター環境を組んだのに spark.master がローカル開発時のまま local になっていてMaster NodeでしかSparkが動いていなかったというものです。
その他の設定はクラスターマネージャーに依るので公式ドキュメントを参照してください。
Cluster Managers - Configuration - Spark 3.0.0 Documentation

なお、従来は「Hadoop YARN」を使うのが標準的だったかと思うのですが、Spark 2.3頃から今を時めく「Kubernetes」対応が始まっていて、これからはSpark on Kubernetesの導入例が増えていきそうです。
すでにKubernetesの環境を持っていて、マシンリソース管理を纏めたいのであれば試す価値は充分にあると思います。ベンチマーク上では、すでにYARNと同等のパフォーマンスが出ているそうです。
Kubernetes で Spark パフォーマンスを最適化する - Amazon Web Services ブログ

Executor

さて、前の図にExecutorというものがありました。
これの実体はYARNだとContainer、KubernetesだとPodになります。そしてこの中にJVM(Java仮想マシン)が立って、タスクが実行されていくことになります。メモリレイアウトを見ると分かりやすいので引用します。

Executor
Amazon EMR で Apache Spark アプリケーションのメモリをうまく管理するためのベストプラクティス - Amazon Web Services ブログ

Sparkはなるべくインメモリでデータを処理するので、都度ファイルを読み書きする素のHadoopより高速なのですが、そのためのメモリ領域があることも見てとれますね。また処理しながら中間データのメモリを解放するため、JVMのGC (Garbage Collection)がそれなりに起きるであろうことも想像が付くと思います。
GCに馴染みのない方には、かなり古いですが次の記事が入門としてお勧めです。Eden領域はロマン。
ガベージコレクタの仕組みを理解する - チューニングのためのJava VM講座(後編) - @IT

Executorは1 Nodeに複数立てることができて、spark.dynamicAllocation.enabledが有効な場合は実行中にExecutorが自動で増えたり減ったりする様子をWeb UIで眺めることができます。

Jobs Tab
Jobs Tab - Web UI - Spark 3.0.0 Documentation

Executorに限らずSparkの設定はなかなか悩ましいところですが、まずはデフォルトないしは推奨設定にしておいてクエリレベルのチューニングに取り組んだ方がいいと思います。

MapReduce

これまで見てきたようなクラスター環境で行う分散処理の基礎モデルとして、MapReduceというものがあります。ちなみにGoogleの元論文『MapReduce: Simplified Data Processing on Large Clusters』の第一著者は『全盛期のJeff Dean伝説』というネタがある程のレジェンドです。
次の図は、HadoopにおけるMapReduceについてのものなのですが、Sparkでも大きな考え方は同じなので引用します。

MapReduce
MapReduce
Hadoop: The Definitive Guide [Book] - O'Reilly

「Map」「Reduce」は関数型プログラミングに慣れていれば、ScalaやPythonなどで同名のメソッドを使ったことがあるでしょうから、イメージが付きやすいと思います。「Shuffle」はデータのパーティションをシャッフルして作りなおす必要がある操作で、このときWorker Node間で通信が行われます。
SQLと対応付けてみると、だいたいこんな感じです。

| MapReduce | SQL |
| --- | --- | --- |
| Input | FROM |
| Map | SELECT、WHERE、HAVING |
| Shuffle | JOIN、GROUP BY、ORDER BY |
| Reduce | SUMなどの集約関数 |
| Output | INSERT |

※ 正確には後述のように、JoinはアルゴリズムによってShuffleしなかったりします。

「Input」「Map」「Shuffle」「Reduce」「Output」どのフェーズもボトルネックになりえるので、どの操作がどのフェーズで行われていてCPU・メモリ・I/O・ネットワークへの負荷がどのくらい掛かっているのか、を把握するのがパフォーマンスチューニングのポイントになってきます。
場合によっては、クラスタをスケールアップしたりスケールアウトすることになるかもしれません。

Dataset (DataFrame) / RDD

ここまで分散処理の考え方を見てきたのですが、集計クエリを書くときはデータがどうパーティショニングされているかなどは意識せずに、論理的なデータを扱いたいですね。そこで登場するのがみんな大好き表形式DataFrameです。
PythonのPandasを使っている方なら親しみのある用語で、実際PySparkでは .toPandas() でSparkのDataFrameをPandasのDataFrameに変換することができます。ちなみにこのデータ変換はApache Arrowで効率化よく行うことができて、Apache Arrowの主要メンバーにはPandasの作者Wes McKinneyがいたりします。
PySpark Usage Guide for Pandas with Apache Arrow - Spark 3.0.0 Documentation
(翻訳)Apache Arrowと「pandasの10項目の課題」

SparkではDataFrameより汎用的なものとしてDatasetがあって、実際 Dataset[T] の行データを表す型変数 TRow である場合のエイリアスとして DataFrame は定義されています。

type DataFrame = Dataset[Row]

org/apache/spark/sql/package.scala#L46

Row は任意のスキーマの行を表すことができるクラスなので DataFrame だけでも事足りるのですが、 行データを表す型変数 TRow だけでなくScalaのcase classも使えます。そうすると行単位の処理をよりScalaらしく簡潔に書くことができて、さらに型情報でスキーマを表すことができる利点があります。そのため型安全……と言いたいところですが、ちょっとややこしい事情があって詳しくは過去記事を参照してください。
SparkのDataFrame/Datasetって型安全なの?

さて、Datasetに対する操作はメソッドチェーンな書き方とSQLな書き方があります。C#のLINQに似てますね。

ds.groupBy($"a")
  .agg(sum($"b") as "b_sum")
ds.createGlobalTempView("table")
spark.sql("""
  SELECT
    a,
    SUM(b) AS b_sum
  FROM
    global_temp.table
  GROUP BY
    a""")

こうして発行されたクエリを、実際のクラスター環境でどうMapReduce的に分散処理するか計画を立てるのが、Spark SQLの実行エンジンです。これには沢山のステップがあるのですが、フローチャートを見ると分かりやすいので引用します。

Plan
Apache Sparkコミッターが教える、Spark SQLの詳しい仕組みとパフォーマンスチューニング Part1 - ログミーTech

ここでDatasetに対するクエリから作られた最終的な実行計画 (Selected Physical Plan)が何を操作しているかというと、それはRDD (Resilient Distributed Dataset)というデータ構造になります。
Datasetがなかった昔のSparkではRDD操作をユーザーが直接コーディングしていたのですが、JoinなどのShuffle時に自力でKey-Value形式への変換処理などを書く必要があったそうで、Spark SQLの実行エンジンはそれらを自動でやってくれるすごいやつなのですね。もちろん万能ではないので、後述するJoin Hintなどで戦略を与えた方がいい場合もありますが、RDDを直接扱うべきシチュエーションはまず無いと思います。

ただ、Sparkの中でRDDは生きているので、耐障害性などの利点はそのままDataset操作にも引き継がれているというわけです。このおかげで、たとえばEMRでWorker Nodeにスポットインスタンスを使用していたとして処理中にインスタンスが中断されたとしても、そのインスタンスが抱えていたデータは残りのWorker Nodeによって再生成が試みられます。
この仕組みについては、次のスライドが分かりやすいので引用します。

RDD
Apache Sparkのご紹介 (後半:技術トピック) - SlideShare

Application / Job / Stage / Task

Sparkがどのような実行計画で分散処理するか知るための手がかりとして、実行前ならばEXPLAIN、実行後であればWeb UIがあります。Web UIではJob、Stage、Taskといった実行単位が見えますね。

Stages Tab
Stages Tab
Stages Tab - Web UI - Spark 3.0.0 Documentation

これは次のような階層構造になっています。

Application
High Performance Spark [Book] - O'Reilly

まず抑えておくべきポイントは、Sparkのデータ処理は遅延評価ということです。
ではどのタイミングで評価が走るかというと、それはActionと呼ばれるメソッドを呼んだ時です。これはDatasetの外の世界に出るような操作、すなわち collect してScalaのArrayに変換したり write して外部ストレージに出力したりしたりする操作が該当します。
このActionごとに、これまで書かれたDataset操作を辿ったDAG (有向非巡回グラフ)が構築されて、その一纏まりをJobと呼びます。

Jobは、パーティションごとの実行単位であるTaskで構成されているのですが、その間にStageという実行単位があります。これはNode間で待ち合わせをしないといけないShuffleで区切られるもので、つまりStage内ではパーティションの並列処理ができるようになっています。RDDの元論文から引用します。

DAG
Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

実際に構築されたDAGはWeb UIで眺めることができます。これが複雑なグラフになってくると、なんだかすごく高級な集計をやった気になれますね……!

Jobs detail
Jobs detail - Web UI - Spark 3.0.0 Documentation

Cache

ここまででSpark SQLの分散処理を理解するための基本的な用語・概念は一通りさらえたかなと思うのですが、では実際のパフォーマンスチューニングで初めに検討した方がいいのは何かというと、それはキャッシュ機能の活用になります。具体的には ds.cache()ds.persist(MEMORY_AND_DISK) のようにしてキャッシュできます。
公式ドキュメントでも真っ先に言及されていますね。
Caching Data In Memory - Performance Tuning - Spark 3.0.0 Documentation

キャッシュを検討するべき明らかなサインは、コード上に何度も使い回すようなDatasetが現れた時です。
このあたりはPandasのDataFrameと感覚の異なるところで、Sparkは遅延評価なのでコード上ではサブクエリを共通化したつもりでも、実行されるDAGでは次のスライドのように分裂することがあります。

cache
Apache Sparkに手を出してヤケドしないための基本 ~「Apache Spark入門より」~ (デブサミ 2016 講演資料) - SlideShare

他の効用として、キャッシュはRDD再生成のためのセーブポイントになるので、重い処理中に不安になったら仕掛けておくこともあります。
もちろんキャッシュしすぎもよくなくて保存先のメモリなりストレージを食うので、Web UIを眺めて圧迫感を覚えたら ds.unpersist()spark.sqlContext.clearCache() によるキャッシュのクリアも検討しましょう。

Storage Tab
Storage Tab - Web UI - Spark 3.0.0 Documentation

Join

お待たせしました。Spark SQLチューニングの華、Joinアルゴリズムです!
└(´ー`┌)└(´ー`)┘(┐´ー`)┘ヤンヤヤンヤ♪

まず代表的なJoinアルゴリズムといえば、Shuffle Hash Joinです。
イメージとしては次のスライドが分かりやすく、それぞれJoinキーのハッシュ値を計算してShuffleを行い、転送先のパーティションでハッシュ値を元にJoinします。

Shuffle Hash Join
Optimizing Apache Spark SQL Joins

ここで気をつけないといけないのは、Joinキーによってデータサイズが著しく偏っている場合です。Shuffle後のパーティション配置がアンバランスになって並列性が落ちているようであれば、状況によってはJoinキーの再検討をした方がいいかもしれません。
並列性の確認はWeb UIのタイムラインを眺めたり、NodeごとのCPU・メモリ使用率を眺めたりします。

Stage detail
Stage detail - Web UI - Spark 3.0.0 Documentation

また、片方のデータ量が小さい場合にはそれをブロードキャストしてしまったのち、もう片方のパーティションごとにHash Joinしてしまった方が速い時があります。時間と空間のトレードオフですね。

Broadcast Hash Join
Optimizing Apache Spark SQL Joins

では、Spark SQLはブロードキャストした方がいいのかどう判断しているかというと、ANALYZE TABLEの結果などを元に推定したデータサイズがspark.sql.autoBroadcastJoinThresholdを超えるかどうか、をチェックしています。詳しい推定方法については実装を参照してください。
org/apache/spark/sql/catalyst/plans/logical/statsEstimation
ただ推定値はそれなりに外す場合もあるので、Spark 3からAdaptive Query Executionという機能が追加されました。実行時の統計情報を利用して、Sort Merge JoinをBroadcast Hash Joinに切り替えたりできるそうです。
Converting sort-merge join to broadcast join - Performance Tuning - Spark 3.0.0 Documentation
Spark SQL Adaptive Execution at 100 TB

場合によってはSpark SQLの判断結果がいけてなくて、Joinアルゴリズムを指定したい場合があるとおもいます。その場合はJoin Hintを指定することで誘導することができます。
Join Hints - Hints - Spark 3.0.0 Documentation

Join Hintとアルゴリズムの正確な対応関係はドキュメントに載っていないのですが、実装を追うと次のようになっているみたいです。
なお本記事では紹介しきれてないですが、Hash Joinは速いものの等価条件の時しか使えないので、そのような時は別のアルゴリズムを使う必要があります。それでは遅い場合、不等号条件であっても値を丸めるなどして等価条件に持ちこめないか検討してみてください。

Hint EXPLAIN Join条件
broadcast BroadcastHashJoin 等価のみ
broadcast BroadcastNestedLoopJoin 制約なし
merge SortMergeJoin 等価のみ、ソート可能
shuffle_hash ShuffledHashJoin 等価のみ
shuffle_replicate_nl CartesianProduct 制約なし

ちなみにアルゴリズムの詳細も実装を追うしかなさそうなのですが、たとえばShuffle Hash Joinについて知りたければ、まずShuffledHashJoinExecクラスを読みます。

protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
    val hashed = buildHashedRelation(buildIter)
    join(streamIter, hashed, numOutputRows)
  }
}

org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala#L65

ここで buildPlan というのはHash Joinする際にハッシュテーブルを作る方で、 streamedPlan はループを回しながらそのハッシュテーブルとマッチさせる方です。 zipPartitions はそういったHash Joinをパーティションごとにやりなさいと言っているのですが、ではShuffleの気持ちはどこにあるかというと、

override def requiredChildDistribution: Seq[Distribution] =
  HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil

org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala#L49

の部分で指定された HashClusteredDistribution を受けて、子計画の buildPlanstreamedPlanShuffleExchangeExec などが引き起こされるようです。
org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L61

EXPLAINで実行計画を表示すると、こんな感じになるでしょう。

+- ShuffledHashJoin ...
   :- ShuffleExchangeExec ....
   ...
   +- ShuffleExchangeExec ...

Data Source

ジョブの中でInputがボトルネックな場合は、Inputを司るData Sourceにもチューニングポイントがあります。
まず知っておくべきはプッシュダウンという手法で、これはフィルター処理をなるべくデータソースに近い場所で行い、なる速でデータサイズを削減して高速化しましょうというものです。
What is predicate pushdown? In mapreduce. - Quora

Data Sourceは色んなフィルター処理を行なっていて、たとえばParquetのファイルを読む場合は、

case class ParquetScan(
    ...
    pushedFilters: Array[Filter],
    ...
    partitionFilters: Seq[Expression] = Seq.empty,
    dataFilters: Seq[Expression] = Seq.empty) extends FileScan {

org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala#L35

とあって親クラスFileScanのコメントによると partitionFilters はPartition Pruningに、 dataFilters はFile Listingに使われるようです。実際にどういうフィルター処理が割り当てられたかは、次のスライドのように実行計画で読みとることができます。

Data Source
Spark SQL - The internal - SlideShare

そして pushedFilters はParquetのライブラリに委譲されるフィルターで、Parquetでは次のスライドのようにRow Groupの統計情報を利用した絞り込みなどができるようです。ちなみにParquetのデータ構造入門には『カラムナフォーマットのきほん 〜データウェアハウスを支える技術〜』がお勧めです。

Predicate Pushdown
The Parquet Format and Performance Optimization Opportunities - SlideShare

そのためクエリレベルでもフィルター処理をなるべく前倒しにしたり、Data Sourceが対応している条件でフィルター処理を書きなおすことで実行計画が変わり、Inputが大幅に改善することがあります。

UDF

任意の関数をSpark SQLで実行できるため便利なUDF (User Defined Function)ですが、Spark SQLのネイティブ関数の方が実行最適化されるため、同じ計算ができるのであればネイティブ関数を組み合わせたほうがパフォーマンス上有利です。
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Column.html
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html

なぜそうなるのか試しに array_max の実装を覗いてみましょう。

case class ArrayMax(child: Expression)
  extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
  // ....
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // ....
    ev.copy(code =
      code"""
         |${childGen.code}
         |boolean ${ev.isNull} = true;
         |$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
         |if (!${childGen.isNull}) {
         |  for (int $i = 0; $i < ${childGen.value}.numElements(); $i ++) {
         |    ${ctx.reassignIfGreater(dataType, ev, item)}
         |  }
         |}
      """.stripMargin)
  }

  override protected def nullSafeEval(input: Any): Any = {
    var max: Any = null
    input.asInstanceOf[ArrayData].foreach(dataType, (_, item) =>
      if (item != null && (max == null || ordering.gt(item, max))) {
        max = item
      }
    )
    max
  }
}

nullSafeEval は普通のScala実装ですが doGenCode はJavaのコードを生成……しかも子コードの埋め込みなどもしていて、C言語の#defineマクロによるメタプログラミングのようなことをしています。
レコードごとオペレータごとに nullSafeEval のような普通の関数を呼びだす実装だと、ビッグデータ相手では仮想関数テーブルやコールスタックを引くコストすら塵も積もれば山となってしまうため、モジュール化を避けた素朴なコードに変換することでそれらのコストを避け、JVMによるJITのループ展開も期待できるという代物です。さらにこのコード生成はwhole-stage codegenという仕組みにより、できるだけ複数の操作をまとめて一つの大きなJavaのコードにしようとします。ただ大きすぎるとJVMの制限に引っかかるため、その際は分割します。
https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
https://databricks.com/jp/session_na20/understanding-and-improving-code-generation

この仕組みのおかげで、SparkユーザはSQLの高く抽象化された操作を書くだけで、カリカリにチューニングされた職人コードのパフォーマンスを手に入れられるというわけです。凄まじい努力ですね……。
コード生成が行われているかどうかは実行計画で「*」が付いているかどうかで確認できて、付いていない場合はShuffleのようなコード生成対象でない操作か、もしくは nullSafeEval のような普通の関数がインタープリタ実行されているようです。

== Physical Plan ==
*Aggregate(functions=[sum(id#201L)])
+- Exchange SinglePartition, None
+- *Aggregate(functions=[sum(id#201L)])
+- *Filter (id#201L > 100)
+- *Range 0, 1, 3, 1000, [id#201L]

対してUDFは、データ変換をしてUDF呼び出したのちまたデータ変換するため、それなりのオーバーヘッドがあります。
PySparkを使っている場合は、さらにJVMとPythonを行き来するオーバーヘッドがあります。Pandas UDFはApache Arrowでデータを受け渡すためPython UDFより効率は良いですが、それでもScala UDFとは桁違いに遅くなるようです。
https://www.slideshare.net/databricks/a-deep-dive-into-query-execution-engine-of-spark-sql
https://medium.com/quantumblack/spark-udf-deep-insights-in-performance-f0a95a4d8c62

ということでUDFを使いたくなったら、

  1. Sparkのネイティブ関数の組み合わせで表現できないか検討する。
  2. 表現できない場合は、なるべくScala(もしくはJava)でUDFを書く。
  3. 実行速度を切り詰めたい場合は、頑張ってJavaのコード生成まで実装する。
16
14
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?