Master Spark: Optimize File Size & Partitions | Towards Data Scienceの翻訳です。
パーティションの管理、Repartition、Coalesce操作における包括的なガイド
大規模なSparkデータ処理オペレーションの舵を握っていることを思い描きましょう。Sparkにおける最適化に関する講義でよく言及される経験則は、ベストなI/Oパフォーマンスや並列性の向上において、それぞれのデータファイルのサイズは、ファイルを読み込む際のデフォルトのパーティションサイズである128Mb前後にすべきであるというものです[1]。
あなたのファイルを、データ処理のデータ処理の海を航海している船としてイメージしましょう。船が小さすぎると、ファイルをオープンし、ディレクトリを一覧して、オブジェクトのメタデータを取得し、データ転送のセットアップを行い、ファイルを読み込むということのメタファーとして、それらが停泊し再び出港するのに多くの時間を浪費することになります。逆に、あなたの船が大きすぎて、港の多くのドックを使用しないとなると、単一のリーダーがファイル全体の読み込みを完了するまでクエリーの処理を待たなくてならず並列度を引き下げるということのメタファーとして、それらは時間の要する荷物の積み下ろしを待たなくてはなりません[図1]。
ファイルサイズの最適化を鮮明に説明するために、以下の図を参照します。この特定の例では、全てのテーブルは8GBのデータを持ちます。
クエリー | ファイル数 | 実行時間 |
---|---|---|
SELECT count(*) FROM table | 1000 | 14.69 s |
SELECT count(*) FROM table | 1 | 10.78 s |
SELECT count(*) FROM table | 10000 | 1.84 minutes |
SELECT count(*) FROM table | 55 (~128 Mb/file) | 1.96 s |
しかし、このデリケートなバランスを探索することは、特に大規模なバッチジョブを取り扱う際には簡単なタスクではありません。膨大な出力ファイルに対するコントロールを失ったかのように感じるかもしれません。このガイドはあなたが立ち直れるようにするために手助けをします。
理解の鍵: パーティション
ディスクに保存される出力ファイルの数は、オペレーションが実行された際のSparkエグゼキューターにおけるパーティションの数と等しいです。 しかし、書き込みオペレーションの前にパーティションの数を計測することがトリッキーになることがあります。
テーブルを読み込む際、(sql.files.maxPartitionBytes
で変更することができますが)Sparkはデフォルトで、最大サイズが128Mbのブロックを読み込みます。このため、パーティションの数は入力のサイズに依存します。しかし実際には、パーティションの数はsql.shuffle.partitions
とほぼ同じになります。デフォルトの数は200ですが、大規模なワークロードでこれで十分なことはほとんどありません。理想的なシャッフルパーティションの数をどのように設定するのかを学ぶには、こちらの動画をチェックしてください。
ETLに少なくとも一つのワイドな変換処理が含まれている場合には、Sparkエグゼキューターにおけるパーティションの数はsql.shuffle.partitions
と同じになります。狭い変換処理のみが適用される場合には、パーティションの数はファイルを読み込んだ際に作成される数とマッチします。
シャッフルパーティションの数を設定することで、パーティショニングされていないテーブルを取り扱う場合のみにおいて、パーティションの合計数に対するハイレベルのコントロールを行うことができます。パーティショニングされたテーブルのテリトリーに入った場合には、sql.shuffle.partitions
パラメーターの変更で、容易にそれぞれのデータファイルのサイズを制御することができなくなります。
舵輪: RepartitionとCoalesce
実行時のパーティションの数を管理するには二つの主要な方法があります: repartition()
とcoalesce()
です。こちらがクイックなブレークダウンです:
-
リパーティション
repartition(partitionCols, n_partitions)
は、パーティションの数とパーティショニングするカラムの二つのパラメーターを持つ遅延評価の変換処理です。実行されると、Sparkはパーティショニングするカラムに従って、クラスター上でパーティションをシャッフルします。しかし、テーブルが保存されると、リパーティションの情報は失われます。 このため、ファイルを読み込む際に、この有用な情報は使用されません。df = df.repartition("column_name", n_partitions)
-
結合
coalesce(num_partitions)
も遅延評価の変換処理ですが、パーティションの数という一つのみの引数を受け付けます。重要なことですが、coalesceオペレーションはクラスター上でデータをシャッフルしません。- このためrepartition
よりも高速です。 また、coalesceはパーティションの数を削減することしかできませんので、パーティションの数を増やそうとしてもうまくいきません。df = df.coalesce(num_partitions)
ここで持ち帰るべき重要な洞察は、通常はcoalesceメソッドを使う方がより有益であるということです。 これは、リパーティショニングが有用ではないと言っている訳ではありません。実行時にデータフレームのパーティションの数を調整する必要がある時には特に有用です。
私のETLにおける経験において、サイズの異なる複数のテーブルを取り扱い、複雑な変換処理やjoinを実行する際には、sql.shuffle.partitions
は私が必要とする正確なコントロールをしてくれないことに気づきました。例えば、同じETLにおいて2つの小さなテーブルと2つの大きなテーブルをjoinする際に同じ数のシャッフルパーティションを用いると非効率的になり、小さなテーブルにおいては膨大な数の小さなパーティションとなるか、大規模テーブルには不十分な数のパーティションとなります。また、リパーティションによって、偏りのあるjoinやデータによる問題を回避する助けとなります[2]。
とは言っても、テーブルをディスクに書き込む前にリパーティションを行うことは適切とは言えず、多くの場合においてcoalesceで置き換えることができます。coalesceは、いくつかの理由からディスクに書き込む前においてリパーティションよりも利点があります:
- クラスター全体での不必要な再シャッフルを防ぎます。
- 論理的な経験則に従ったデータの並び替えが可能となります。書き込みの前にリパーティションを用いると、データはクラスター全体で再シャッフルされ、その順序の喪失を引き起こします。一方、coalesceを用いると、データは再分配されるのではなく、取得された際の順序を保持します。
データの順序がなぜ重要なのかを見ていきましょう。
並び替えの効果: データの並び替えの重要性
上でrepartiotion
メソッドをいつ、どのように適用するのか、Sparkはテーブルのメタデータにパーティションの情報を保存しないことを述べました。しかし、ビッグデータを取り扱う際には2つの理由からこれは重要な情報となります:
- クエリーの際に、はるかに高速にテーブルに対するスキャンが可能になりマます。
- (Parquet、CSV、JSONなどの)圧縮可能なフォーマットを取り扱っている際には、より良い圧縮が可能となります。こちらは、その理由を理解するのに役立つ素晴らしい記事です。
ここでのテイクアウェイは、保存の前にデータを並び替えるということです。この情報はメタデータに保持され、クエリーの際に活用されることでクエリーをより高速なものにします。
それでは、パーティショニングされていないテーブルに保存することと、パーティショニングされているテーブルに保存することの違いと、パーティショニングされているテーブルへの保存には、なぜ追加の調整が必要となるのかを探索しましょう。
パーティショニングされたテーブルのファイルサイズの管理
パーティショニングされていないテーブルにおいては、保存オペレーションにおけるファイル数の管理は直接的なプロセスです。保存の前にcoalesce
メソッドを活用することで、データがソートされているかどうかに関係なしにこのタスクを成し遂げることができます。
# Example of using coalesce method before saving a non-partitioned table
df.coalesce(10).write.format("parquet").save("/path/to/output")
しかし、パーティショニングされたテーブルを取り扱う際には、coalesceを行う前にデータをアレンジしない限り、この手法は効果的ではありません。なぜそうなのかを理解するためには、データが並び替えられた場合とそうでない場合において、Sparkエグゼキューターで行われるアクションにディープダイブする必要があります[図2]。
このため、パーティショニングされたテーブルにデータを保存する標準的なプロセスは以下のようになるべきです:
# Example of using coalesce method after ordering the data in a partitioned table
df.orderBy("columnName").coalesce(10).write.format("parquet").save("/path/to/output_partitioned")
その他の役に立つ手当て
repartition
やcoalesce
の他には、maxnumberofrecords
が有用であることを知るかもしれません。ファイルが大きくなりすぎることを防ぐための便利な手段であり、上述した手法と一緒に活用することができます。
df.write.option("maxRecordsPerFile", 50000).save("file_path")
まとめ
Sparkジョブにおけるファイルサイズをマスターするには、多くの場合トライアンドエラーが必要となります。ストレージ容量が安価で、数クリックで処理能力を手に入れられる時代においては、最適化は簡単に見過ごされてしまいます。しかし、数テラ、数ペタバイトのデータ処理が普通になってくると、これらのシンプルな最適化テクニックは、金額、時間、環境の面でのコストに大きなインパクトをもたらし得ます。
この記事が、あなたのETLプロセスに効率的な調整を行う助けになれば幸いです。熟練したキャプテンのように、自信と明確さを持ってSparkの海に漕ぎ出さんことを。