How Uplift Scales CDC data pipelines With Databricks Delta Live Tables - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
この記事はUpliftのRuchiraとJoydeepによる共著であり、Databricksレイクハウスプラットフォーム導入における貢献とソートリーダーシップに感謝の意を評します。
Upliftは人々が人生からより多くのものを手に入られ、適切なタイミングで購入が行えるように、Buy Now、Pay Later(即時購入、後で支払い)ソリューションをリードしています。Upliftの柔軟性のある支払いオプションは、購買者に対して、即時購入そしてその後の期間にわたる支払いのための、シンプルかつ安心できる選択肢を提供しています。
Upliftのソリューションは、200以上の出店パートナーの支払いのフローを、最高レベルのセキュリティ、プライバシー、データ管理とインテグレーションしています。これによって、顧客は摩擦を感じることなしに、オンライン、コールセンター、対面での体験を通じて買い物を楽しむことができます。この巨大なパートナーエコシステムによって、自身のエンジアリングチームに対してデータエンジアリングと分析の両方で課題を生み出すことになりました。データによって企業が指数関数的に成長することが主要な価値創造のドライバーであるため、Upliftはインフラストラクチャと管理すべき「管理コード(janitor code)」を最小化する極度にスケーラブルなソリューションを必要としました。
数百のパートナーとデータソースを用いて、Upliftは以下のような洞察やオペレーションを促進するためにインテグレーションを行うことで、自身のコアとなるデータパイプラインを活用しています。
- ファンネルメトリクス - アプリケーション率、承認率、テイクアップ率、コンバージョン率、トランザクションの規模
- ユーザーメトリクス - リピートユーザー率、トータルのアクティブユーザー、新規ユーザー、解約率、チャネル横断ショッピング
- パートナーレポート - パートナーレベルでのファンネルメトリクス、収益メトリクス
- ファンディング - 受給条件の評価指標、メトリクス、融資した資産に対するモニタリング
- 融資 - ロール率、過失のモニタリング、回復、クジレット/不正の承認ファンネル
- カスタマーサポート - コールセンターの統計情報、キューのモニタリング、支払いポータルのアクティビティ
これを実現するためにUpliftは、KafkaやS3オブジェクトストレージから数百のトピックを容易に取り込み、オーケストレートする堅牢なデータインテグレーションシステムを構築するためにDatabricksのレイクハウスプラットフォームを活用しました。それぞれのデータソースは別々に格納されつつも、アプリケーションエンジニアリングチームによって自動で新規のソースは検知、取り込みがなされ、後段の分析チームで利用できるようにそれぞれのデータソースのデータは独立に進化していきます。
レイクハウスプラットフォームによる標準化をする前は、新規データソースの追加は、新たなデータパイプラインの開発が必要であったため、新規データソースの追加やチーム間での変更に関するコミュニケーションは、手動、エラーの混入可能性が高く、時間を浪費するものとなっていました。Delta Live Tablesを用いることで、彼らのシステムはスケーラブルかつ、変更に対して自動で対応し、設定可能なものとなり、開発、管理、オーケストレートするノートブックの数を削減(100以上から2つのパイプラインに)することで、洞察を得るのに要する時間を劇的に加速しました。
このデータ取り込みパイプラインにおいては、Upliftでは以下の要件がありました。
- Delta Lakeを基盤にし、Kafka/S3から100以上のトピックをスケーラブルにレイクハウスに取り込める能力を提供することで、アナリストによって生データからテーブルフォーマットの形で活用できるようにすること。
- 任意のタイミングで到着する可能性のある新規のKafkaトピックに対するテーブルを動的に作成するフレキシブルなレイヤーを提供すること。これによって、新たなデータの発見と活用を容易にする。
- Kafkaからのデータの変更によるそれぞれのトピックのスキーマ変更を自動で更新すること。
- 本格運用されているテーブルが適切に管理されるように、スキーマ強制、データ品質に対する期待、データ型のマッピング、デフォルト値などの明示的なテーブルルールによって設定可能な後段のレイヤーを提供すること。
- 明示的に設定された全てのテーブルに対するSCD Type1を取り扱えるデータパイプラインがあること。
- サマリー統計情報やトレンドの集計を作成できるように後段のアプリケーションと連携できること。
これらの要件は「多重化(multiplexing)」と呼ばれるデザインパターンに対するユースケースと適合していました。独立したストリームのセットの全てが同じソースを共有する場合に多重化が使われます。この例では、100もの変更イベントがある生データを持つKafkaのメッセージキューと一連のS3バケットから単一のDeltaテーブルに取り込み、並列でパーシングを行います。
多重化は1対1のソースとターゲットストリームの典型的なパターンとは異なるトレードオフを持つ複雑なストリーミングデザインパターンであることに注意してください。あなたが多重化を必要と考えており、まだ実装していないのであれば、基本的なストリーミングに関するベストプラクティス、そして、このデザインパターンの実装におけるトレードオフをカバーしているストリーミングの本格運用に関する動画を見るのが良いスタート地点となるかもしれません。
Delta Lakeによるメダリオンアーキテクチャを活用したこのユースケースにおける2つの汎用ソリューションをレビューしていきましょう。これは、以下のソリューションの両方を強化する基本的なフレームワークとなります。
多重化ソリューション
- DatabricksにおけるSpark構造化ストリーミングはforeachBatchメソッドを用いて1対多のストリーミングを活用します。このソリューションはブロンズステージテーブルを読み込み、マイクロバッチの中で単一のストリームを複数のテーブルに分割します。
- DatabricksのDelta Live Tables(DLT)は、並列で全てのストリームを作成し、管理するために用いられます。このプロセスでは、ブロンズテーブルのすべての一意のトピックを動的に識別し、トピックごとに明示的にコードを書いたりチェックポイントを管理することなしに、トピックごとに独立したストリームを生成します。
*本書の残りの部分では、皆様がSpark構造化ストリーミングとDelta Live Tablesの基礎を理解していることを前提としています。
ここでの例では、Delta Live Tablesは高度にフレキシブルなマネージドアーキテクチャの中で全てのテーブル定義の設定を行える宣言型パイプラインを提供します。DLTは1つのデータパイプラインを用いて、テーブルレベルの柔軟性を失うことなしに設定可能なパイプラインの中に100のテーブルを定義、ストリーム、管理することができます。例えば、ある後段のテーブルは日次で更新し、他のテーブルは分析のためにリアルタイムで更新することが可能です。これらすべてを1つのデータパイプラインで管理することができます。
Delta Live Tables (DLT) ソリューションにディープダイブするまえに、DatabricksにおけるSpark構造化ストリーミングを用いた既存のソリューションデザインにも触れておきましょう。
ソリューション1: DatabricksのDeltaとSpark構造化ストリーミングによる多重化
構造化ストリーミングデザインパターンのアーキテクチャを以下に示します。
構造化ストリーミングタスクにおいては、ストリームがKafkaから複数のトピックを読み込み、foreachBatch文の中で複数のテーブルに対して1つのストリームを流し込みます。以下のコードブロックは、1つのストリームで複数のてーブルへの書き込みを行う例です。
df_bronze_stage_1 = spark.readStream.format(“json”).load()
def writeMultipleTables(microBatchDf, BatchId):
df_topic_1 = (microBatchDf
.filter(col("topic")== lit("topic_1"))
)
df_topic_2 = (microBatchDf
.filter(col("topic")== lit("topic_2"))
)
df_topic_3 = (microBatchDf
.filter(col("topic")== lit("topic_3"))
)
df_topic_4 = (microBatchDf
.filter(col("topic")== lit("topic_4"))
)
df_topic_5 = (microBatchDf
.filter(col("topic")== lit("topic_5"))
)
### Apply schemas
## Look up schema registry, check to see if the events in each event type are equal to the most recently registered schema, Register new schema
##### Write to sink location (in series within the microBatch)
df_topic_1.write.format("delta").mode("overwrite").option("path","/data/dlt_blog/bronze_topic_1").saveAsTable("bronze_topic_1")
df_topic_2.write.format("delta").option("mergeSchema", "true").option("path", "/data/dlt_blog/bronze_topic_2").mode("overwrite").saveAsTable("bronze_topic_2")
df_topic_3.write.format("delta").mode("overwrite").option("path", "/data/dlt_blog/bronze_topic_3").saveAsTable("bronze_topic_3")
df_topic_4.write.format("delta").mode("overwrite").option("path", "/data/dlt_blog/bronze_topic_4").saveAsTable("bronze_topic_4")
df_topic_5.write.format("delta").mode("overwrite").option("path", "/data/dlt_blog/bronze_topic_5").saveAsTable("bronze_topic_5")
return
### Using For each batch - microBatchMode
(df_bronze_stage_1 # This is a readStream data frame
.writeStream
.trigger(availableNow=True) # ProcessingTime='30 seconds'
.option("checkpointLocation", checkpoint_location)
.foreachBatch(writeMultipleTables)
.start()
)
Spark構造化ストリーミングソリューションにおける幾つかのキーとなる設計上の検討事項があります。
構造化ストリーミングで1対多でテーブルにストリーミングを行うためには、foreachBatch関数を使う必要があり、それぞれのマイクロバッチに対する関数の中でテーブルの書き込みを行う必要があります(上の例をご覧ください)。これは非常にパワフルなデザインですが、いくつかの制限があります。
- スケーラビリティ: テーブルの数が少ない場合は1対多のテーブル書き込みは簡単ですが、上のコード例で示したように、デフォルトでは全てのテーブルを直列で書き込む(sparkのコードは順序通りに動作し、次の処理が開始するためには前の処理が完了しなくてはなりません)ことを意味するので100のテーブルになるとスケールしません。これは、テーブルが追加されるたびにジョブのトータルの実行時間が大きく増えることを意味します。
- 複雑性: 書き込み処理はハードコーディングされており、これら新規のトピックに対して自動で検知を行いテーブルを作成するシンプルな方法がないことを意味します。新たなデータソースができるたびに、コードのリリースが必要となります。これは重大な時間の浪費となり、パイプラインを脆弱なものにします。可能ではありますが、膨大な開発工数を必要とします。
- 厳格性: テーブルは異なる周期で更新される必要があるかもしれません。また、異なるデータ品質期待、パーティションやデータレイアウトの要件に応じて異なる前処理ロジックを必要とする場合もあります。このため、異なるテーブルのグループに対して完全に独立したジョブの作成が必要になります。
- 効率: テーブルごとにデータボリュームの違いがあり、それらが同じストリーミングのクラスターを使用すると、クラスターの利用率が上がらない場合があります。これらのストリームのロードバランスには、開発工数とよりクリエイティブなソリューションが必要となります。
全体的にはこのソリューションはうまく動作しますが、単一のDLTパイプラインを用いることで、これらの課題に対処することができ、ソリューションさらにシンプルにすることができます。
ソリューション2: DatabricksのDelta Live Tables(Python)を用いた多重化とCDC(Change Data Capture)
上述の要件(新規テーブルの自動検知、1つの上部での並列ストリーム処理、データ品質の強制、テーブルごとのスキーマ進化、全てのテーブルの最終ステージでのCDCアップサートの実行)を簡単に満足するために、ステージごとに並列に全てのテーブルを宣言し作成するために、我々はPythonによるDetla Live Tablesのメタプログラミングモデルを使用します。
Delta Live Tablesを用いたこのソリューションのアーキテクチャは以下のようになります。
これは2つのタスクで構成される1つのジョブで実現されます。
-
タスクA: 1つのDeltaテーブルであるBronze Stage 1に対して、全てのKafkaトピックからの生データをreadStreamします。すると、タスクAはストリームが検知した個別のトピックに対するビューを作成します。(次のタスクでパースするそれぞれのトピックのペイロードを明示的に格納し、スキーマを使用するためにスキーマレジストリを使用することも可能です。このビューはそのスキーマレジストリを保持することもできますし、他のスキーマ管理システムを使用することもできます)。この例では、シンプルにトピックごとのJSONペイロードから全てのスキーマを動的に推定し、シルバーテーブルでデータ型の変換を行います。
-
タスクB: Bronze Stage 1からのストリームを受け取る1つのDelta Live Tablesパイプラインは、最初のタスクで生成されたビューを設定として用い、トリガーされるたびにビューにある全てのトピックに対するBronze Stage 2テーブルを作成するために、メタプログラミングモデルを使用します。
すると、同じDLTパイプラインが、より厳密なデータ品質期待とデータ型の強制を用いて「プロダクション化された」テーブルを登録するために、明示的な設定(この場合JSON設定)を読み込みます。このステージでは、パイプラインが全てのBronze Stage 2テーブルをクレンジングし、最終的なSilver Stageに更新をマージするために、プロダクション化されたテーブルに対してAPPLY CHANGES INTOメソッドを実装します。
最後に、レポートに活用できるような分析に供するテーブルであるGold StageがSilver Stageから集計されます。
Delta Live Tablesによる多重化 + CDCの実装ステップ
以下にDelta Live Tablesによる多重化 + CDCをセットアップするための個々の実装ステップを示します。
- 生データからBronze Stage 1へ - Kafkaからのトピック読み込みとBronze Stage 1 Deltaテーブルに書き込みを行うコードサンプル。
- トピック/イベントの一意なビューの作成 - Bronze Stage 1からビューを作成。
- 1つのBronze Stage 1から個々のテーブルに分岐 ビューからBronze Stage 2を作成するコードサンプル(メタプログラミング)。
- Bronze Stage 2をSilver Stageへ - シルバー設定レイヤーとシルバーテーブル管理設定サンプルによるメタプログラミングモデルのデモのコードサンプル。
- Gold集計の作成 - 完全なGoldサマリーテーブルを作成するDelta Live Tablesのコードサンプル。
- DLTパイプラインのDAG - Bronze Stage 1からGoldに至るDLTパイプラインのテストと実行。
- DLTパイプラインの設定 - パラメーター、クラスターのカスタマイズ、プロダクションでの実装に必要となるその他の設定変更によるDelta Live Tablesパイプラインの設定。
- マルチタスクジョブの作成 - ステップ1とステップ2-7(これら全てが1つのDLTパイプライン)を、2つのタスクが直列で実行される単一のDatabricksジョブで結合。
ステップ1: 生データからBronze Stage 1へ
startingOffsets = "earliest"
kafka = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
.option("subscribe", topic )
.option("startingOffsets", startingOffsets)
.load()
)
read_stream = (kafka.select(col("key").cast("string").alias("topic"), col("value").alias("payload"))
)
(read_stream
.writeStream
.format("delta")
.mode("append")
.option("checkpointLocation", checkpoint_location)
.option("path", )
saveAsTable("PreBronzeAllTypes")
)
ステップ2: トピック/イベントの一意なビューの作成
%sql
CREATE VIEW IF NOT EXISTS dlt_types_config AS
SELECT DISTINCT topic, sub_topic -- Other things such as schema from a registry, or other helpful metadata from Kafka
FROM PreBronzeAllTypes;
ステップ3: 1つのBronze Stage 1から個々のテーブルに分岐
%python
bronze_tables = spark.read.table("cody_uplift_dlt_blog.dlt_types_config")
## Distinct list is already managed for us via the view definition
topic_list = [[i[0],i[1]] for i in bronze_tables.select(col('topic'), col('sub_topic')).coalesce(1).collect()]
print(topic_list)
import re
def generate_bronze_tables(topic, sub_topic):
topic_clean = re.sub("/", "_", re.sub("-", "_", topic))
sub_topic_clean = re.sub("/", "_", re.sub("-", "_", sub_topic))
@dlt.table(
name=f"bronze_{topic_clean}_{sub_topic_clean}",
comment=f"Bronze table for topic: {topic_clean}, sub_topic:{sub_topic_clean}"
)
def create_call_table():
## For now this is the beginning of the DAG in DLT
df = spark.readStream.table('cody_uplift_dlt_blog.PreBronzeAllTypes').filter((col("topic") == topic) & (col("sub_topic") == sub_topic))
## Pass readStream into any preprocessing functions that return a streaming data frame
df_flat = _flatten(df, topic, sub_topic)
return df_flat
for topic, sub_topic in topic_list:
#print(f”Build table for {topic} with event type {sub_topic}”)
generate_bronze_tables(topic, sub_topic)
ステップ4: Bronze Stage 2をSilver Stage
Bronze Stage 2の変換処理とテーブル設定を生成するDLT関数の定義
def generate_bronze_transformed_tables(source_table, trigger_interval, partition_cols, zorder_cols, column_rename_logic = '', drop_column_logic = ''):
@dlt.table(
name=f"bronze_transformed_{source_table}",
table_properties={
"quality": "bronze",
"pipelines.autoOptimize.managed": "true",
"pipelines.autoOptimize.zOrderCols": zorder_cols,
"pipelines.trigger.interval": trigger_interval
}
)
def transform_bronze_tables():
source_delta = dlt.read_stream(source_table)
transformed_delta = eval(f"source_delta{column_rename_logic}{drop_column_logic}")
return transformed_delta
Delta Live TablesにおけるCDCを伴うSilverテーブルを生成する関数の定義
def generate_silver_tables(target_table, source_table, merge_keys, where_condition, trigger_interval, partition_cols, zorder_cols, expect_all_or_drop_dict, column_rename_logic = '', drop_column_logic = ''):
#### Define DLT Table this way if we want to map columns
@dlt.view(
name=f"silver_source_{source_table}")
@dlt.expect_all_or_drop(expect_all_or_drop_dict)
def build_source_view():
#
source_delta = dlt.read_stream(source_table)
transformed_delta = eval(f"source_delta{column_rename_logic}{column_rename_logic}")
return transformed_delta
#return dlt.read_stream(f"bronze_transformed_{source_table}")
### Create the target table definition
dlt.create_target_table(name=target_table,
comment= f"Clean, merged {target_table}",
#partition_cols=["topic"],
table_properties={
"quality": "silver",
"pipelines.autoOptimize.managed": "true",
"pipelines.autoOptimize.zOrderCols": zorder_cols,
"pipelines.trigger.interval": trigger_interval
}
)
## Do the merge
dlt.apply_changes(
target = target_table,
source = f"silver_source_{source_table}",
keys = merge_keys,
#where = where_condition,#f"{source}.Column) <> col({target}.Column)"
sequence_by = col("timestamp"),#primary key, auto-incrementing ID of any kind that can be used to identity order of events, or timestamp
ignore_null_updates = False
)
return
Silverテーブル設定の取得およびマージ関数への引き渡し
for table, config in silver_tables_config.items():
##### Build Transformation Query Logic from a Config File #####
#Desired format for renamed columns
result_renamed_columns = []
for renamed_column, coalesced_columns in config.get('renamed_columns')[0].items():
renamed_col_result = []
for i in range( 0 , len(coalesced_columns)):
renamed_col_result.append(f"col('{coalesced_columns[i]}')")
result_renamed_columns.append(f".withColumn('{renamed_column}', coalesce({','.join(renamed_col_result)}))")
#Drop renamed columns
result_drop_renamed_columns = []
for renamed_column, dropped_column in config.get('renamed_columns')[0].items():
for item in dropped_column:
result_drop_renamed_columns.append(f".drop(col('{item}'))")
#Desired format for pk NULL check
where_conditions = []
for item in config.get('upk'):
where_conditions.append(f"{item} IS NOT NULL")
source_table = config.get("source_table_name")
upks = config.get("upk")
### Table Level Properties
trigger_interval = config.get("trigger_interval")
partition_cols = config.get("partition_columns")
zorder_cols = config.get("zorder_columns")
column_rename_logic = ''.join(result_renamed_columns)
drop_column_logic = ''.join(result_drop_renamed_columns)
expect_all_or_drop_dict = config.get("expect_all_or_drop")
print(f"""Target Table: {table} \n
Source Table: {source_table} \n
ON: {upks} \n Renamed Columns: {result_renamed_columns} \n
Dropping Replaced Columns: {renamed_col_result} \n
With the following WHERE conditions: {where_conditions}.\n
Column Rename Logic: {column_rename_logic} \n
Drop Column Logic: {drop_column_logic}\n\n""")
### Do CDC Separate from Transformations
generate_silver_tables(target_table=table,
source_table=config.get("source_table_name"),
trigger_interval = trigger_interval,
partition_cols = partition_cols,
zorder_cols = zorder_cols,
expect_all_or_drop_dict = expect_all_or_drop_dict,
merge_keys = upks,
where_condition = where_conditions,
column_rename_logic= column_rename_logic,
drop_column_logic= drop_column_logic
)
ステップ5: Gold集計の作成
Gold集計テーブルの作成
@dlt.table(
name='Funnel_Metrics_By_Day',
table_properties={'quality': 'gold'}
)
def getFunnelMetricsByDay():
summary_df = (dlt.read("Silver_Finance_Update").groupBy(date_trunc('day', col("timestamp")).alias("Date")).agg(count(col("timestamp")).alias("DailyFunnelMetrics"))
)
return summary_df
ステップ6: DLTパイプラインのDAG
全てをまとめることで以下のDLTパイプラインを作成します。
ステップ7: DLTパイプラインの設定
{
"id": "c44f3244-b5b6-4308-baff-5c9c1fafd37a",
"name": "UpliftDLTPipeline",
"storage": "dbfs:/pipelines/c44f3244-b5b6-4308-baff-5c9c1fafd37a",
"configuration": {
"pipelines.applyChangesPreviewEnabled": "true"
},
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"libraries": [
{
"notebook": {
"path": "/Streaming Demos/UpliftDLTWork/DLT - Bronze Layer"
}
},
{
"notebook": {
"path": "/Users/DataEngineering/Streaming Demos/UpliftDLTWork/DLT - Silver Layer"
}
}
],
"target": "uplift_dlt_blog",
"continuous": false,
"development": true
}
この設定では、パイプラインレベルのパラメーター、IAMインスタンスプロファイルのようなクラウド設定、クラスター設定などを設定することができます。利用できるDLT設定の完全なリストについては、こちらのドキュメントをご覧ください。
ステップ8: マルチタスクジョブの作成
DLTパイプラインと前処理を1つのジョブにまとめます。
Delta Live Tablesでは、パイプラインコードを変更することなしにテーブルの設定を通じて、それぞれのテーブルの全ての設定を独立にコントロールすることができます。これはパイプラインの変更をシンプルなものにし、高度なオートスケーリングによってスケーラビリティを大きく開演し、テーブルの並列生成によって効率を改善します。最後になりますが、100以上のテーブルを持つパイプライン全体は1つのジョブでサポートされており、全てのストリーミングのインフラストラクチャはシンプルな設定に抽象化されており、シンプルなUIでパイプラインの全てのテーブルに対するデータ品質が管理されます。Delta Live Tables以前では、このようなパイプラインのデータ品質とリネージュの管理は、手動かつ非常に時間を消費するものだったことでしょう。
これは、洗練されたパイプラインをデータエンジニアとアナリストが内製し、自前で管理しようとしたら数百時間を要するところでしたが、Delta Live Tablesを用いることでデータエンジアリングの体験をシンプルにしたという偉大な例と言えます。
最終的には、Delta Live Tablesによって、Upliftは数千行の「管理コード」や個々のデータソースに苦慮することなしに、自身のパートナーに対してよりスマートかつ効果的な製品を提供することにフォーカスできるようになりました。