Top 5 Reasons to Convert Your Cloud Data Lake to a Delta Lake - The Databricks Blogの翻訳です。
過去5年間のSpark Summitsのアジェンダをチェックしてみると、Apache Spark™をETLとクエリエンジンとして活用し、Apache Parquetをファイルフォーマットとして適用して、クラウドにデータレイクを構築するためのベストプラクティスを紹介しているトピックには事欠かないことに気づくことでしょう。データをどのようにパーティションで分けるのか(あるいは分けるべきでないのか)、理想的なファイルサイズをどのように算出するのか、進化するスキーマをどのように管理するのか、データレイクに生データのストリームをどのように流し込むのかなどに関してアドバイスするセッションが多く存在します。
Databricksでは、お客様との協働を通じて、データレイクをDelta Lake(AI Summit in 2019でオープンソース化)に移行する際のベストプラクティスをまとめました。Apache ParquetベースのデータレイクをDelta Lakeに移行することによるメリットは数多くありますが、この記事では以下の5つの理由に焦点を当てています:
- データ破損の回避
- 高速なクエリー
- データ鮮度の改善
- 機械学習モデルの再現性確保
- コンプライアンスへの対応
基本的には、Delta Lakeはデータとともにトランザクションログを管理します。これによって、Delta Lakeのテーブルに対して、ACID準拠の読み書きを行うことができます。
データ破損の回避
本来データレイクは、オンプレミスにおけるデータウェアハウスのビッグデータの代替材であり、クラウド上ではなくHDFS上に構築されるものとして受け止められました。同様のデザインパターンは、Amazon Web Services(AWS)のS3のようなBLOBデータストレージにも引き継がれた結果、結果的整合性に基づく新たな課題が生まれました。本来のHadoopのコミットプロトコルはトランザクションに対するRENAME機能を前提としたものでしたが、これはHDFSではサポートしていましたがS3ではサポートしていませんでした。このことはエンジニアに対して、安全だが遅いプロトコルか、速いが安全ではないプロトコルの二択を迫ることになりました。Delta Lakeをリリースする以前、Databricksは当初独自のコミットプロトコルを開発していました。2017年のSpark Summitのプレゼンテーションにおいては、クラウドストレージに対する書き込みのトランザクションが説明されており、課題とその時点での我々のソリューションが説明されていました。
Delta Lakeは初めからBLOBストレージと連携するように設計されており、結果的整合性とデータの品質特性は最初から考慮されていました。Delta LakeテーブルへのETLジョブが途中で失敗した場合でも、クエリーを損なうことはありません。それぞれのSQLクエリーは常にテーブルの整合性が保たれている状態を参照できます。これにより、企業のデータエンジニアは、なぜETLジョブが失敗したのかを検証し、修正後ジョブを再実行することができます。この際には、部分的に書き込まれた不完全なファイルを削除したり、ユーザーに警告を発したり、以前の状態に戻すことを心配する必要はありません。
Delta Lake以前の一般的なデザインパターンは、バッチIDでデータをパーティショニングし、データ投入に失敗した際には当該パーティションを削除し、再試行の際に新たなパーティションを作成するというものでした。このデザインパターンはETLの処理を回復するのには役立ちましたが、結果的には少数のParquetファイルから構成される大量のパーティションを生み出すことになり、下流におけるクエリー性能を悪化させる原因にもなりました。通常は、より大規模のパーティションのテーブルにデータを複製することで、この問題に対応していました。Delta Lakeもパーティションをサポートしていますが、必要なのは想定されるクエリーのパターンに合わせて、それぞれが十分大きいサイズのパーティションを作るということだけです。このことで、データにおけるパーティションの数を限定し、少数のファイルをスキャンすることになり性能を改善できます。
SparkにおけるmergeSchema
オプションによって、異なるスキーマを持つParquetをマージすることが可能となります。標準的なParquetファイルベースのデータレイクでは、パーティションごとにスキーマが異なる場合がありますが、パーティション内ではスキーマは同一である必要があります。しかし、Delta Lakeのテーブルには同じ制約は適用されません。Delta Lakeにおいては、データエンジニアはテーブルのスキーマを進化させるか、書き込み時にスキーマを強制するかを選択することができます。互換性のないスキーマ変更が検知された場合には、Delta Lakeは例外をスローすることで、互換性のない型を持つカラムによってテーブルが破損することを防ぐことができます。また、Delta LakeのテーブルにはNOT NULL制約を追加することができますが、従来のParquetテーブルではこの制約を追加することはできません。これによって、必要なデータにNULL値を持つレコードがロードされること(さらには後段のプロセスを妨げること)を防ぐことができます。
最後に紹介するDelta Lakeがデータ破損を防ぐ手段はMERGE文のサポートです。多くのテーブルは追加のみをサポートするように構成されていますが、パイプラインに投入するレコードの重複を許すことは一般的ではありません。MERGE文を使うことで、新規レコードはINSERTするか、Deltaテーブルに存在する既存レコードを無視するようにパイプラインを設定できます。
高速なクエリー
Delta Lakeには、従来のParquetと比較しても高速なクエリーを実行できる特性があります。従来のParquetのように、クエリのたびに時間を要するBlobストレージに対するLIST処理を行うのではなく、Deltaのトランザクションログがマニフェストとして機能します。
トランザクションログはParquetのファイル名を追跡するだけではなく、それらの統計情報も管理します。統計情報とは、Parquetファイルの末尾にあるカラムの最小値、最大値です。これによって、Delta Lakeはデータがクエリーに合致するかどうかを判断することができるので、不要なデータの取り込みをスキップすることが可能となります。
不要なデータの取り込みをスキップするもう一つのテクニックは、クエリーの検索条件に該当するファイルの数が少なくなるように、物理的にデータを配置するというものです。これは、ZORDERによるデータの再配置の考え方に基づいたものです。このテーブル設計は、パーティションキーではないカラムに対する検索の高速化に適しています。これらのデータスキッピングのテクニックは、2018年のブログ記事で紹介されています。
Databricks Deltaを用いてペタバイトのデータを数秒で処理する
Spark+AI Summit 2020においては、Databricksは新たなDeltaエンジンを発表し、これによってさらなる性能改善が図られています。Deltaエンジンは、データを取り込む際にクラスターのSSD/NVMEドライブにデータをキャッシュするためのインテリジェントなキャッシング層を有しており、後続のクエリーが高速になります。また、Deltaエンジンは一般的なクエリーパターンを高速に処理するためのクエリーオプティマイザーを有しています。しかし、最大のイノベーションは、C++で記述されたネイティブのベクトル化エンジンPhotonの実装です。これらすべてのコンポーネントによって、Deltaエンジンは、Apache SparkのオープンなAPIを維持しつつも、劇的な性能改善を実現します。
Delta Lakeはオープンソースプロジェクトであるため、周辺にコミュニティが形成されており、他のクエリエンジンのサポートも進んでいます。もし、以下のクエリーエンジンを既に使用しているのであれば、Delta Lakeを利用開始できますし、すぐにメリットを教授することができます。
- Apache Hive
- Azure Synapse Analytics
- Presto および AWS Athena
- AWS Redshift Spectrum
- Snowflake
- Starburst Enterprise Presto
データ鮮度の改善
Parquetベースのデータレイクの多くは毎日、毎時更新されますが、数分おきに更新されることは稀です。ときに、このことは集計の粒度に影響を及ぼします。しかし、これは主にリアルタイムストリームとしてデータをデータレイクに投入できるのかという技術的課題に起因しています。最初からDelta Lakeはバッチとストリーミング両方のユースケースに対応できるように設計されています。Delta Lakeで構造化ストリーミングを活用することで、あるDeltaテーブルから他のテーブルにデータを変換する際に、自動的にビルトインのチェックポイント機構を活用することができます。トリガーの設定を変更するだけで、データ取り込みはバッチからストリーミングに切り替わります。
ストリームによるデータ取り込みにおける課題の一つは、頻繁なデータの書き込みによって小規模のファイルが大量に発生し、後続のクエリーの性能に悪影響を及ぼすというものです。一般論として、大量の小規模ファイル群に対して検索を行うより、少量の大規模ファイル群に検索する方が早くなります。それぞれのParquetファイルサイズは同一(多くの場合は128MB〜512MB)であるべきです。長い年月を通じて、データエンジニアはこれらの小規模ファイルを大規模ファイルにまとめる処理を自分の手で開発してきました。しかしながら、Blobストレージにはトランザクションがないため、後段の処理に影響を耐えないように、これらの定期処理は深夜に実行されていました。Delta Lakeにおいては、これらのファイルのコンパクト化の処理はOPTIMIZEコマンド一つで実行できますし、ACID特性を備えているので、ユーザーが検索処理を実行している際にOPTIMIZEを実行することが可能です。また、Delta Lakeには、継続的に最適なサイズのファイルに書き込みを行う自動OPTIMIZE機能もあります。
機械学習モデルの再現性確保
機械学習モデルを改善していくためには、データサイエンティストはモデルの結果を再現できるようにする必要があります。モデルのトレーニングをした人が会社を辞めてしまっている場合、この作業は非常に大変な作業になります。再現するためには、同じパラメーター、同じロジック、同じライブラリ、同じ環境を使用しなくてはなりません。Databricksでは、この問題を解決するために2018年にMLflowを開発しました。
再現性を確保するために追跡すべき他の要素は、トレーニング用データセットとテスト用データセットです。タイムトラベル機能によるデータのバージョン管理を活用することで、特定のタイミングにおけるデータを取得することが可能になります。データをコピーすることなしに、全く同じデータを用いて再トレーニングを行い、機械学習モデルの結果を再現することが可能となります。
コンプライアンスへの対応
GDPRやCCPAのような新たな法規制は、特定の個人に該当するデータを削除することを企業に要求します。従来のParquetベースのデータレイクの場合、データの削除や更新は非常に計算資源を消費するものとなります。特定の個人に対応する個人情報を含む全てのファイルを特定し、取り込み、除外した上で新たなファイルとして書き出し、最終的に元のファイルを削除します。そして、この操作はテーブルに対するクエリーを阻害しないように行われなくてはなりません。
Delta Lakeでは、DELETEとUPDATEがサポートされておりテーブルの操作を容易に行えます。詳細に関しては、ベストプラクティス:Delta LakeによるGDPR、CCPA準拠(英語)、Delta LakeとApache Spark™によるGDPR、CCPA対応のシナリオ(英語)を参照ください。
まとめ
これまで説明した通り、ParquetベースのデータレイクからDelta Lakeに移行することによって多くのメリットがあります。特に重要な5つの理由は以下の通りです:
- データ破損の回避
- 高速なクエリー
- データ鮮度の改善
- 機械学習モデルの再現性確保
- コンプライアンスへの対応
ParquetベースのデータレイクからDelta Lakeに移行する理由の最後の一つは、CONVERTコマンドを用いたテーブルの変換がシンプルかつ高速であるというものです。さあ、Delta Lakeを試してみませんか?
参考資料
- Delta Lakeクイックスタートガイド - Qiita
- Delta Lakeに対するFAQ - Qiita
- Deltaエンジンのご紹介 - Qiita
- Delta Lakeのデータマネジメントに対するお客様からの質問 - Qiita