Delta Live Tables event log | Databricks on AWS [2022/1/13時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プレビュー
本機能はパブリックプレビューです。サインアップにはDelta Live Tablesへのアクセスをリクエストしてください。
それぞれのDelta Live Tablesのパイプラインごとにイベントログが作成され、メンテナンスされます。イベントログには、監査ログ、データ品質チェック、パイプラインの進捗状況、データリネージュを含むパイプラインに関する全ての情報が含まれています。このイベントログを用いて、お使いのパイプラインの状態を追跡、理解、モニタリングすることができます。
それぞれのパイプラインに対するイベントログは、DBFS上のDeltaテーブルに格納されます。Delta Live Tablesのユーザーインタフェース、Delta Live Tables API、あるいはDeltaテーブルを直接クエリーすることでイベントログのエントリーを参照することができます。
サンプルノートブックには、本書で論じられるクエリーが含まれており、Delta Live Tablesイベントログの探索に活用することができます。
要件
本文のサンプルでは、Databricksランタイム8.1以降で利用できるJSON SQL関数を使用しています。
イベントログの格納場所
イベントログはstorage
の配下の/system/events
に格納されます。例えば、パイプラインのstorage
設定を/Users/username/data
とした場合、イベントログはDBFSの/Users/username/data/system/events
に格納されます。
storage
を設定しない場合、デフォルトのイベントログの格納場所は、DBFSの/pipelines/<pipeline-id>/system/events
となります。例えば、パイプラインのIDが91de5e48-35ed-11ec-8d3d-0242ac130003
の場合、/pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events
に格納されます。
イベントログのスキーマ
以下の表では、イベントログのスキーマを説明しています。これらのフィールドのいくつかは、クエリーを実行するためにパーシングが必要とあんるJSONドキュメントを含んでいます。例えば、データ品質メトリクスの分析には、details
JSONドキュメントのフィールドのパーシングが必要となります。本書におけるサンプルでは、必要なパーシングを行うためにPython関数を用いてデモを行なっています。
フィールド | 説明 |
---|---|
id | パイプラインのID |
sequence | イベントを識別し並び替えるためのメタデータを含むJSONドキュメント |
origin | イベントの起源を示すメタデータを含むJSONドキュメント。例えば、クラウドプロバイダー、リージョン、ユーザーID、パイプラインID |
timestamp | イベントが記録された時刻 |
message | イベントを説明する判読可能なメッセージ |
level | イベントタイプ。例えば、INFO、WARN、ERROR、METRICS |
error | エラーが発生した場合、エラーを説明する詳細メッセージ |
details | イベントの詳細を含むJSONドキュメント。イベントを分析する際に用いられる主要なフィールド |
event_type | イベントタイプ |
イベントログに対するクエリー
イベントログに対するクエリーをシンプルにするためにビューを作成することができます。以下の例では、event_log_view
というビューを作成しています。以降の例では、イベントログのレコードにクエリーを実行するために、このビューを使用します。
event_log = spark.read.format('delta').load(event_log_path)
event_log.createOrReplaceTempView("event_log_raw")
event_log_path
をイベントログの格納場所で置き換えてください。
パイプライン実行のそれぞれのインスタンスはupdate(アップデート)
と呼ばれます。以下のクエリーのいくつかにおいては、最新のアップデートに関する情報を抽出しています。最新のアップデートのIDを検索し、変数latest_update_id
に保存するために、以下のコマンドを実行します。
latest_update_id = spark.sql("SELECT origin.update_id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1").collect()[0].update_id
spark.conf.set('latest_update.id', latest_update_id)
監査ログ
ユーザーのアクションのようなイベントを監査するためにイベントログを活用することができます。イベントには、user_action
というイベントタイプを持つユーザーのアクションが含まれています。アクションに関する情報はdetails
フィールドのuser_action
オブジェクトに格納されています。ユーザーイベントの監査ログを構成するために、以下のクエリーを使用します。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
リネージュ
Delta Live Tablesのユーザーインタフェースでパイプラインのグラフ構造のビジュアライゼーションを参照することができます。また、企業におけるコンプライアンスに対するレポートの作成やデータの依存関係の追跡を行うといったタスクを実行するために、この情報をプログラムから抽出することができます。リネージュに関する情報を含むイベントは、イベントタイプflow_definition
となっています。リネージュ情報はdetails
フィールドのflow_definition
に格納されています。flow_definition
オブジェクトのフィールドには、データセット間の関係を推定するために必要な情報が含まれています。
SELECT details:flow_definition.output_dataset, details:flow_definition.input_datasets FROM event_log_raw WHERE event_type = 'flow_definition' AND origin.update_id = '${latest_update.id}'
データ品質
イベントログは、お使いのパイプラインで定義されたエクスペクテーションに基づくデータ品質メトリクスも格納しています。データ品質に関する情報を含むイベントは、イベントタイプflow_progress
となります。データセットにエクスペクテーションが定義された際、データ品質メトリクスはflow_progress.data_quality.expectations
オブジェクトの詳細フィールドに格納されます。以下のサンプルクエリーは、最新のパイプラインアップデートに対するデータ品質メトリクスを取得しています。
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw
WHERE
event_type = 'flow_progress'
AND origin.update_id = '${latest_update.id}'
)
GROUP BY
row_expectations.dataset,
row_expectations.name
サンプルノートブック
Delta Live Tablesイベントログに対するクエリー