2
0

More than 1 year has passed since last update.

Databricksにおけるキャッシュによるパフォーマンスの最適化

Posted at

Optimize performance with caching | Databricks on AWS [2022/6/14時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Deltaキャッシュは、高速な中間データフォーマットを用いてノードのローカルストレージにリモートファイルのコピーを作成することで、データの読み込みを高速にします。リモートの場所からファイルを取得しなくてはならない際は常に自動でデータがキャッシュされます。同じデータに対する以降の読み込みはローカルで実施され、読み込みスピードを劇的に高速にします。

Deltaキャッシュは全てのParquetファイルでも動作するので、Delta Lakeフォーマットに限定されるものではありません。Deltaキャッシュは Amazon S3、DBFS、HDFS、Azure Blob storage、Azure Data Lake Storage Gen1、Azure Data Lake Storage Gen2にあるParquetファイルの読み込みをサポートしています。CSV、JSON、ORCのような他のストレージフォーマットはサポートしていません

DeltaキャッシュとApache Sparkのキャッシュ

Databricksでは2つのタイプのキャッシュを利用することができます。DeltaキャッシュとSparkキャッシュです。それぞれのタイプの特性を以下に示します。

  • 格納データのタイプ: Deltaキャッシュはリモートデータのローカルコピーを保持します。さまざまなクエリーのパフォーマンスを改善しますが、任意のサブクエリーの結果を格納する目的で使用することはできません。Sparkキャッシュは、任意のサブクエリーの結果やParquet以外のフォーマット(CSV、JSON、ORCなど)で格納されるデータを格納することができます。
  • パフォーマンス: Deltaキャッシュに格納されたデータは、Sparkキャッシュに格納されたデータよりも高速に読み込み、操作を行うことができます。これは、Deltaキャッシュでは効率的な解凍アルゴリズムと、whole-stage code generationを用いた更なる処理に最適なフォーマットでデータを出力するためです。
  • 自動 vs 手動のコントロール: Deltaキャッシュが有効化されると、リモートソースから取得する必要があるデータは自動でキャッシュに追加されます。このプロセスは完全に透明なものであり、いかなるアクションも必要としません。しかし、事前にキャッシュにデータをロードしておくためには、CACHE SELECTコマンドを使用することができます(データのサブセットのキャッシュをご覧ください)。Sparkキャッシュを使用する際には、キャッシュするテーブル、クエリーを指定しなくてはなりません。
  • ディスク vs メモリーベース: Spark内の他のオペレーションからメモリーを奪わないように、Deltaキャッシュはローカルディスクに格納されます。モダンなSSDの高速な読み取り速度によって、Deltaキャッシュはパフォーマンスにネガティブなインパクトを与えることなしに、完全にディスクで完結するようになっています。一方、Sparkキャッシュはメモリーを使用します。

注意
DeltaキャッシュとApache Sparkのキャッシュを同時に使用することができます。

サマリー

以下の表では、ワークフローに最適なツールを選択できるように、DeltaキャッシュとApache Sparkキャッシュの主要な違いをまとめています。

機能 Deltaキャッシュ Apache Sparkキャッシュ
格納場所 ワーカーノードのローカルファイル インメモリーのブロックだが、ストレージレベルに依存
適用対象 S3、WASB、その他のファイルシステム上の任意のParquetテーブル 任意のデータフレーム、RDD
トリガー (キャッシュが有効化されている場合)最初の読み込みの際に自動で実行 手動、コードの変更が必要
評価 遅延 遅延
キャッシュの強制 CACHE SELECTコマンド キャッシュをマテリアライズするために .cache + 任意のアクションあるいは.persist
可用性 設定フラグで有効化、無効化、特定ノードタイプに対する無効化 常に利用可能
削除 LRUで自動で削除、あるいは手動でクラスターを再起動した際に削除 LRUで自動で削除、あるいは手動によるunpersist

Deltaキャッシュの一貫性

データファイルの作成、削除、コンテンツの更新があった際、Deltaキャッシュは自動でそれらを検知します。キャッシュデータを明示的に無効化することなしに、テーブルデータを書き込み、変更、削除することができます。

Deltaキャッシュはキャッシュされた後に更新、上書きされたファイルを自動で検知します。いかなる古いエントリーは、自動で無効化されキャッシュから削除されます。

Deltaキャッシュの使用

Deltaキャッシュを使用する推奨の(そして最も簡単な)方法は、クラスターを設定する際にDeltaキャッシュ高速化ワーカータイプを選択するということです。このようなワーカーはDeltaキャッシュは有効化・設定されています。

c5d、r5d、z1dシリーズのワーカーは、Deltaキャッシュが設定されていますが、デフォルトでは有効化されていません。キャッシュを有効化するには、Deltaキャッシュの有効化、無効化をご覧ください。

Deltaキャッシュは、ワーカーノードに提供されるローカルSSDで利用できる領域のほぼ半分を使用するように設定されています。設定オプションに関しては、Deltaキャッシュの設定をご覧ください。

データのサブセットのキャッシュ

キャッシュするデータのサブセットを明示的に選択するには、以下の構文を使用します。

SQL
CACHE SELECT column_name[, column_name, ...] FROM [db_name.]table_name [ WHERE boolean_expression ]

Deltaキャッシュが適切に動作するようにするためにこのコマンドを使用する必要はありません(データは最初にアクセスされた際に自動でキャッシュされます)。しかし、一貫性のあるクエリーパフォーマンスを必要とする際に役立ちます。

サンプルや詳細に関しては、以下をご覧下さい。

Deltaキャッシュのモニタリング

Spark UIのStorageタブにおけるエグゼキューターごとのDeltaキャッシュの現在の状態を確認することができます。

最初のテーブルでは、それぞれのアクティブなエグゼキューターノードの以下のメトリクスをまとめています。

  • Disk Usage: Parquetデータページを格納するのにDeltaキャッシュマネージャで使用するトータルサイズ
  • Max Disk Usage Limit: Parquetデータページを格納するのにDeltaキャッシュマネージャに割り当て可能なディスクの最大サイズ。
  • Percent Disk Usage: Parquetデータページを格納するために割り当てられる最大サイズに対して、Deltaキャッシュマネージャによって使用されるディスク領域の割合。ノードが100%のディスク使用量に到達した際に、キャッシュマネージャは新たなデータに領域を空けるために、最近使用されていないキャッシュエントリーが削除されます。
  • Metadata Cache Size: Parquetメタデータ(ファイルフッター)をキャッシュするために使用するトータルのサイズ。
  • Max Metadata Cache Size Limit: Parquetメタデータ(ファイルフッター)を格納するのにDeltaキャッシュマネージャに割り当て可能なディスクの最大サイズ。
  • Percent Metadata Usage: Parquetメタデータ(ファイルフッター)に割り当てられる最大サイズに対して、Deltaキャッシュマネージャによって使用される領域の割合。
  • Data Read from IO Cache (Cache Hits): 当該ノードでIOキャッシュから読み込まれたParquetデータのトータルサイズ。
  • Data Written to IO Cache (Cache Misses): 当該ノードでIOキャッシュに存在せず、結果的に書き込まれたParquetデータのトータルサイズ。
  • Cache Hit Ratio: 当該ノードで読み込まれた全ParquetデータのうちIOキャッシュから読み込まれたParquetデータの割合。

二つ目のテーブルでは、現在アクティブではないノードを含む全ノードの以下のメトリクスをまとめています。

  • Data Read from External Filesystem (All Formats): IOキャッシュに存在せず、外部のファイルシステムから読み込まれた人のフォーマットのデータのトータルサイズ。
  • Data Read from IO Cache (Cache Hits): 稼働クラスターにおいてIOキャッシュから読み込まれたParquetデータのトータルサイズ。
  • Data Written to IO Cache (Cache Misses): 稼働クラスターにおいて、IOキャッシュに存在せず、結果的にIOキャッシュに書き込まれたParquetデータのトータルサイズ。
  • Cache Hit Ratio: 稼働クラスターにおける全Parquetデータ読み込みに対して、IOキャッシュから読み込まれたParquetデータの割合。
  • Estimated Size of Repeatedly Read Data: 2回以上飲み込まれたデータサイズの近似値。このカラムはspark.databricks.io.cache.estimateRepeatedReadstrueの場合にのみ表示されます。
  • Cache Metadata Manager Peak Disk Usage: DeltaキャッシュマネージャがIOキャッシュを実行する際に使用するピークのトータルサイズ。

Deltaキャッシュの設定

お使いのクラスターで、キャッシュ高速化ワーカーインスタンスタイプを選択することをお勧めします。これらのインスタンスは、Deltaキャッシュに対して最適に動作するように自動で設定されています。

注意
ワーカーが停止されると、当該ワーカーに格納されていたSparkキャッシュは失われます。オートスケーリングが有効化されている場合、キャッシュが不安定になる場合があります。Sparkは必要に応じて欠けているパーティションを再度ソースから読み込む必要があります。

ディスク使用の設定

Deltaキャッシュがどようにノードのローカルストレージを使用するのかを設定するためには、クラスター作成時に以下のSpark設定を適用する必要があります。

  • spark.databricks.io.cache.maxDiskUsage: キャッシュデータに予約されているノードあたりのディスク領域のバイト数
  • spark.databricks.io.cache.maxMetaDataCache: キャッシュされたメタデータに予約されているノードあたりのディスク領域のバイト数
  • spark.databricks.io.cache.compression.enabled: 圧縮フォーマットでデータをキャッシュするかどうか

サンプル設定

ini
spark.databricks.io.cache.maxDiskUsage 50g
spark.databricks.io.cache.maxMetaDataCache 1g
spark.databricks.io.cache.compression.enabled false

Deltaキャッシュの有効化、無効化

Deltaキャッシュを有効化、無効化するには以下を実行します。

Scala
spark.conf.set("spark.databricks.io.cache.enabled", "[true | false]")

キャッシュを無効化しても、すでにローカルストレージにあるデータを削除することを意味しません。そうではなく、クエリーがキャッシュに新規データを追加できないようにし、キャッシュからデータを読み込まないようにします。

Databricks 無料トライアル

Databricks 無料トライアル

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0