2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksAdvent Calendar 2023

Day 18

Databricksレイクハウスモニタリングによるテーブル品質の監視

Last updated at Posted at 2023-12-16

サーバレスが有効化されているリージョンではテーブルや機械学習モデルの監視を行えるレイクハウスモニタリングが利用できるようになっています。

時系列(time series)分析のサンプルノートブックをウォークスルーします。時間とともにデータが変化していくテーブルを監視するタイプです。

レイクハウスモニタリングサンプルノートブック: 時系列分析

ユーザー要件

  • Unity Catalogにアクセスできるクラスターでのコマンド実行権限が必要です。
  • 最低でも一つのカタログに対するUSE CATALOG権限が必要です。最低でも一つのスキーマに対するUSE SCHEMA権限が必要です。このノートブックではテーブルをmain.defaultスキーマに作成します。main.defaultスキーマへの権限がない場合には、権限を持つカタログとスキーマに変更してください。

システム要件:

  • ワークスペースでUnity Catalogが有効化されている必要があります。
  • Databricks Runtime 12.2LTS以降
  • シングルユーザーあるいは共有クラスター

このノートブックでは時系列モニターの作成方法を説明します。

レイクハウスモニタリングの詳細に関してはドキュメントを参照ください。(AWS|Azure)

セットアップ

  • クラスター設定の検証
  • Pythonクライアントのインストール
  • カタログ、スキーマ、テーブル名の定義
# クラスター設定のチェック。このセルが失敗する場合、ノートブックの右上のクラスターセレクターで、Databricks Runtime 12.2 LTS 以降が稼働しているクラスターを設定、選択してください。
import os

assert float(os.environ.get("DATABRICKS_RUNTIME_VERSION", 0)) >= 12.2, "Please configure your cluster to use Databricks Runtime 12.2 LTS or above."
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.4-py3-none-any.whl"
# 新規にインストールしたwheelを用いて環境をリセットするためにこのステップが必要です。
dbutils.library.restartPython()
# カタログに対する`USE CATALOG`権限とスキーマに対する`USE SCHEMA`権限が必要です。
# 必要に応じて、ここでカタログとスキーマを変更して下さい。

CATALOG = "takaakiyayoi_catalog"
SCHEMA = "monitoring"

テーブル名にユーザー名を埋め込みます。

username = spark.sql("SELECT current_user()").first()["current_user()"]
username_prefixes = username.split("@")[0].split(".")
unique_suffix = "_".join([username_prefixes[0], username_prefixes[1][0:2]])
TABLE_NAME = f"{CATALOG}.{SCHEMA}.wine_ts_{unique_suffix}"
BASELINE_TABLE = f"{CATALOG}.{SCHEMA}.wine_ts_baseline_{unique_suffix}"
TIMESTAMP_COL = "timestamp"
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASELINE_TABLE}")

ユーザージャーニー

  1. テーブルの作成: 生のデータを読み込みプライマリテーブル(監視されるテーブル)とベーステーブル(期待する品質基準を満たす既知のデータを格納)を作成。
  2. プライマリテーブルに対するモニターを作成。
  3. メトリクステーブルの調査。
  4. テーブルを変更してメトリクスを更新。メトリクステーブルの調査。
  5. [オプション] モニターの削除。

1. Unity Catalogでプライマリテーブルと(オプション)ベースラインテーブルを作成

  • これらのテーブルはUnity Catalog配下のDeltaテーブルであり、ノートブックを実行するユーザーが所有者である必要があります。
  • 監視されるテーブルは「プライマリテーブル」とも呼ばれます。
  • ベースラインテーブルは監視されるテーブルと同じスキーマを持つ必要があります。

このサンプルではwinequalityデータセットを使用します。

import pandas as pd

white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")

# カテゴリーの追加
white_wine["type"] = "white"
red_wine["type"] = "red"
data_pdf = pd.concat([white_wine, red_wine], axis=0)

# カラム名の整形
data_pdf.columns = data_pdf.columns.str.replace(" ", "_")

データを分割します。baseline_dfはベースラインテーブルです。 ts1_dfはプライマリテーブルであり、ts2_dfは模擬的に未来のデータを示すものです。

data_df = spark.createDataFrame(data_pdf)
baseline_df, ts1_df, ts2_df = data_df.randomSplit(weights=[0.20, 0.40, 0.40], seed=42)

時系列データをシミュレートするために異なるタイムスタンプを作成します。

from datetime import timedelta, datetime
from pyspark.sql import functions as F

# 異なるタイムスタンプを持つデータをシミュレート
timestamp_0 = datetime.now() # ベースラインデータ
timestamp_1 = (datetime.now() + timedelta(1)).timestamp() # 1日後
timestamp_2 = (datetime.now() + timedelta(2)).timestamp() 

