Databricks Delta — Partitioning best practice | by gregzrichardson | Nintex Developers | Mediumの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
パーティショニングは良いものですが...悪くもあります
Deltaのデータをパーティショニング(バケッティング)は明らかにポジティブなものです。お使いのデータは別々のバケット(blobストレージのフォルダ)にフィルタリングされ、このストアにクエリーを実行した際には、興味のあるバケットからのみデータをロードするだけで済みます。
シンプルなパーティションフィルタリングを用いることで、不要なデータのロードを避けることができますので、良いもののように聞こえると思いますが、あまりに多くのパーティションはトラブルを引き起こします。大量のパーティションは、大量の小さいデータファイルを生み出します。これは、多すぎるメタデータを生み出し、ストリームがデータストアから読み込みを行う際には、全てのメタデータをロードする必要があります。これは、診断が困難なドライバーのエラーの原因となります。我々のケースでは、Stream initializing…
でストリーミングがスタックすることになります。
いくつのファイル?
Sparkにおけるデータは、少ない数の128MBから1GBの間のサイズの大規模ファイルに格納されることが理想的です。これによって、ドライバーとワーカーは効率的にオペレーションすることができます。大量のサイズの小さいファイルにデータが分断されると、Deltaストアからの読み込みが遅くなり、一度に大量の小さいファイルに対するメタデータをロードしようとするので、ドライバーのメモリーを圧迫します。
ファイル分断のの原因には2つあります。最適化されないアップデートと過度のパーティショニングです。
最適化されないアップデート
継続的なDeltaストリーミングにおいては、一連のマイクロバッチの中でストリームが流れこむにつれて、小さい塊でデータが追加されます。デフォルト設定では、これが大量の小さいファイルを作成します。
過度のパーティショニング
パーティショニングに選択されるデータカラムに高いカーディナリティ(多くの異なる値)が存在する場合、Deltaストアは数千のパーティションを持つことになります。これは、ファイルストアではデータはきれいに見えますが、データのマイクロバッチごとに、多くの小さなファイルに分割されることになります。
最悪の組み合わせ
ストリームが最適化されないアップデートを行い、過度のパーティショニングを行なった場合、2つの要素が掛け合わされます。このように構成されたDeltaストアは、数百万の分断された小さなファイル(パーティション数 * パーティションごとの小規模アップデートの数となります)を持つことになってしまいます。
なぜそんなに悪いのですか?
Databricksエンジニアとの議論から、Databricksでは現在(2020/3)はDeltaストリーミングの実装に問題があります。データはきれいに別々のフォルダーにパーティショニングされますが、メタデータはそうではありません。Databricksエンジニアによると、メタデータに対するパーティションフィルタの適用はロードマップにあり、将来的には問題にならなくなるかもしれません。
これによって、ストリームがDeltaストアの特定のオフセットからデータの読み込みをリクエストした際に、読み込みストリームがソースデータの小規模なサブセットを読み込んだとしても、ソーステーブル全体のメタデータをチェックする必要があります。過度なパーティションニングと経年的な断片化が組み合わさると、数百万のファイルがドライバーを圧倒します。
自動optimizeは助けになりますか?
YesでもありNoでもあります。自動optimizeは全てのエグゼキューターに対して、パーティションに書き込む前に出力をマージすることを強制します。これは、多くのパーティションやバッチが存在する場合は助けになりますが、依然として多くの出力parquetファイルが存在することになります。自動コンパクションは既存ファイルに新規ファイルをまとめることで、さらに役立ちますが、あまりに多くのパーティションが存在する場合には役に立ちません。
しかし、流入データの津波に遭遇していないのであればオンにしてください。
ALTER TABLE myDatabase.myTable SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
ジャミング
我々は、ブロンズDeltaストアからのストリーム読み込みがジャムるケースに遭遇しました。Stream initializing…
でスタックしました。ブロンズテーブルには、日時で最適化ジョブが実行されていましたが、自動optimizeが有効化されておらず、多くのパーティションが存在していました。customerIdでパーティションを作成しており、数千のパーティションが存在していました。
spark.readStream
.format("delta")
.partitionBy("platform", "product", "customerId")
...
.start(...)
フレッシュなチェックポイントからシルバーストリームを再起動した際には問題なく動作し、物事がさらにややこしくなりました。既存のチェックポイントからストリームをスタートした際にのみ問題が起きているようです。
ここで起こっていたのは、日次の最適化はうまく動作しており、過去24時間を通じて生じた大量のファイルを非常に少ない数のファイルにコンパクションしていたということです。既存のチェックポイントなしにシルバーストリームを実行した際、これらの新規結合ファイルを使用しているので正常に動作していました。
既存チェックポイントを指定してストリームを実行した際、どの小規模ファイルが対象なのかを知っていますが、Deltaはファイル内ではなく、ファイル全体でのみチェックポイントを作成します。数千のファイルを持つパーティションが1つに最適化されたとしても、ストリームは依然として次の大きいファイルがスタートするところまで、残りの小規模ファイルを処理する必要がありました。数千のファイルはパーティションの数(我々の場合は数千でした)と掛け合わされました。結果として、数百万の小さいファイルを処理する必要がありました。これによって、シルバーストリームのクラスターのドライバーは、大量のファイルをチェックする必要があり、メモリーが枯渇し処理に失敗しました。
対策
このことから、対策はもちろん、最初から小さなファイルが堆積することを防ぎ、パーティション数を減らすために、最初に自動optimizeと自動コンパクションを有効化することでした。
パーティションの選び方
レポート(シンク)テーブル
Spark SQLを通じてれポーティングのためにのみ使用されるテーブルは、より積極的にパーティショニングすることができます。しかし、SQLクエリーのWHEREフィルターで使用される値でのみパーティションを作成します。
このケースでは、クエリーを処理するためにDeltaがフェッチする必要があるでtーあを制限するのに、パーティションは非常に役立ち、Deltaストアがパーティション全体を処理するためにメタデータをフェッチする必要がないため無駄にはなりません。SQLクエリーにはチェックポイントは含まれません。
ソーステーブル
ストリーミングのソースとして使用されるテーブルでは、パーティショニングを最低限にします。ストリーミングクエリーがパーティションを使用するとしても、チェックポイントを評価するために、ストリームは依然として全てのパーティションのメタデータを処理する必要があります。
場合によります
我々のケースでは、シルバーテーブルのパーティションを別に作成する必要がありました。データの最終的なシンクであり、SQLレポーティングに直接使用されているテーブルには、パーティションにcustomer ID(数千の値)を含めました。ゴールドテーブルのストリーミングソースとして使用されているテーブルに対しては、大量のパーティションが生じないように我々はcustomer IDを含めていません。
結果として、ドライバーは再びハッピーになりました。