1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Live Tablesのイベントログ

Last updated at Posted at 2022-03-10

Delta Live Tables event log | Databricks on AWS [2022/1/13時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

プレビュー
本機能はパブリックプレビューです。サインアップにはDelta Live Tablesへのアクセスをリクエストしてください。

それぞれのDelta Live Tablesのパイプラインごとにイベントログが作成され、メンテナンスされます。イベントログには、監査ログ、データ品質チェック、パイプラインの進捗状況、データリネージュを含むパイプラインに関する全ての情報が含まれています。このイベントログを用いて、お使いのパイプラインの状態を追跡、理解、モニタリングすることができます。

それぞれのパイプラインに対するイベントログは、DBFS上のDeltaテーブルに格納されます。Delta Live Tablesのユーザーインタフェース、Delta Live Tables API、あるいはDeltaテーブルを直接クエリーすることでイベントログのエントリーを参照することができます。

サンプルノートブックには、本書で論じられるクエリーが含まれており、Delta Live Tablesイベントログの探索に活用することができます。

要件

本文のサンプルでは、Databricksランタイム8.1以降で利用できるJSON SQL関数を使用しています。

イベントログの格納場所

イベントログはstorageの配下の/system/eventsに格納されます。例えば、パイプラインのstorage設定を/Users/username/dataとした場合、イベントログはDBFSの/Users/username/data/system/eventsに格納されます。

storageを設定しない場合、デフォルトのイベントログの格納場所は、DBFSの/pipelines/<pipeline-id>/system/eventsとなります。例えば、パイプラインのIDが91de5e48-35ed-11ec-8d3d-0242ac130003の場合、/pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/eventsに格納されます。

イベントログのスキーマ

以下の表では、イベントログのスキーマを説明しています。これらのフィールドのいくつかは、クエリーを実行するためにパーシングが必要とあんるJSONドキュメントを含んでいます。例えば、データ品質メトリクスの分析には、details JSONドキュメントのフィールドのパーシングが必要となります。本書におけるサンプルでは、必要なパーシングを行うためにPython関数を用いてデモを行なっています。

フィールド 説明
id パイプラインのID
sequence イベントを識別し並び替えるためのメタデータを含むJSONドキュメント
origin イベントの起源を示すメタデータを含むJSONドキュメント。例えば、クラウドプロバイダー、リージョン、ユーザーID、パイプラインID
timestamp イベントが記録された時刻
message イベントを説明する判読可能なメッセージ
level イベントタイプ。例えば、INFO、WARN、ERROR、METRICS
error エラーが発生した場合、エラーを説明する詳細メッセージ
details イベントの詳細を含むJSONドキュメント。イベントを分析する際に用いられる主要なフィールド
event_type イベントタイプ

イベントログに対するクエリー

イベントログに対するクエリーをシンプルにするためにビューを作成することができます。以下の例では、event_log_viewというビューを作成しています。以降の例では、イベントログのレコードにクエリーを実行するために、このビューを使用します。

Python
event_log = spark.read.format('delta').load(event_log_path)
event_log.createOrReplaceTempView("event_log_raw")

event_log_pathイベントログの格納場所で置き換えてください。

パイプライン実行のそれぞれのインスタンスはupdate(アップデート)と呼ばれます。以下のクエリーのいくつかにおいては、最新のアップデートに関する情報を抽出しています。最新のアップデートのIDを検索し、変数latest_update_idに保存するために、以下のコマンドを実行します。

Python
latest_update_id = spark.sql("SELECT origin.update_id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1").collect()[0].update_id
spark.conf.set('latest_update.id', latest_update_id)

監査ログ

ユーザーのアクションのようなイベントを監査するためにイベントログを活用することができます。イベントには、user_actionというイベントタイプを持つユーザーのアクションが含まれています。アクションに関する情報はdetailsフィールドのuser_actionオブジェクトに格納されています。ユーザーイベントの監査ログを構成するために、以下のクエリーを使用します。

SQL
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

Screen Shot 2022-03-10 at 9.39.15.png

リネージュ

Delta Live Tablesのユーザーインタフェースでパイプラインのグラフ構造のビジュアライゼーションを参照することができます。また、企業におけるコンプライアンスに対するレポートの作成やデータの依存関係の追跡を行うといったタスクを実行するために、この情報をプログラムから抽出することができます。リネージュに関する情報を含むイベントは、イベントタイプflow_definitionとなっています。リネージュ情報はdetailsフィールドのflow_definitionに格納されています。flow_definitionオブジェクトのフィールドには、データセット間の関係を推定するために必要な情報が含まれています。

SQL
SELECT details:flow_definition.output_dataset, details:flow_definition.input_datasets FROM event_log_raw WHERE event_type = 'flow_definition' AND origin.update_id = '${latest_update.id}'

Screen Shot 2022-03-10 at 9.43.01.png

データ品質

イベントログは、お使いのパイプラインで定義されたエクスペクテーションに基づくデータ品質メトリクスも格納しています。データ品質に関する情報を含むイベントは、イベントタイプflow_progressとなります。データセットにエクスペクテーションが定義された際、データ品質メトリクスはflow_progress.data_quality.expectationsオブジェクトの詳細フィールドに格納されます。以下のサンプルクエリーは、最新のパイプラインアップデートに対するデータ品質メトリクスを取得しています。

SQL
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = '${latest_update.id}'
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name

サンプルノートブック

Delta Live Tablesイベントログに対するクエリー

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?