Getting Started with Delta Live Tables - Databricksの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
はじめに
このガイドでは、レイクハウスアーキテクチャのデータ品質標準に準拠するスケーラブル、信頼性の高いデータパイプラインを開発するために、どのようにDelta Live Tablesを活用するのかを説明します。Delta Live Tablesは現在ゲーテッドパブリックプレビューです。Delta Live Tablesのゲーテッドパブリックプレビューを利用し、最新のアップデートを受け取るには、こちらからリクエストを送信するか、アカウントチームにコンタクトしてください。
一般的なシナリオの説明からこのガイドをスタートさせてください。S3、ADLS、GCSのようなクラウドオブジェクトストレージにおける様々なOLTPシステムからデータを取得すると考えます。いくつかのデータセットは定期的にアップデートされ、いくつかはソースシステムの履歴スナップショットとなっています。データの利用者に対する一般的な理解はしており、データの品質を生データ、クレンジングされたデータ、集計されたデータに分類するレイクハウスアーキテクチャに従って準備を進めます。
これらのゴールドテーブルのそれぞれは、BIレポーティングから機械学習モデルのトレーニングに至る様々なコンシューマーで活用され、ソースからゴールドレイヤーに至るデータのジャーニーには、データエンジニアとして注意すべきいくつかの要件が存在します。
- レーテンシー: 新規データを取り込む際、5秒以内にシルバーテーブルに現れる必要があります。
- コスト: これらのアップデートをサポートするために、24/7で固定のキャパシティのクラスターを実行することはできません。
- 精度: リアルタイムのソースで到着が遅れたデータをどれだけ考慮すべきか?
ざっと見る限り、上述したリファレンスパイプラインでこれらの要件の多くは容易に満たせるように見えます。しかし、レイクハウスのパイプラインが意図的にエレガントかつシンプルである一方、実際のところ、毎回分かりやすい直線的なフローを取り扱える訳ではありません。実際には以下のようなものになるでしょう。
新たな洞察で強化するために、追加のデータソースで分析環境を増強し、スケールし始めると、ETLの複雑性は指数関数的に増加し、以下の課題によってこれらのパイプラインは脆弱なものとなります。
- テーブル間の依存関係が不明瞭であるため、エラーハンドリングとリカバリーは手間がかかります
- 制約の強制、モニタリングが主導のプロセスであることによる貧弱なデータ品質
- データリネージュが追跡できない、あるいは、実装に多大な工数を要します
- 個々のバッチ、ストリームレベルでのきめ細かい開発可能性が実現できません
- 統合されたパイプラインでのバッチとストリーミングの両方を考慮することが困難です
Sparkは単一のAPIでバッチとストリーミングのパラダイムに対応でき、Delta Lakeによって単一のデータセットに対するバッチとストリーミングを同時に処理することができ、2階層あるいはLambdaアーキテクチャで必要とされるトレードオフや再処理を不要にします。データセット間の個々のホップとしてストリームとバッチジョブを結合するETLプロセスにおいては特に、ストリームの実装、モニタリングに必要となるさらに多くの作業が必要となります。
宣言型ETL
ETLパイプラインをプログラミングする際、データ変換は多くのケースで「手続き型」として実行されます。これは、ETLエンジンで表現されるデータに対して、一連の計算ステップとしてアクションが実行されることを意味します。多くの場合、AirflowやAzure Data Factoryのようなオーケストレーションツールを使ったとしても、手続きのロジックを含むジョブとして起動されます。オーケストレータはジョブ間の依存関係に注意する必要がある場合がありますが、これらはETLの変換処理やビジネスロジックに対しては不透明なものとなります。
一方で、宣言型ETLでは、明示的に結果に至るために実行すべきステップを順序立てて一覧にすることなしに、ユーザーは期待する結果を記述します。「宣言的」とは、期待するゴールが「何」であるのかにフォーカスしており、DLT(Delta Live Tables)のようなインテリジェントなエンジンが、これらのプロセスを実行するために計算フレームワークが「どのように」処理を行うのかを導出することを意味します。
手続き型 vs 宣言型のETL定義の対比を、ステップバイステップでの運転指示を与えることと、都市の地図、交通量フロー情報を含むGPSを渡すことのように感じるかもしれません。
運転指示は運転手に対して目的地に到達するためのステップを指示しますが、ETA(到着予定時刻)は指示せず、彼らは途中でどの地域を通過するのかを知りません。加えて、ルートの途中で回り道が必要となった場合、ステップバイステップの指示は役に立たなくなりますが、地図とGPSがあれば回り道の経路を再設定することができます。
このメタファーにおいては、地図はお使いのDLTパイプラインとなります。DLTエンジンは、地図を解釈し、最適ルートを決定し、ETAのようなメトリクスを提供するGPSとなります。ルートの途中にある近隣情報に関する詳細は、データリネージュのようなものであり、事故(あるいはバグ)を回避するために回り道を見つけ出す能力は、依存関係の解決やDLTの宣言型の特性によるモジュール性によるものとなります。
はじめてのパイプライン
このガイドにおいては、これらの課題に苦しんでいるパイプラインを実装し、DLTの宣言型開発パラダイムによって、どのようにレイクハウスにおいてシンプルなETL開発、高品質、リネージュ、観察可能性を実現するのかを説明するために、本パイプラインを使用します。
すぐにスタートできるように、最終のパイプラインの結果をDelta Live Tables Notebooks repoに格納しています。こちらのSQLノートブックを参考のためにお使いのDatabricksデプロイメント環境にコピーするか、このままガイドを読み進めることも可能です。
このガイドでは、SQLのパイプラインにフォーカスしますが、同じパイプラインをPythonで実行することも可能です。こちらのノートブックを参照ください。
要件
このガイドを最大限に活用するには、以下の要件を満たす必要があります。
- SQL
- ETLパイプラインの開発、ビッグデータシステムの経験
- Databricksのインタラクティブノートブックとクラスター
- Databricksワークスペースで、新規クラスターの作成、ジョブの実行、外部クラウドストレージあるいはDBFSへのデータを保存できる権限
データセット
初めてのパイプラインにおいては、全てのワークスペースに準備されているdatabricks-datasetsにおけるretail-orgデータセットを使用します。Delta Live Tablesは、レイクハウスにおけるブロンズテーブル(生データ)の細かい部分をハンドリングするためのテクノロジーを。クラウドオブジェクトストレージからインクリメンタルにデータをロードするAuto Loaderを使用します。
ブロンズデータセット:Cloud Filesを用いたデータセットの取り込み
ブロンズデータセットは、最も生の品質を表現します。多くの場合、クレンジングしたデータを検証し、通常はレポートに使用しないフィールドにアクセスできるように、あるいは、完全に新規のパイプラインを作れるようにするために、手が加えられていないデータソースを作成するために、コスト効率の高いクラウドデータストレージを活用してオリジナルからの変更を最小限にするようにします。このステージにおける一般的なパターンは、クラウドストレージのある場所から継続的に新規データを取り込むというものです。
DLTにおける「インクリメンタルアップデート」、「連続プロセッシング」対「ストリーミング」
これらの用語のいくつかは、一般的には同義で用いられますが、DLTにおいては異なる意味となります。Sparkの構造化ストリーミングの経験がある読者は、用語が多すぎると感じるかもしれません。ここでは、これらの用語の違いを説明させてください。
- ストリーミングは、終わりがないものとしてデータセットを取り扱う処理のパラダイムです。
- インクリメンタルとは、最終データになされる変更を最小にするアップデートのパターンです。
- **連続(continuous)**は、任意のタイミングで停止されるまで処理を続けるパイプラインのことを意味し、パイプラインがスタートした際のソースデータの状態に基づいて停止されるパイプラインの対義語となります。
Sparkの構造化ストリーミングのようなストリーム処理フレームワークとDLTのインクリメンタルデータセットの間で重複する部分があることに気づくかもしれません。実際のところ、DLTのインクリメンタルデータセットは、Sparkの構造化ストリーミングの基本部分と、Deltaのトランザクションログを活用していますが、多くの複雑性を抽象化しており、開発者がシステムレベルの作業ではなく、処理要件に応えることに集中できるようになっています。
本ガイドのゴールドセクションでは、DLTのインクリメンタルデータセットとDLTの連続モードがどのように動作するのかを説明します。
警告: 連続(continuous)という用語は、単一レコードから構成されるマイクロバッチにおけるSpark構造化ストリーミングにおける実験段階のトリガーモードでも使用されています。これはDLTの「continuous」と定義が異なります。
例として、取り込みを行うブロンズテーブルの一つを見てみましょう。
CREATE INCREMENTAL LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files
("/databricks-datasets/retail-org/sales_orders/",
"json", map("cloudFiles.inferColumnTypes", "true"));
jsonファイルからsales_orders_rawというインクリメンタルライブテーブルを作成するために、このSQL文ではAuto Loaderを使用しています。
cloud_files:
がAuto Loaderを呼び出し、パラメーターとしてクラウドストレージのパスとフォーマットを受け取ります(DLT外でのcloudFilesの呼び出しとAPIが若干異なることに注意してください)。
次に、クラウドオブジェクトストレージからデータを取り込むためにパイプラインを作成しましょう。
ワークスペースにアクセスします。
-
最初のDLTパイプラインのノートブックを作成します。
-
dlt_retail_sales_pipeline
のようなDLTパイプラインに対応する新規ノートブックを作成します。 -
最初のセルに以下のコードを貼り付けます。
SQL
CREATE INCREMENTAL LIVE TABLE customers
COMMENT "The customers buying finished products,
ingested from /databricks-datasets."
TBLPROPERTIES ("quality" = "mapping")
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");
CREATE INCREMENTAL LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/",
"json", map("cloudFiles.inferColumnTypes", "true"));
```
**注意:** パイプラインノートブック
DLTのパイプラインノートブックは、標準的なDatabricksノートブックを使用していますが特殊なものです。現在は、パイプラインノートブックをクラスターにアタッチすることはできますが、アタッチされたクラスターはパイプラインを実行するために決してDLTでは使用されません。ベストプラクティスとしては、パイプラインノートブックをデタッチの状態とし、開発中においては別のノートブックで任意のコマンドを実行することをお勧めします。アタッチされたクラスターにパイプラインノートブックを実行すると、以下のようなメッセージが表示されます。
-
新規タブあるいは新規ウィンドウでJobsを開き、
Delta Live Tables
を選択します。- Delta Live Tablesが表示されない場合、お使いのアカウントで有効化されていない可能性があります。Delta Live Tablesを使用するにはサインアップページを使用してください。
-
新規パイプラインを作成するためにCreate Pipelineを選択します。
-
「Sales Order Pipeline」のような名前を指定します。
-
ステップ2で作成したノートブックをNotebook Pathに指定します。これは必須のステップとなりますが、将来的には非ノートブックのライブラリを参照できるように修正される可能性があります。
-
Targetはオプションですが、パイプラインの結果データに他のメンバーがアクセスできるようにtargetデータベースを指定できるので設定をお勧めします。
-
Storage Locationはオプションですが推奨となります。設定済みの外部blobストレージを指定することも可能です。DLTはここにパイプラインのデータセットとメタデータログを生成します。
ティップス: ストレージが指定されない場合、DLTパイプラインで生成される全てのデータとログはDLTによって作成されるDBFSルートストレージ上のパスに格納されます。このパスは、後でEdit SettingのJSONファイルで確認することができます。外部(非DBFSルート)にデータとログを格納するには、DLTパイプラインのStorage Locationを指定する必要があります。
-
パイプラインモードをTriggeredに設定します。
-
Cluster sizeにワーカー数の最小数と最大数を指定します。
-
Startをクリックします。
-
これで初めてのパイプラインを作成できました!
パイプラインのログ
グラフの下にあるセクションにパイプライン実行ログが表示されます。以下に例を示します。
最初の取り込みコードの説明
アイコンはDLTデータセットを表現し、この場合テーブルとなります。これら2つのテーブルをブロンズテーブルと考えます。特に、これらはcloud_files
関数を用いてAuto Loader機能でデータを取り込むインクリメンタルなライブテーブルとなります。
DLTにおけるビューとテーブル
DLTにおいては、ビューはSQLにおける一時ビューと同じようなもので、いくつかの計算処理におけるエイリアスとなります。ビューを用いることで、複雑なクエリーを小規模あるいは理解しやすいクエリーに分割することができます。また、ビューを用いることで、既存の変換処理を一つ以上のテーブルのソースとして再利用することができます。ビューはパイプライン内でのみ利用でき、インタラクティブなクエリーを実行することはできません。
DLTにおいては、テーブルは従来のマテリアライズドビューと同じようなものです。Delta Live Tablesランタイムは自動でDeltaフォーマットのテーブルを作成し、これらのテーブルがテーブルを作成したクエリーの最終結果で更新されることを保証します。
標準的なDeltaテーブルと同じように、コンシューマーはこれらのテーブルやビューをデータレイクハウスから読み取ることができます(例:PythonにおけるデータサイエンスやSQLによるレポート目的)が、DLTエンジンによってこれらのテーブルは更新・管理されます。詳細に関してはこちらのセクションを参照ください。
シルバーデータセット:エクスペクテーションと高品質データ
本セクションでは、以下のDAGで表現されるエンドツーエンドのパイプラインの開発を皆様にお任せします。すでにブロンズデータセットを作成しており、次に2020年のCIDRデータベースカンファレンスで公開されたレイクハウスアーキテクチャで説明されているように、シルバー、ゴールドを作成し、新たなDLTのコンセプトをご説明するためにそれぞれのレイヤーを使用します。
シルバーレイヤーは高品質、多様かつアクセス可能なデータセットの全てに関するものとなります。これらは、低レーテンシーにおけるプロダクションレポートの提供のような特定のユースケースで使用されないかもしれませんが、データサイエンティストやアナリストが容易かつ自信を持ってこれらのテーブルを利用して、前処理、探索的分析、特徴量エンジニアリングを行い、残りの時間を機械学習、洞察の抽出に費やせるように、これらのテーブルはクレンジング、変換、整理されます。
これらのコンシューマーにおける大きな生産性の阻害要因は、データアクセスや前処理だけではなく、使用しているデータの品質に対する信頼性です。このため、これらのデータセットが特定の品質レベルに準拠し、明確にデータセットを説明できるようにDLTを使用します。データのコンシューマーと意思決定者は、制約とコメントを適切に利用することで得られる結果のカタログと品質モニタリングを活用することができます。
-
パイプラインノートブックを開き、新規セルを作成します。
-
新規セルに以下のコードを貼り付けます。
SQL
CREATE INCREMENTAL LIVE TABLE sales_orders_cleaned(
CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL)
ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s)
and partitioned by order_datetime."
TBLPROPERTIES ("quality" = "silver")
AS
SELECT f.customer_id, f.customer_name, f.number_of_line_items,
TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat,
c.units_purchased, c.loyalty_segment
FROM STREAM(LIVE.sales_orders_raw) f
LEFT JOIN LIVE.customers c
ON c.customer_id = f.customer_id
AND c.customer_name = f.customer_name
1. 左のナビゲーションバーでJobsを選択し、「Delta Live Tables」を選択し、以前のステップで作成したパイプライン「Sales Order Pipeline」に戻ります。
1. Start/Stopのトグルの隣のドロップダウンを開き、[Full Refresh](https://qiita.com/taka_yayoi/items/6726ad1edfa92d5cd0e9#%E3%83%95%E3%83%AB%E3%83%AA%E3%83%95%E3%83%AC%E3%83%83%E3%82%B7%E3%83%A5%E3%81%AE%E5%AE%9F%E8%A1%8C)を選択します。
`Constraint`: 制約を用いることでデータ品質に対するエクスペクテーションを定義することができます。Sparkのフィルター述語で解決できる文と失敗時のアクションを指定することができます。アクションには、保持、削除、失敗、検疫を指定することができます。詳細に関しては[こちら](https://qiita.com/taka_yayoi/items/6726ad1edfa92d5cd0e9#%E3%83%87%E3%83%BC%E3%82%BF%E5%93%81%E8%B3%AA%E5%88%B6%E7%B4%84%E3%81%AE%E5%AE%9A%E7%BE%A9)を参照ください。整理された品質モニタリングのために全ての制約は記録されます。
`Tblproperties`: Delta Lakeのプロパティ、あるいはDLTパイプラインのプロパティ、任意のプロパティとなりうるキーバリューペアのリストです。任意のtblpropertiesはタグのように使用することができ、データのカタログ作成に使用することができます。この例では、"quality": "silver"はタグとして機能する任意のプロパティです。
`Comment`: テーブルの目的を説明する簡潔な文章であり、将来的にはデータカタログに使用される予定です。
## ゴールドデータセット:コンプリート vs インクリメンタル / 連続 vs トリガー
多くの集計処理はインクリメンタルの処理はできず、ブロンズ、シルバーレイヤーなど集計の上流では新規データがインクリメンタルに処理されていたとしても、再計算が必要となります。しかし、集計されていない「ファスト」データやリアルタイムデータへのアクセス手段を持つことには重大な価値があります。ファスト、スローなデータを処理するために複雑な二層インフラストラクチャを必要とする従来のラムダアーキテクチャと異なり、レイクハウスアーキテクチャでは、リアルタイムかつインクリメンタルな「ファスト」ブロンズ、シルバーレイヤーと、バッチで更新されるゴールドレイヤーの両方を含む単一のパイプラインを実現することができます(これは、Delta Lakeストレージの強力な一貫性保証によって実現されます)。
実際、このパターンはストリームとバッチを別々にデプロイし、維持する必要がある手続き型ETLでは困難となる可能性があります。これを解決するために、DLTでは、パイプラインの他の部分の変更を最小限にしつつも、パイプラインのそれぞれのデータセットがコンプリートあるいはインクリメンタルかを選択することができます。これによって、ブロンズ、シルバーのリアルタイムデータとゴールドの集計レイヤーの結合を含むパイプラインを容易にスケールさせることができます。
count、min、max、sumのようなSparkの集計処理のいくつかはインクリメンタルに書影を行うことができ、この場合ゴールドデータセットをインクリメンタルと宣言することには合理性があります。しかし、シンプルなcountやsumにおいても、非効率的になる場合があり、複数のグルーピングを行なっている場合(例:GROUP BY col1, col2, col3)には推奨しません。
今回のゴールドテーブルのケースでは、シルバーテーブルをと仕事に集計することでコンプリートなゴールドテーブルを作成します。
1. パイプラインノートブックを開き、新規セルを作成します。
1. 以下のコードを新規セルに貼り付けます。
```sql:SQL
CREATE LIVE TABLE sales_order_in_la
COMMENT "Sales orders in LA."
TBLPROPERTIES ("quality" = "gold")
AS
SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr,
SUM(ordered_products_explode.price) as sales,
SUM(ordered_products_explode.qty) as qantity,
COUNT(ordered_products_explode.id) as product_count
FROM (
SELECT city, DATE(order_datetime) as order_date, customer_id, customer_name,
EXPLODE(ordered_products) as ordered_products_explode
FROM LIVE.sales_orders_cleaned
WHERE city = 'Los Angeles'
)
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr;
```sql:SQL
CREATE LIVE TABLE sales_order_in_chicago
COMMENT "Sales orders in Chicago."
TBLPROPERTIES ("quality" = "gold")
AS
SELECT city, order_date, customer_id, customer_name,
ordered_products_explode.curr,
SUM(ordered_products_explode.price) as sales,
SUM(ordered_products_explode.qty) as qantity,
COUNT(ordered_products_explode.id) as product_count
FROM (
SELECT city, DATE(order_datetime) as order_date, customer_id, customer_name,
EXPLODE(ordered_products) as ordered_products_explode
FROM LIVE.sales_orders_cleaned
WHERE city = 'Chicago'
)
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr;
```
- 左のナビゲーションバーでJobsを選択し、「Delta Live Tables」を選択し、以前のステップで作成したパイプライン「Sales Order Pipeline」に戻ります。
- Start/Stopのトグルの隣のドロップダウンを開き、Full Refreshを選択します。
連続 vs トリガーパイプラインモード
DLTにおいては、それぞれのデータセットはIncrementalあるいはCompleteとなりますが、パイプライン全体はTriggeredかContinuousとなります。連続パイプラインが起動されると、インフラストラクチャを準備し、手動あるいはAPIでパイプラインが停止されるまで新規データの取り込みを継続します。トリガーパイプラインはソースの新規データ全てを処理し、自動でインフラストラクチャを停止します。トリガーパイプラインは通常、実運用におけるオーケストレータ、あるいはDatabricksのマルチタスクジョブで、スケジュール処理として実行されます。
TriggeredモードかContinuousモードかを切り替えるためには、パイプラインを開き「Edit Settings」を選択します。ContinuousはJSONでブール値として指定します。 "continuous": "false"を設定することは、パイプラインをTriggeredモードに設定することと等価です。
これによって、コードを大幅にリファクタリングすることなしに、継続的処理パラダイムに徐々に進化させる柔軟性を獲得することができます。これは、継続稼働するクラウドインフラストラクチャの高価なコストを引き起こすことなしに、リアルタイムの洞察の価値を理解し始めた組織においては共通するパターンです。経験のあるSparkエンジニアは、DLTの機能を理解するために以下の表を利用することができます。
読み取り元 | 書き込み先 | Continuousモード | Triggeredモード |
---|---|---|---|
Complete | Complete | 事前定義された間隔で再処理 | 単一の再処理(削除と置換) |
Complete | Incremental | 不可 | 不可 |
Incremental | Complete | 事前定義された間隔で再処理 | マテリアライズされたストリーム結果の再処理 |
Incremental | Incremental | デフォルトトリガーを用いたストリーミング | Trigger.once()のストリーミング |
本格運用
これで我々はパイプラインを定義しました。以下のステップを踏むことで、本格運用に移行することができます。
- コンシューマーがパイプラインのデータセットを容易にクエリーできるようにターゲットデータベースを設定します。
- パイプラインをproductionモードに設定します。
- Triggeredモードの場合、マルチタスクジョブを用いてパイプライン処理をスケジューリングします。
パイプラインの観測可能性および品質モニタリング
イベントログ
DLTは、モニタリング、リネージュ、データ品質レポートで使用できるように、パイプラインのStorage Location上の事前に定義されたDelta Lakeテーブルに対して全てのパイプラインログを出力します。イベントログを調査するために、一般的なログ分析ノートブックをインポートするか、{{your storage location}}/system/events
に格納されたDeltaテーブルにアクセスするためにdbutilsを使用することもできます。
ログテーブルのdetails
カラムが最も活用できる情報となります。DLTがログを出力する幾つかのタイプのアクションが存在し、そのイベントの適切なフィールドはdetails
カラムに存在します。
-
user_action
: パイプラインの作成などのアクションがとられた際に生じるイベント -
flow_definition
: パイプラインのデプロイ、アップデート、リネージュの保持、スキーマ、実行計画情報を含むイベント-
output_dataset
とinput_datasets
: 上流のテーブル/ビュー、出力のテーブル/ビュー -
flow_type
: コンプリートか追加のフローか -
explain_text
: Sparkのexplainの結果
-
-
flow_progress
: データフローがデータバッチの処理開始あるいは終了した際に生じるイベント-
metrics
: 現在はnum_output_rows
を含む -
data_quality
: 特定のデータセットに対するデータ品質ルールの適用結果を含む配列data_quality
-
expectations
name, dataset, passed_records, failed_records
-
データ品質モニタリング(Databricks SQLが必要)
DLTのログはDeltaテーブルとして出力され、ログにはデータエクスペクテーションのメトリクスが含まれるので、お使いのBIツールでデータ品質を監視するためのレポートを容易に作成することができます。DeltaとDatabricksプラットフォームと密に連携されており、容易に管理できる計算エンドポイントを通じた高速なクエリーを実現するDatabricks SQLの活用をお勧めします。
Databricks SQLを用いてデータ品質レポートを作成するには、以下のステップを実行します。
-
パイプラインにアクセスしてEdit Settingsを選択し、お使いのパイプラインの
storage location
をメモします。 -
以下の例とステップ1の
storage location
を用いてログテーブルをメタストアに登録します。SQL
CREATE TABLE {{my_pipeline_logs}}
USING delta
LOCATION '{{pipeline storage location}}/system/events'
1. 左上のドロップダウンから`SQL`ワークスペースに切り替えます。
1. 左のナビゲーションバーから`Queries`を選択します。
1. `Create Query`を選択します。
1. 以下のSQLクエリーをコピーし、`{{my_pipeline_logs}}`をステップ2で作成したテーブル名で置換します。
```sql:SQL
WITH all_expectations AS (
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
schema_of_json("[{'name':'str', 'dataset':'str',
'passed_records':'int', 'failed_records':'int'}]")
)
) AS expectation
FROM {{my_pipeline_logs}}
WHERE details:flow_progress.metrics IS NOT NULL
)
SELECT expectation_name, X_Axis, SUM(Y_Axis) AS Y_Axis
FROM (
SELECT expectation.name AS expectation_name, 'Passed'
AS X_Axis, expectation.passed_records AS Y_Axis
FROM all_expectations
UNION ALL
SELECT expectation.name AS expectation_name, 'Failed'
AS X_Axis, expectation.failed_records AS Y_Axis
FROM all_expectations
)
GROUP BY expectation_name, X_Axis
-
Add Visualization
を選択します。 -
ビジュアライゼーションのタイプで
Chart
を選択し、チャートタイプとしてPie
を選択します。X、Yカラムを設定し、groupingをexpectation_name
に指定します。
Databricks SQLで別のチャートや異なるビジュアライゼーションを試すことができます。通常は、異なる品質モニタリングの目的においては、チャートに対してX_axis、Y_axisを指定し、expectation_nameによるグルーピングを指定します。
まとめ
これで、最初のDelta Live Tablesパイプラインを体験し、途中でキーとなるコンセプトを学んだことになります。皆様がパイプラインを作成するのを楽しみにしています!Delta Live Tablesの詳細に関しては、DLTのドキュメント、デモ動画、ノートブックを確認してみてください!