これらの記事の続きです。
Delta Live Tablesの本格運用
DLTはパフォーマンスと使いやすさのために自動でデータを最適化します。
ベストプラクティス
-
What:
- DLTはDLTのテーブルを作成する際に、Deltaのベストプラクティスを組み込みます。
-
How:
- DLTは以下のプロパティを設定します:
- optimizeWrite
- autoCompact
- tuneFileSizesForRewrites
物理データ
-
What:
- DLTは、コストを最小化し、パフォーマンスを最適化するために、物理データを自動で管理します。
-
How:
- 日次でvacuumを実行
- 日次でoptimizeを実行
- どのように最適化するのかを指示することもできます(ZORDER)
スキーマ進化
- What:
- スキーマ進化が動作します。
- How:
- マテリアライズドビューのカラムの追加/削除/名称変更を行うと自動で適切な動作をします。
- ストリーミングテーブルでカラムを削除すると、古い値は保持されます。
ライフサイクル管理(Unity Catalogのみ)
DLTのコードから何かを削除すると、自動で対応するデータを削除します。
- DLTで移行スクリプトが不要に
- コードからデータセットを削除すると、Unity Catalogから削除します
- テーブルのプロパティを削除するとクリアします
- パーティショニングを削除するとそのパーティションを削除します
- パイプラインを削除すると、作成されたすべてのテーブルをクリーンアップします
- 削除は遅延処理かつ記録されるので、データを失うことなしに間違いから復旧できます
パイプラインUI
ETLのデバッグとオペレーションのワンストップショップを提供します。
- テーブル間のデータフローを可視化
- それぞれのテーブルのメタデータと品質を特定
- 過去の更新処理へのアクセス
- オペレーションのコントロール
- イベントのディープダイブ
パイプラインの設定
- パイプラインモード:
- トリガーモード - すべてのデータの処理が完了したらすぐに停止します。
- 連続モード - 新規データを連続的に処理します。
- クラスターモード:
- 固定サイズ - N個のノードを割り当て
- 強化オートスケール - 負荷に応じてクラスターをスケールアップ/ダウン
- レガシーオートスケール - バッチでのみ動作しますが、ストリームでは動作しません
- エディション - core、pro、advanced (doc)
- チャネル - current あるいは preview (主にテスト用)
- ターゲットスキーマ - テーブルを作成するデータベース名
- ストレージ - クラウドロケーション (ベストプラクティス: DBFSルートは使わないでください!)
- クラスター設定をカスタマイズできます。
オーケストレーション
パイプラインの起動
Delta Live Tablesでは結果の更新をどの頻度で行うのかを選択できます。
- Triggered: 手動
- Triggered: Databricksジョブでスケジュール
- 連続
ワークフローによるオーケストレーション
ワークフローを用いてTriggered DLTパイプラインをオーケストレーションできます。
- マルチタスクジョブでDLTパイプラインやその他のタスクを容易にオーケストレートするためにDatabricksジョブを活用します
- Databricksプラットフォームと完全にインテグレーションされており、結果の調査、デバッグを迅速にします
- マルチクラウド環境のワークロードをオーケストレート、管理します
- また、Apache AirflowやAzure Data Factoryのデータ処理ワークフローの一部としてDelta Live Tablesパイプラインを実行できます
ベストプラクティス:オートスケーリングでコスト削減
オートスケーリングで予算を設定してDLTにチューニングをお任せください。
プロダクションでの様々な設定に対する考え方:
- 最小ノード数: デフォルトのまま
- 最大ノード数: タイムリーな結果に対して支払いたいと考える最大値
- インスタンスタイプ: 設定しない – DLTが適切なインスタンスタイプを選択できる自由度を確保
この戦略はクラウドの弾力性を活用します:
- スピードアップできると判断した場合にのみ、オートスケーリングはリソースを追加します。
- 不要になるとすぐにリソースは解放されます。すべてのアップデートが完了するとクラスターはシャットダウンします。
- SLAを違反しないように、最大のサイズを大きするほど、入力ボリュームの変化に対して堅牢になります。
デプロイメント
DLTパイプラインのデプロイメント
- DLTパイプライン = ノートブック/ファイル + パイプライン設定
- DLTパイプラインは以下の方法でデプロイ/変更/削除できます:
-
Databricks DLT REST API:
- 課題: ノートブック/パスが変更した際に環境間でのプロモートが困難に
- 課題: クラスターポリシーなどに対する依存関係の環境間での取り扱いが困難に
-
Databricks CLIのpipelineコマンド:
- 課題: 上と同じ
- Databricks Terraform providerのdatabricks_pipelineリソース
- メリット: オブジェクト間の依存関係の取り扱いが容易 - ノートブック、クラスターポリシーなど…
- 課題: Databricks Terraform exporterが手動でパイプラインを記述することを回避する助けになるかもしれませんが、Terraformへの習熟が必要
- おすすめ: Databricks Asset Bundles!
- Databricks CLIの新たなパーツ
- Databricks Visual Studio拡張とインテグレーション
-
Databricks DLT REST API:
TerraformによるDLTパイプラインの配備
より複雑なセットアップはこちら
CI/CDの自動化: Databricks Asset Bundles(DAB)
リソース作成をDABが自動化
コードをワークスペースに自動で同期し、ジョブやパイプラインを作成します。
- 個別のチェックアウトの作成 → パイプラインは面倒で / エラーが混入しやすくなります
- DAB によってすべてのバージョン管理が可能になります
- 複数環境の作成を自動化します
yamlでリソースを指定
ジョブやパイプラインの設定はコードとともにバージョン管理されます。
DLTにおけるCI/CDパターン
デプロイメントとテストを改善するためにパイプラインを構造化しましょう。
- ソース & 変換処理のノートブック/ファイルの分離
- ソースとは外部入力から読み込みを行うクエリーです。理想的には、これらには変換処理はなく、非常にシンプルなものです。外部ストレージコストを避けるためにビューを使うこともできます。
- 変換処理はソースから読み込みを行い実際の処理を行います。
- 検証でエクスペクテーションを活用
- 理想的には、検証のほとんどは開発とプロダクションの両方で動作します。
- テスト専用のノートブックには、dev/testのみの追加テストを持つMVを含めることができます。
ソースと変換処理の分離
dev & testのための小規模なデータや理不尽な入力で置き換えます。
テストデータの作成
プロダクションデータのサブセット
CREATE LIVE VIEW sales
AS SELECT * FROM prod.sales
WHERE date > now() – INTERVAL 1 day
ハードコード
CREATE LIVE VIEW sales AS
SELECT -1 AS profit, “test-negative” AS region
UNION ALL
SELECT 99999 AS profit, “test-large” AS region
ファイルから読み込み
CREATE LIVE VIEW sales
AS SELECT * FROM json.`/test-data/sales.json`
ランダムに生成
@dlt.table
def sales():
return (spark.range(1, 1000)
.select(random_profit(), random_region()))
バンドルによるソースと変換処理
パイプラインの監視
イベントログ
イベントログは自動ですべてのパイプラインオペレーションを記録します。
- オペレーションの統計情報
- すべてのオペレーションの時刻と現在のステータス
- パイプラインとクラスターの設定
- 行数
- 来歴
- テーブルのスキーマ、定義、宣言されたプロパティ
- テーブルレベルのリネージ
- テーブル更新に用いられたクエリープラン
- データ品質
- エクスペクテーションの合格 / 失敗 / 削除の統計
- エクスペクテーションの失敗を引き起こした入力/出力の行数
ベストプラクティス: イベントログの連携
お使いの既存オペレーションツールでイベントログの情報を活用しましょう。イベントログは単にそれぞれのパイプラインで作成されるDeltaテーブルですので、他のシステムから容易に読み込むことができます。
イベントログのクエリーの例
以下からの引用です。
SELECT timestamp, details :cluster_resources.num_executors, details :cluster_resources.optimal_num_executors,
details :cluster_resources.latest_requested_num_executors, details :cluster_resources.avg_num_queued_tasks
FROM delta.`${event_log.location}`
WHERE event_type = 'cluster_resources' AND origin.update_id like '${latest_update.id}'
その他のイベントログのクエリー例
テーブル/エクスペクテーションごとの合格 & 失敗レコード数:
SELECT row_expectations.dataset as dataset, row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records, SUM(row_expectations.failed_records) as failing_records
FROM (SELECT explode(from_json(details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>")) row_expectations
FROM delta.`${event_log.location}`
WHERE event_type = 'flow_progress' AND origin.update_id like '${latest_update.id}')
GROUP BY row_expectations.dataset, row_expectations.name
ORDER BY row_expectations.dataset, row_expectations.name
オートスケールのイベント:
SELECT timestamp, details :autoscale.status,
Double(case when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors else null end) as starting_num_executors,
Double(case when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors else null end) as succeeded_num_executors,
Double(case when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null end) as partially_succeeded_num_executors,
Double(case when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors else null end) as failed_num_executors
FROM delta.`${event_log.location}` WHERE event_type = 'autoscale' AND origin.update_id like '${latest_update.id}' ORDER BY timestamp DESC
障害対応
DLTはビルトインのリトライロジックで一時的な問題に対応します。
- DLTでは、スピードと信頼性をバランスを取るためにリトライのエスカレーションを行います
- 個々のトランザクションをリトライします。
- 次に、不正な状態にある場合にはクラスターを再起動します。
- 最後の処理の成功以降にDBRがアップグレードされた場合には、退行を検知するために自動で古いバージョンで試行します。
- トランザクション保証
- テーブルに対するオペレーションは原子的です。
- パイプラインのそれぞれのテーブルのアップデートは原子的ではありません。ベストエフォートで可能な限り多くのテーブルをアップデートします。
ベストプラクティス:すべてのテーブル属性をコードとして管理
ガバナンス、ディスカバリー、データレイアウトに対する単一のソースを実現します。
CREATE MATERIALIZED VIEW report(
customer_id LONG COMMENT "the customer id in salesforce", …
)
TBLPROPERTIES (
delta.deletedFileRetentionDuration = "interval 30 days",
pipelines.autoOptimize.zOrderCols = "timestamp,user_id",
my_etl.quality = "gold",
my_etl.has_pii = "false"
)
COMMENT "Weekly E-Staff report on key customer metrics, produced by growth team."
AS …
- カラムとテーブル全体のコメントは、データのソースやどのような変換処理が行われているのかに関するドキュメント作成に活用できます。
-
delta.*
プロパティは格納されるデータの保持期間やその他の物理観点でDelta Lakeによってサポートされるオプションです。 -
pipelines.*
プロパティはデータをどのように整理するのかを含むDLTのオペレーションの制御に関するオプションです。 - アプリケーション固有のプロパティはキー/バリューペアとして格納されます。
- ベストプラクティスは、これらのプロパティを特定するために共通のプレフィクスを使うと言うものです。