baseline_df = baseline_df.withColumn("timestamp", F.lit(timestamp_0).cast("timestamp"))
ts1_df = ts1_df.withColumn("timestamp", F.lit(timestamp_1).cast("timestamp"))
ts2_df = ts2_df.withColumn("timestamp", F.lit(timestamp_2).cast("timestamp"))
baseline_df.display()

Screenshot 2023-12-16 at 16.52.11.png

Unity CatalogにベースラインのDeltaテーブルとプライマリのDeltaテーブルを作成

# Unity Catalogにベースラインテーブルを書き込み

(baseline_df
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema",True)
  .option("delta.enableChangeDataFeed", "true")
  .saveAsTable(f"{BASELINE_TABLE}")
)
# Unity Catalogにプライマリテーブルを書き込み。これが監視されるテーブルとなります。このノートブックの後半で、このテーブルに未来のタイムスタンプを持つデータを追加します。 

(ts1_df
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema",True)
  .option("delta.enableChangeDataFeed", "true")
  .saveAsTable(f"{TABLE_NAME}")
)

テーブルが作成されます。
Screenshot 2023-12-16 at 16.52.48.png

行数を確認します。

spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").display()

Screenshot 2023-12-16 at 16.53.04.png

  1. モニターの作成
    このノートブックでは、TimeSeriesタイプの分析を説明します。他の分析タイプに関しては、レイクハウスモニタリングのドキュメントを参照ください。(AWS|Azure)

ビジネスやユースケース観点から除外されるべきすべてのカラムを除外するようにしてください。

import databricks.lakehouse_monitoring as lm
# 分析するウィンドウのサイズ
GRANULARITIES = ["1 day"]                       

# データをスライスするエクスプレッション
SLICING_EXPRS = ["type='Red'"]  
print(f"Creating monitor for {TABLE_NAME}")

info = lm.create_monitor(
  table_name=TABLE_NAME,
  profile_type=lm.TimeSeries(
    timestamp_col=TIMESTAMP_COL,
    granularities=GRANULARITIES
  ),
  slicing_exprs=SLICING_EXPRS,
  baseline_table_name=BASELINE_TABLE,
  output_schema_name=f"{CATALOG}.{SCHEMA}"
)

カタログエクスプローラからプライマリテーブルにアクセスし、品質タブをクリックするとモニターを確認できます。
Screenshot 2023-12-16 at 16.53.30.png
Screenshot 2023-12-16 at 16.53.38.png

import time


# モニターの作成を待ちます
while info.status == lm.MonitorStatus.PENDING:
  info = lm.get_monitor(table_name=TABLE_NAME)
  time.sleep(10)

assert(info.status == lm.MonitorStatus.ACTIVE)

また、更新履歴を表示をクリックすることでメトリクスの更新処理の状態を確認できます。
Screenshot 2023-12-16 at 16.53.59.png

モニターを作成すると、自動でメトリクスの更新処理が起動します。

# 作成時にメトリックの更新が自動でトリガーされます
refreshes = lm.list_refreshes(table_name=TABLE_NAME)
assert(len(refreshes) > 0)

run_info = refreshes[0]
while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
  run_info = lm.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
  time.sleep(30)

assert(run_info.state == lm.RefreshState.SUCCESS)

モニターを作成するとメトリクスを確認するためのダッシュボードが自動生成されます。ダッシュボードを開くにはセルのアウトプットでハイライトされているダッシュボードのリンクをクリックします。カタログエクスプローラのUIからダッシュボードに移動することもできます。
Screenshot 2023-12-16 at 17.24.27.png

なお、ダッシュボードやダッシュボードで使用されるクエリーはホームディレクトリ配下のdatabricks_lakehouse_monitoringに格納されます。
Screenshot 2023-12-16 at 16.54.28.png

ダッシュボードにはデータの統計情報などが表示されます。
Screenshot 2023-12-16 at 17.01.35.png
Screenshot 2023-12-16 at 17.01.48.png

以下のメソッドでモニターの情報を表示できます。

lm.get_monitor(table_name=TABLE_NAME)

Screenshot 2023-12-16 at 17.26.49.png

3. メトリクステーブルの調査

デフォルトではメトリクステーブルはdefaultデータベースに格納されます。

create_monitorの呼び出しで2つの新規テーブル作成されます: プロファイルメトリクステーブルとドリフトメトリクステーブルです。

これらの2つのテーブルは分析ジョブのアウトプットを記録します。このテーブルでは監視されるプライマリテーブルと同じ名前を使用し、サフィックス_profile_metrics_drift_metricsを使用します。

プロファイルメトリクステーブルのオリエンテーション

