Simplifying Change Data Capture with Databricks Delta - The Databricks Blogの翻訳です。
注意 効率的なUPSERT、DELETEを行うためのMERGEコマンドの使い方を説明しているDatabricks Delta Lakeによる効率的なUPSERTを読むことをお勧めします。
Databricksにおいて遭遇する一般的なユースケースは、お客様において、複数のソースから一連のDatabricks Deltaテーブルに至るチェンジデータキャプチャ(CDC)を実行するというものです。これらのソースはオンプレミスあるいはクラウドに存在するオペレーショナルデータベースやデータウェアハウスとなります。これらを結びつける接着剤となるのが、以下の方法で生成されたチェンジセットです。
- Oracle GoldenGateやInformatica PowerExchangeのようなETL
- ベンダーが提供する変更テーブル(例:Oracle Change Data Capture)
- insert/update/deleteトリガーを用いて変更をキャプチャするユーザー管理のデータベーステーブル
そして、これらのチェンジセットをDatabricksのDelta Lakeにマージしたいと考えます。このユースケースを数多くのお客様に対して実装した我々の経験に基づき、Databricks Delta Lakeで利用できる機能を用いてCDCを行うためのリファレンスアーキテクチャを示します。
背景
チェンジデータキャプチャ、略してCDCは、一連のデータソースにおける変更を捕捉し、多くの場合データウェアハウスであるターゲットテーブルに変更をマージするプロセスのことを指します。これらは多くの場合、夜間、時間単位、あるいは、それより短い周期(例えば15分ごと)で実行されます。ここではこの周期をリフレッシュ周期と呼びます。
リフレッシュ周期内の特定のテーブルにおける変更レコードセットは、チェンジセットとして参照されます。最後に、チェンジセット内で同じ主キーを持つ一連のレコードをレコードセットとして参照します。直感的に、これらは最終テーブルにおける同じレコードに対する別々の変更として参照されます。
FLAG | ID | VALUE | CDC_TIMESTAMP |
---|---|---|---|
I | 1 | 10 | 2018-01-01 16:02:00 |
U | 1 | 11 | 2018-01-01 16:02:01 |
D | 1 | 11 | 2018-01-01 16:02:03 |
U | 2 | 20 | 2018-01-01 16:02:00 |
D | 3 | 30 | 2018-01-01 16:02:00 |
表1: テーブルTの 2018-01-01 17:00:00時点のチェンジセットC
表1には特定の時点におけるテーブルTのチェンジセットCを示しています。チェンジセットCには4つのカラムがあります。
- FLAGは変更のタイプI/U/D(insert/update/delete)を示します。
- IDはレコードを一意に識別します。
- VALUEはレコードが更新された際の値を示します。
- CDC_TIMESTAMPはいつレコードが追加、更新、削除されたのかを示しています。ターゲットテーブルTはFLAG以外は同じスキーマとなっています。
このチェンジセットにおいては、ID1のレコードが追加、更新、削除されています(1行目から3行目)。このように、ID=1のレコードセットは3つのレコードを持っています。ID2のレコードは更新のみが行われ、ID3のレコードは削除されています。レコードID2とID3は事前に追加されたものと考えて構いません。
Databricks Delta Lake以前のCDC
Delta Lakeの出現前において、我々のお客様におけるサンプルCDCパイプラインは、Informatica => Oracle => Sparkによる夜間バッチジョブ => Databricksというものでした。
このシナリオにおいては、Informaticaが30以上のデータソースからチェンジセットをプッシュし、Oracleのデータウェアハウスで統合していました。おおよそ1日に1回、DatabricksのジョブがJDBC経由でこれらのチェンジセットを収集し、Databricks上のテーブルをリフレッシュしていました。このスキームはうまく本格運用で来ていましたが、2つの大きな欠点がありました。
- すでに過負荷なOracleインスタンスに負荷をかけており、ETLジョブをいつ、どのように実行するのかに関して制約がありました。
- 純粋なParquetテーブル(Databricks Delta Lake以前)の同時実行性の限界から、リフレッシュレートがベストだったのは夜間でした。
Databricks Delta LakeによるCDC
DatabricksのDelta Lakeによって、CDCパイプラインはInformatica => S3 => Sparkによる夜間バッチジョブ => Deltaという形に整理され、より高い頻度でリフレッシュされるようになりました。このシナリオにおいては、InformaticaはチェンジセットをInformaticaのParquetライターを用いてS3に直接書き込みます。ターゲットのDatabricks Deltaテーブルを更新するためにより高い頻度(例:15分単位、1時間単位、3時間単位など)でDatabricksのジョブが実行されます。
わずかの修正で、このパイプラインはKafkaからCDCレコードを読み込めるようにもなりましたので、パイプラインはKafka => Spark => Deltaという流れになりました。このセクションの残りでは、このプロセスと、Databricks DeltaをどのようにCDCワークフローのシンクとして利用したのかを詳細に説明します。
あるお客様に対して、我々は最大かつ高頻度でリフレッシュされるETLパイプライン上にCDCテクニックを実装しました。このお客様のシナリオにおいては、Informaticaが65のテーブルで生じる変更を15分周期でS3にチェンジセットとして書き込んでいました。チェンジセット自身は小さいもの(< 1000レコード)でしたが、ターゲットテーブルが非常に大きなものになっていました。65テーブルのうち、およそ6テーブルは50m-100m行となっており、残りは50mより小さいものでした。Informaticaと同期するためにOrackeにおいて、このパイプラインは15分周期で処理を実行していました。Databricks Deltaにおいては、我々は当初、S3のレーテンシーのために1時間くらいになるのではないかと考えていましたが、喜ばしいことにクラスターサイズによって、30分、さらには15分リフレッシュを実現することができました。
Insert Overwriteの使用
このアプローチの背後にある基本的な考え方は、特定のレコードセットにおける全ての更新を集積するステージングテーブル、ユーザーがクエリーを行うことができる現時点で最新のスナップショットを保持するファイナルテーブルを保持するというものでした。
*図1: InformaticaのソースからDatabricks DeltaのクラウドストレージへのInsert Overwriteフロー*それぞれのリフレッシュ周囲において、Sparkのジョブは2つのINSERTを実行します。
- Insert(Insert 1): 当該リフレッシュ周期で、S3やKafkaからチェンジセットを読み込み、ステージングテーブルに変更をINSERTする。
- Insert Overwrite(Insert 2): ステージングテーブルから全てのレコードセットの現在のバージョンを取得し、ファイナルテーブル上で対応するレコードを上書きする。
CDC実践者に馴染みのある分類スキームは、slowly changing dimensions(SCDs)として知られる異なるタイプの更新のハンドリング手法です。我々のステージングテーブルはSCD Type2スキームに近いものであり、ファイナルテーブルはSCD Type1スキームに近いものになっています。
実装
2つのステップの詳細を見ていきましょう。まずは最初のinsertからです。
%scala
val changeSets = Array(file1, file2, …)
spark.read.parquet(changeSets :_*).createOrReplaceTempView("incremental")
%sql
INSERT INTO T_STAGING
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP
FROM INCREMENTAL
ここでは、最初のコマンドで二つ目のコマンドのINSERT INTO
に与えるチェンジセットに対する一時ビューを定義しています。INSERT INTO
については、PARTITION
以外はわかりやすいものとなっていますので、この詳細を見ていきましよう。
クラウドデータストレージとHDFSにおいては、レコードはファイルに保存されるので、更新単位もファイルとなることを思い出してください。Databricks Delta Lakeの場合、こちらの記事で説明したように、これらはParquetファイルです。レコードが更新される必要があるときには、Sparkはこのファイル全体を読み込んで、再度書き込みを行う必要があります。このため、可能な限り更新対象のファイルを少なくするように、更新を局所化することが重要です。以上のことから、我々はステージングテーブル、ファイナルテーブルの両方を、CDCで操作される行数を最小化するカラムでパーティショニングするために、PARTITION句(Azure|AWS)にパーティションカラムを指定し、Databricks Detla LakeがT_STAGING
の適切なパーティションにレコードを追加できるようにしています。
次に、二つ目のinsertを見ましょう。
%sql
INSERT OVERWRITE TABLE T_FINAL
PARTITION(CREATE_DATE_YEAR)
SELECT ID, VALUE, CDC_TIMESTAMP
FROM (
SELECT A.*,
RANK() OVER (PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC) AS RNK
FROM T_STAGING A.*
WHERE CREATE_DATE_YEAR IN (2018, 2016, 2015)
) B
WHERE B.RNK = 1 AND B.FLAG < > 'D'
T_STAGING
から読み込む内部のクエリーからスタートしましょう。ステージングテーブルには、特定のレコードセットに対するあらゆる数のinsert、update、deleteが含まれることを思い出してください。複数のリフレッシュ周期にまたがってこれらはステージングテーブルに追加されるので、これらの変更は特定のチェンジセット(例:表1におけるID=1の3つの変更)、あるいは複数のチェンジセットによる場合があります。内部のRANK
と外部のフィルターB.RNK=1 and B.FLAG <> 'D'
によって、以下を保証します。
- 特定のレコードセットにおける最新の変更のみをピックアップします。
- 最新の変更が
'D'
の場合、当該レコードセット全てをファイナルステージには追加しないことで、レコードのdeleteを実現します。
次に、WHERE CREATE_DATE_YEAR IN ( … )
句に注意を払います。外部クエリーのPARTITION(CREATE_DATE_YEAR)
と共に、これによってDatabricks Delta Lakeが 2018、2016、2015のような特定のパーティションのみを上書きし、他のものには触れないことを確実にします。上では分かりやすくするためにパーティションをハードコーディングしていますが、実際の実装では、これらのパーティションはScalaのリストとして提供され、以下のようにチェンジセットに対するクエリーから動的に生成されます。
val partitionsToOverwrite = spark.sql("select year(to_date(create_date, "MM/dd/yyyy")) from incremental")
...
spark.sql(s"""
INSERT OVERWRITE T_FINAL
...
WHERE CREATE_DATE_YEAR IN ( ${partitionsToOverwrite.mkString(",") )
...
""")
性能
上述したように、Databricks Delta Lakeによって、ユーザーがデータに対して一貫性のあるビューをクエリーできるように、CDCパイプラインは同時並行に実行されます。ここでは、リーダー、ライター両方を最適化するために用いられる2つの機能を説明します。
-
パーティションプルーニング: 上の2つ目のinsert(ライター)においては、Databricks Delta Lakeは、更新するパーティションのみを読み取り、再書き込みするために、
PARTITION
の指定とWHERE
句のIN
リストを参照します。実際、これによって操作されるテーブルの割合は半分、通常はそれ以下となり、2つ目のinsertにおけるT_FINAL
の更新およびT_STAGING
のSELECT
の局所化を改善します。 -
データスキッピング/ZORDERインデックス:
T_FINAL
にクエリーを行うユーザーは、BIツールからアドホックのSQLクエリーと多岐に渡ります。ここでは、WHERE
句にパーティションカラムCREATE_DATE_YEAR
が含まれる場合もあれば含まれない場合もあります。
%sql
SELECT …
FROM T_FINAL
WHERE COL1 = val and COL2 = val
この場合、COL1
もCOL2
もパーティションカラムに含まれていません。しかし、ユーザーはこれら二つのカラムに対するZ-orderインデックスを作成することができます。
OPTIMIZE T_FINAL ZORDER BY (COL1, COL2)
内部では、Databricks Delta Lakeが、上のクエリーにおいてCOL1 = val
とCOL2 = val
を含むファイルのみにアクセスするように、Z値に基づいてParquetファイルをクラスタリングします。
一連のクエリーを拡張するZ-orderインデックスを利用できるケースについて、2つの説明を加えます。
- 上記のケースにおいては、
COL1
のみ(あるいはCOL2
のみ)にフィルタリングを行うクエリーもインデックスの恩恵を受けることができます。これは、RDBMSの複合インデックスと異なり、Z-orderインデックスにおいては、インデックスされたカラムリストのプレフィクスに基づくフィルターに対するバイアスが無いためです。 - 上のケースと異なり、クエリーがパーティションカラムに対するフィルタリングを行う場合には、クエリー実行の際にパーティションプルーニングとZ-orderインデックスの両方が劇的に処理対象のファイルの数を削減します。
どのようにデータスキッピングとZ-orderインデックスがパーティションプルーニングと共に動作するのかに関しては、こちらの素晴らしい記事を参照ください。
同時実効性
以前の記事で触れたように、Databricks Delta Lakeはクラウドストレージにトランザクション機能を導入します。この機能を以下のように活用することができます。パーティションを上書きしている間、Databricks Delta Lakeは新たなParquetファイルの作成に加え、このデータに対して同時にクエリーを行っているユーザー向けに古いParquetファイルをそのままにしておきます。上書き処理が完了した後にスタートしたクエリーは、新たなデータを参照するようになります。Deltaは、クエリーが一貫性のあるデータバージョンを参照するようにトランザクションログを活用します。
コンパクションおよびクリーンアップ
時間が経つと、T_STAGING
、T_FINAL
の両方に古く、かつ使われないレコードが集積していきます。例えば、T_STAGING
におけるRANK > 1
のレコードや、ファイルの上書きによって古いとマークされたT_FINAL
のレコードです。これらはクエリーの正確性には影響しませんが、時間が経つにつれてCDCとクエリーのパフォーマンスを劣化させます。幸運なことに、Databricks Delta Lakeにおいては、このようなメンテナンスタスクはシンプルなものとなっています。T_FINAL
における古いファイルの消去は以下のようにシンプルなものとなっています。
%sql
VACUUM T_FINAL
保持期間のパラメーター(VACUUMのドキュメントを参照ください: Azure|AWS)を指定しない場合は、トランザクションログに存在せず、7日より古いファイルは全て削除されます。これは、それらのファイルにアクセスしているリーダーが存在しないと考えるには十分長い時間です。
一方、T_STAGING
におけるハウスクリーニングでは、RANK > 1
である全てのレコードを全て削除します。これを行うシンプルな方法は、T_FINAL
をT_STAGING
にコピーすることです。
%sql
INSERT OVERWRITE T_STAGING SELECT * FROM T_FINAL
メンテナンスタスクとして、上の両方のコマンド、および上で説明したOPTIMIZE
コマンドをノートブックにまとめ、Databricksジョブとして実行をスケジューリングすることができます。
まもなく公開される(2018/10の記事です)Databricks Runtime 5.0においては、Databricks Delta Lakeのパフォーマンスの改善、レコードのdelete(D)サポートのために、MERGE INTOを用いることができます。
パイプラインの本格運用
プラットフォームとしてのDatabricksは、ETLパイプラインの構築だけでなく、これらのパイプラインを本格運用するための時間を短縮することができます。ここでは、CDCパイプラインを本格運用する際に活用できるApache Sparkの2つの機能と有用なテクノロジーを説明します。
設定駆動プログラミング
大規模アプリケーションを構築する際の一般的なデザインパターンは、設定によってソフトウェアの振る舞いを制御するというものです(例:YAMLやJSONベースの設定ファイル)。SparkのSQL、Scala、Pythonなどの汎用プログラミング言語のサポートによって、設定をテーブルに保持し、設定を用いた動的なSQL生成が可能となるため、このデザインパターンに適していると言えます。CDCの文脈でこれがどのように動作するのかを見ていきましょう。
まず、我々のCDCパイプラインには65のテーブルがあったことを思い出してください。65テーブルのそれぞれが行となり、列にはCDCのSQL文を構築するのに必要な情報を含むCONFIG
テーブルを保持します。
TABLE | PARTITION_COLUMN_EXPRESSION | PARTITION_COLUMN_ALIAS | RANK_EXPRESSION | IS_INSERT_ONLY |
---|---|---|---|---|
T1 | year(to_date(create_date, “MM/dd/yyyy”)) | create_date_year | PARTITION BY ID ORDER BY CDC_TIMESTAMP DESC | N |
T2 | year(to_date(transaction_date, “MM/dd/yyyy”)) | transaction_date_year | PARTITION BY ID1, ID2 ORDER BY CDC_TIMESTAMP DESC | N |
T3 | null | null | null | Y |
T4 | ... | ... | ... | ... |
表2 - 一連のテーブルに対するCDCパイプラインを実行するための設定テーブル |
特定のテーブルに対する設定情報を取得し、テーブルに対してCDCロジックを適用するために、以下のコードを使用します。
val hiveDb = “mydb”
val CONFIG_TABLE = “CONFIG”
// Table is a notebook input widget
val table=s"""${dbutils.widgets.get("wTable")}"""
val (partitionColumnExpression, partitionColumnAlias, rankExpression, isInsertOnly) = spark.sql(s"""
SELECT PARTITION_COLUMN_EXPRESSION, PARTITION_COLUMN_ALIAS, RANK_EXPRESSION, IS_INSERT_ONLY
FROM ${hiveDb}.${CONFIG_TABLE}
WHERE TABLE_NAME=LOWER('$table')
""").as[(String, String, String, Boolean)].head
...
/*
* Insert 1 above would look like following. Here, the table
* variable is set to T1 or T2 from the config table
*/
spark.sql(s"""
INSERT INTO ${table}_STAGING
PARTITION(${partitionColumnAlias)
SELECT ${projectListFromIncremental}
FROM INCREMENTAL
""")
...
// Insert 2 could look like
val partitionsToOverwrite = spark.sql(s"""SELECT DISTINCT ${partitionColumnExpression} FROM INCREMENTAL""").as[String].collect
spark.sql(s"""
INSERT OVERWRITE TABLE ${table}_FINAL
PARTITION(${partitionColumnAlias})
SELECT ${projectListFromIncremental}
FROM (SELECT A.*, RANK() OVER (${rankExpression}) AS RNK
FROM ${table}_STAGING A.*
WHERE ${partitionColumnAlias} IN (${partitionsToOverwrite.mkString(“,”) )
) B
WHERE B.RNK = 1 AND B.FLAG < > ‘D’
""")
ノートブックワークフローおよびジョブ
上の処理はProcessIncremental
という名前のノートブックに実装されているとします。ここで、ノートブックワークフローを用いて、65テーブルのそれぞれに対して処理を行い、特筆すべきチェンジセットを検知し、それらに対してProcessIncremental
を実行するコントローラーノートブックを作成します。
val startDate = “20180101”
val tables = spark.sql(s"""
SELECT TABLE_NAME
FROM $hiveDb.$CONFIG_TABLE
""").as[String].collect.map(_.toLowerCase)
tables.foreach { tbl =>
val processTheseChangeSets = dbutils.notebook.run("GetNextChangeSets", 0, Map(
"wHiveDb" -> hiveDb,
"wTable" -> tbl,
"wStartDate" -> startDate
)
)
if(!processTheseChangeSets.isEmpty) {
val stats = dbutils.notebook.run("ProcessIncremental", 0, Map(
"wHiveDb" -> hiveDb,
"wIncrFiles" -> processTheseChangeSets,
"wTable" -> tbl)
)
)
}
好きな頻度でCDCパイプラインを実行するように、容易にコントローラーノートブックをDatabricksのジョブとしてスケジューリングすることができます。最後に、上のループはシリアルですが、シリアルなコレクションをパラレルなコレクションに変換する.parイディオムあるいはScalaの機能を用いて、容易にパラレルループに変更することができます。
まとめ
この記事では、CDCツール(例:Oracle GoldenGate、Informatica PowerExchange)、ベンダー(例:Oracle Change Data Capture)によって管理される変更テーブル、ユーザーのinsert/update/deleleトリガーによって管理される変更テーブルで捕捉されるチェンジセットとDatabricks Delta Lakeをマージする際のリファレンスアーキテクチャを説明しました。エンドユーザーの読み取りを最適化できるように、Databricks Delta Lakeにこれらの変更を反映する際に使用されるSpark SQLの深堀り、二つのパフォーマンス観点(パーティショニング、Z-orderインデックス)での検討、コンパクションやクリーンアップといった補助的な部分に対する検討を行いました。そして、設定駆動プログラミングサポートによるETLパイプラインの構築、ノートブックワークフローとジョブによるワークフローの本格運用を通じて、どのようにDatabricksがワークフローの構築、運用を加速するのかを見ました。