How to Simplify CDC With Delta Lake's Change Data Feed - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
チェンジデータキャプチャ(CDC)は、数多くのDatabricksのお客様が実装しているユースケースです。このトピックに関する以前のディープダイブをこちらでチェックすることができます。多くの場合、メダリオンアーキテクチャと呼ばれる分析アーキテクチャにデータを取り込む際にCDCを活用するケースを目撃します。メダリオンアーキテクチャは、ソースシステムから到着する生データを受け取り、ブロンズ、シルバー、ゴールドテーブルを通じてデータを洗練していきます。CDCとメダリオンアーキテクチャは、変更、追加されたデータのみを処理するので、ユーザーに対して複数のメリットを提供します。加えて、データサイエンティストやBIアナリストなど異なるペルソナがそれぞれの要件に応じて、このアーキテクチャのそれぞれのテーブルの最新のデータを活用することができます。そして、新機能であるDetla Lakeのチェンジデータフィード(CDF)機能を発表できることを嬉しく思います。これによって、このアーキテクチャの実装がよりシンプルになり、Delta LakeのMERGEオペレーションとログのバージョン管理が利用できるようになります!
なぜCDFの機能が必要なのでしょうか?
他のビッグデータ技術と比較してDelta Lakeを用いた方が実装がシンプルになるので、CDCを行うために多くのお客様がDatabricksを使用しています。しかし、適切なツールを用いたとしても、CDCの実行は依然として課題となっています。コーディングをさらにシンプルにし、以下のようなCDCに関わる最大のペインに取り組むためにCDFを設計しました。
- 品質コントロール - バージョンに渡る行レベルの変更を行うことが困難です。
- 非効率性 - 現在のバージョンの変更はファイルレベルであり、行レベルでないため、変更されていない行を考慮するのが非効率的になり得ます。
チェンジデータフィード(CDF)の実装によって、以下のようにして上の問題の解決を支援します。
- シンプルさと便利さ - 変更を特定するための一般的、使いやすいパターンを活用することで、コードをシンプルかつ便利にし、理解やすくします。
- 効率 - バージョン間で変化した行の身を持つ能力によって、後段でのMerge、Update、Deleteオペレーションを劇的に効率的なものにします。
CDFはDeltaテーブルの変更のみを捕捉し、有効化された以降の変更のみを捕捉します。
チェンジデータフィードの実践!
一般的なユースケースである財務予測におけるCDFのサンプルにダイブしていきましょう。この記事の上部で参照してるノートブックは、財務データを取り込みます。Estimated Earnings Per Share (EPS)は、アナリストによってもたらされる企業の四半期の株価に対する収益を予測する財務データです。生データは数多くの異なるソース、複数の株に対する複数のアナリストからもたらされます。
CDF機能を用いることで、データはシンプルにブロンズテーブルに取り込まれ、シルバーテーブルでフィルタリング、クレンジング、拡張され、最後にシルバーテーブルの変更データに基づいてゴールドテーブルで値が集計、計算されます。
これらの変換処理は複雑になる場合がありますが、お陰様で、行ベースのCDF機能はシンプルかつ効率的となります。しかし、どのように使うのでしょうか?見ていきましょう!
注意
ここでのサンプルはCDFのSQLバージョンと、特定のオペレーションの使い方、バリエーションの評価方法にフォーカスしています。ドキュメントはこちらを参照ください。
Delta LakeテーブルにおけるCDFの有効化
テーブルに対してCDF機能を有効化するためには、当該のテーブルで機能を最初に有効化しなくてはなりません。以下にテーブル作成時にブロンズテーブルでCDFを有効化するサンプルを示します。テーブルをアップデートする際にCDFを有効化することもできます。さらに、クラスターで作成されるすべてのテーブルでCDFを有効化することができます。これらのバリエーションに関しては、ドキュメントを参照ください。
チェンジデータフィードは未来思考の機能であり、テーブルプロパティがセットアップされた後の変更のみを捕捉し、以前のデータは捕捉しません
チェンジデータのクエリー
チェンジデータをクエリーするには、table_changesオペレーションを使用します。以下の例では、インサートされた行と、更新された行の前後のイメージが含まれており、必要に応じて変更の差異を評価することができます。削除された行を返却するdeleteチェンジタイプもあります。
このサンプルは開始バージョンに基づいて変更されたレコードにアクセスしますが、終了バージョン、必要に応じて開始タイムスタンプと終了タイムスタンプに基づいてバージョンをキャップすることもできます。この例ではSQLにフォーカスしていますが、Python、Scala、Java、Rでデータにアクセスすることができます。こちらのドキュメントをご覧ください。
MERGE文におけるCDF行データの使用
ゴールドテーブルにマージするようなMERGE文の集計処理は、本質的に複雑なものとなり得ますが、CDF機能によってこれらの文をよりシンプルかつ効率的なものにします。
上の図からわかるように、table_changesオペレーションを用いて、変更されたあるいは新規のデータに対して必要な集計処理のみが行われるので、CDFはどの行が変更したのかを導き出すことがシンプルになります。以下では、どの日付と株価シンボルが変更されたのかを特定するためにどのように変更データを使用しているのかがわかります。
以下のように、ゴールドテーブルにインサート、あるいは更新される必要がある行のデータのみを集計するために、シルバーテーブルからチェンジデータを使用することができます。このためには、*table_changes(‘table_name’,’version’)*に対してINNER JOINを使用します。
最終的な結果は、時と共にインクリメンタルに変化することができるゴールテーブルの明確かつ簡素なバージョンとなります!
典型的なユースケース
こちらでは、新たなCDF機能の一般的なユースケースとメリットを説明します。
シルバー&ゴールドテーブル
ETL/ELTオペレーションを加速し、シンプルにするために最初のMERGE比較以降の変更のみを処理することでDeltaの性能を改善します。
マテリアライズドビュー
背後にあるすべてのテーブルを再処理する必要なしに、変更が適用される場所のみを更新することで、BIや分析に使用する最新かつ集計されたビューを作成します。
変更の送信
データパイプラインの後段のステージでインクリメンタルに処理を行うために、KafkaやRDBMSのような後段のシステムにチェンジデータフィードを送信します。
監査記録テーブル
いつ削除が行われたのか、何が更新を行ったのかなど時系列の変化を見るために、Deltaテーブルが永続化ストレージと効率的なクエリー機能を提供するので、チェンジデータフィードの出力を捕捉します。
いつチェンジデータフィードを使うべきか
✅ | ❌ |
---|---|
Deltaの変更に更新や削除が含まれる | Deltaの変更は追加のみ |
バッチごとに小さな割合のレコードが更新される | バッチごとにテーブルの大部分のレコードが更新される |
外部ソースから受信するデータがCDCフォーマットである | データ受信が破壊的なロードを引き起こす |
後段のアプリケーションにデータの変更を送信する | レイクハウスの外でデータを検索し取り込む |
まとめ
Databricksにおいては、不可能を可能に、難しいことをシンプルにしたいと考えています。Delta Lakeが作り出されるまでは、CDC、ログのバージョン管理、MERGEオペレーションは実質不可能でした。今では、すばらしいチェンジデータフィード(CDF)の機能によってよりシンプルかつ効率的になりました!