プロファイルメトリクステーブルにはサフィックス_profile_metricsが追加されます。テーブルに表示される統計情報の一覧に関しては、ドキュメントを参照ください(AWS|Azure)。

  • プライマリテーブルのすべてのカラムに対して、ベースラインテーブルとプライマリテーブルに対するサマリーの統計情報を表示します。カラムlog_typeはプライマリテーブルの統計情報を示すINPUT、ベースラインテーブルの統計情報を示すBASELINEを表示しています。プライマリテーブルのカラムはカラムcolumn_nameで識別できます。
  • TimeSeriesタイプの分析では、granularityカラムには行に対する粒度が表示されます。ベースラインテーブルの統計情報ではgranularityカラムの値はnullになります。
  • このテーブルでは、それぞれの時間ウィンドウにおけるそれぞれのスライスキーのそれぞれの値に対する統計情報とテーブル全体に対する統計情報が表示されます。テーブル全体の統計情報はslice_key = slice_value = nullで特定することができます。
  • プライマリテーブルでは、windowカラムに当該行に対応する時間ウィンドウが表示されます。ベースラインテーブルの統計情報では、windowカラムはnullになります。
  • いくつかの統計情報は特定のカラムではなくテーブル全体に対して計算されます。column_nameカラムの:tableによって、これらの統計情報を特定できます。
# プロファイルメトリクステーブルを表示
profile_table = f"{TABLE_NAME}_profile_metrics"
display(spark.sql(f"SELECT * FROM {profile_table}"))

Screenshot 2023-12-16 at 17.02.10.png

ドリフトメトリクステーブルのオリエンテーション

ドリフトメトリクステーブルにはサフィックス_drift_metricsが追加されます。テーブルに表示される統計情報の一覧に関しては、ドキュメントを参照ください(AWS|Azure)。

  • プライマリテーブルのすべてのカラムに対して、ドリフトテーブルではテーブルの現在の値と以前の分析実行時の値、ベースラインテーブルと比較した一連のメトリクスを表示します。ベースラインテーブルに対するドリフトでは、カラムdrift_typeBASELINEが表示され、以前の時間ウィンドウに対するドリフトでは、CONSECUTIVEが表示されます。プロファイルテーブルと同じように、プライマリテーブルのカラムはカラムcolumn_nameで特定できます。
    • この時点では、このモニターの最初の実行であるため、比較する以前のウィンドウがありません。このため、drift_typeCONSECUTIVEである行はありません。
  • TimeSeriesタイプの分析では、granularityカラムには行に対する粒度が表示されます。
  • このテーブルでは、それぞれの時間ウィンドウにおけるそれぞれのスライスキーのそれぞれの値に対する統計情報とテーブル全体に対する統計情報が表示されます。テーブル全体の統計情報はslice_key = slice_value = nullで特定することができます。
  • windowカラムには当該行に対応する時間ウィンドウが表示されます。window_cmpカラムには比較対象のウィンドウが表示されます。ベースラインテーブルとの比較の場合には、window_cmpnullになります。
  • いくつかの統計情報は特定のカラムではなくテーブル全体に対して計算されます。column_nameカラムの:tableによって、これらの統計情報を特定できます。
# ドリフトメトリクステーブルの表示
drift_table = f"{TABLE_NAME}_drift_metrics"
display(spark.sql(f"SELECT * FROM {drift_table}"))

Screenshot 2023-12-16 at 17.02.29.png

4. テーブルに新規データを追加し、メトリクスをリフレッシュ

テーブルに新規データを追加

以下のセルではプライマリテーブルに、シミュレートした未来のデータts2_dfを追加します。

(ts2_df
  .write.format("delta").mode("append") 
  .option("mergeSchema",True) 
  .option("delta.enableChangeDataFeed", "true") 
  .saveAsTable(f"{TABLE_NAME}")
)

行数を確認します。

spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").display()

Screenshot 2023-12-16 at 17.03.05.png

メトリクスのリフレッシュ

run_info = lm.run_refresh(table_name=TABLE_NAME)
while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
  run_info = lm.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
  time.sleep(30)

assert(run_info.state == lm.RefreshState.SUCCESS)

2回目のリフレッシュが実行されます。
Screenshot 2023-12-16 at 17.03.25.png

リフレッシュが完了したら、変化を確認するためにモニタリングダッシュボードを開きます。
Screenshot 2023-12-16 at 17.12.24.png
Screenshot 2023-12-16 at 17.12.49.png

時間ウィンドウ間の変化を確認することができます。
Screenshot 2023-12-16 at 17.13.05.png
Screenshot 2023-12-16 at 17.13.59.png

[オプション] モニターの削除

モニターをクリーンアップするには以下のコードのコメントを解除してください。一つのテーブルには単一のモニターのみを追加できます。

# lm.delete_monitor(table_name=TABLE_NAME)

テーブルの統計情報をモニタリングするためには、これまでには手組みで色々なロジックを構成しないといけませんでしたが、例クハウスモニタリングを活用いただくことで簡単にテーブルの状態を監視できるようになります。(日本リージョンで使えるようになったら)是非お試しください!

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?