サーバレスが有効化されているリージョンではテーブルや機械学習モデルの監視を行えるレイクハウスモニタリングが利用できるようになっています。
時系列(time series)分析のサンプルノートブックをウォークスルーします。時間とともにデータが変化していくテーブルを監視するタイプです。
レイクハウスモニタリングサンプルノートブック: 時系列分析
ユーザー要件
- Unity Catalogにアクセスできるクラスターでのコマンド実行権限が必要です。
- 最低でも一つのカタログに対する
USE CATALOG
権限が必要です。最低でも一つのスキーマに対するUSE SCHEMA
権限が必要です。このノートブックではテーブルをmain.default
スキーマに作成します。main.default
スキーマへの権限がない場合には、権限を持つカタログとスキーマに変更してください。
システム要件:
- ワークスペースでUnity Catalogが有効化されている必要があります。
- Databricks Runtime 12.2LTS以降
- シングルユーザーあるいは共有クラスター
このノートブックでは時系列モニターの作成方法を説明します。
レイクハウスモニタリングの詳細に関してはドキュメントを参照ください。(AWS|Azure)
セットアップ
- クラスター設定の検証
- Pythonクライアントのインストール
- カタログ、スキーマ、テーブル名の定義
# クラスター設定のチェック。このセルが失敗する場合、ノートブックの右上のクラスターセレクターで、Databricks Runtime 12.2 LTS 以降が稼働しているクラスターを設定、選択してください。
import os
assert float(os.environ.get("DATABRICKS_RUNTIME_VERSION", 0)) >= 12.2, "Please configure your cluster to use Databricks Runtime 12.2 LTS or above."
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.4-py3-none-any.whl"
# 新規にインストールしたwheelを用いて環境をリセットするためにこのステップが必要です。
dbutils.library.restartPython()
# カタログに対する`USE CATALOG`権限とスキーマに対する`USE SCHEMA`権限が必要です。
# 必要に応じて、ここでカタログとスキーマを変更して下さい。
CATALOG = "takaakiyayoi_catalog"
SCHEMA = "monitoring"
テーブル名にユーザー名を埋め込みます。
username = spark.sql("SELECT current_user()").first()["current_user()"]
username_prefixes = username.split("@")[0].split(".")
unique_suffix = "_".join([username_prefixes[0], username_prefixes[1][0:2]])
TABLE_NAME = f"{CATALOG}.{SCHEMA}.wine_ts_{unique_suffix}"
BASELINE_TABLE = f"{CATALOG}.{SCHEMA}.wine_ts_baseline_{unique_suffix}"
TIMESTAMP_COL = "timestamp"
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASELINE_TABLE}")
ユーザージャーニー
- テーブルの作成: 生のデータを読み込みプライマリテーブル(監視されるテーブル)とベーステーブル(期待する品質基準を満たす既知のデータを格納)を作成。
- プライマリテーブルに対するモニターを作成。
- メトリクステーブルの調査。
- テーブルを変更してメトリクスを更新。メトリクステーブルの調査。
- [オプション] モニターの削除。
1. Unity Catalogでプライマリテーブルと(オプション)ベースラインテーブルを作成
- これらのテーブルはUnity Catalog配下のDeltaテーブルであり、ノートブックを実行するユーザーが所有者である必要があります。
- 監視されるテーブルは「プライマリテーブル」とも呼ばれます。
- ベースラインテーブルは監視されるテーブルと同じスキーマを持つ必要があります。
このサンプルではwinequality
データセットを使用します。
import pandas as pd
white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")
# カテゴリーの追加
white_wine["type"] = "white"
red_wine["type"] = "red"
data_pdf = pd.concat([white_wine, red_wine], axis=0)
# カラム名の整形
data_pdf.columns = data_pdf.columns.str.replace(" ", "_")
データを分割します。baseline_df
はベースラインテーブルです。 ts1_df
はプライマリテーブルであり、ts2_df
は模擬的に未来のデータを示すものです。
data_df = spark.createDataFrame(data_pdf)
baseline_df, ts1_df, ts2_df = data_df.randomSplit(weights=[0.20, 0.40, 0.40], seed=42)
時系列データをシミュレートするために異なるタイムスタンプを作成します。
from datetime import timedelta, datetime
from pyspark.sql import functions as F
# 異なるタイムスタンプを持つデータをシミュレート
timestamp_0 = datetime.now() # ベースラインデータ
timestamp_1 = (datetime.now() + timedelta(1)).timestamp() # 1日後
timestamp_2 = (datetime.now() + timedelta(2)).timestamp()
baseline_df = baseline_df.withColumn("timestamp", F.lit(timestamp_0).cast("timestamp"))
ts1_df = ts1_df.withColumn("timestamp", F.lit(timestamp_1).cast("timestamp"))
ts2_df = ts2_df.withColumn("timestamp", F.lit(timestamp_2).cast("timestamp"))
baseline_df.display()
Unity CatalogにベースラインのDeltaテーブルとプライマリのDeltaテーブルを作成
# Unity Catalogにベースラインテーブルを書き込み
(baseline_df
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema",True)
.option("delta.enableChangeDataFeed", "true")
.saveAsTable(f"{BASELINE_TABLE}")
)
# Unity Catalogにプライマリテーブルを書き込み。これが監視されるテーブルとなります。このノートブックの後半で、このテーブルに未来のタイムスタンプを持つデータを追加します。
(ts1_df
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema",True)
.option("delta.enableChangeDataFeed", "true")
.saveAsTable(f"{TABLE_NAME}")
)
行数を確認します。
spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").display()
ビジネスやユースケース観点から除外されるべきすべてのカラムを除外するようにしてください。
import databricks.lakehouse_monitoring as lm
# 分析するウィンドウのサイズ
GRANULARITIES = ["1 day"]
# データをスライスするエクスプレッション
SLICING_EXPRS = ["type='Red'"]
print(f"Creating monitor for {TABLE_NAME}")
info = lm.create_monitor(
table_name=TABLE_NAME,
profile_type=lm.TimeSeries(
timestamp_col=TIMESTAMP_COL,
granularities=GRANULARITIES
),
slicing_exprs=SLICING_EXPRS,
baseline_table_name=BASELINE_TABLE,
output_schema_name=f"{CATALOG}.{SCHEMA}"
)
カタログエクスプローラからプライマリテーブルにアクセスし、品質タブをクリックするとモニターを確認できます。
import time
# モニターの作成を待ちます
while info.status == lm.MonitorStatus.PENDING:
info = lm.get_monitor(table_name=TABLE_NAME)
time.sleep(10)
assert(info.status == lm.MonitorStatus.ACTIVE)
また、更新履歴を表示をクリックすることでメトリクスの更新処理の状態を確認できます。
モニターを作成すると、自動でメトリクスの更新処理が起動します。
# 作成時にメトリックの更新が自動でトリガーされます
refreshes = lm.list_refreshes(table_name=TABLE_NAME)
assert(len(refreshes) > 0)
run_info = refreshes[0]
while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
run_info = lm.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
time.sleep(30)
assert(run_info.state == lm.RefreshState.SUCCESS)
モニターを作成するとメトリクスを確認するためのダッシュボードが自動生成されます。ダッシュボードを開くにはセルのアウトプットでハイライトされているダッシュボードのリンクをクリックします。カタログエクスプローラのUIからダッシュボードに移動することもできます。
なお、ダッシュボードやダッシュボードで使用されるクエリーはホームディレクトリ配下のdatabricks_lakehouse_monitoring
に格納されます。
以下のメソッドでモニターの情報を表示できます。
lm.get_monitor(table_name=TABLE_NAME)
3. メトリクステーブルの調査
デフォルトではメトリクステーブルはdefaultデータベースに格納されます。
create_monitor
の呼び出しで2つの新規テーブル作成されます: プロファイルメトリクステーブルとドリフトメトリクステーブルです。
これらの2つのテーブルは分析ジョブのアウトプットを記録します。このテーブルでは監視されるプライマリテーブルと同じ名前を使用し、サフィックス_profile_metrics
と_drift_metrics
を使用します。
プロファイルメトリクステーブルのオリエンテーション
プロファイルメトリクステーブルにはサフィックス_profile_metrics
が追加されます。テーブルに表示される統計情報の一覧に関しては、ドキュメントを参照ください(AWS|Azure)。
- プライマリテーブルのすべてのカラムに対して、ベースラインテーブルとプライマリテーブルに対するサマリーの統計情報を表示します。カラム
log_type
はプライマリテーブルの統計情報を示すINPUT
、ベースラインテーブルの統計情報を示すBASELINE
を表示しています。プライマリテーブルのカラムはカラムcolumn_name
で識別できます。 -
TimeSeries
タイプの分析では、granularity
カラムには行に対する粒度が表示されます。ベースラインテーブルの統計情報ではgranularity
カラムの値はnull
になります。 - このテーブルでは、それぞれの時間ウィンドウにおけるそれぞれのスライスキーのそれぞれの値に対する統計情報とテーブル全体に対する統計情報が表示されます。テーブル全体の統計情報は
slice_key
=slice_value
=null
で特定することができます。 - プライマリテーブルでは、
window
カラムに当該行に対応する時間ウィンドウが表示されます。ベースラインテーブルの統計情報では、window
カラムはnull
になります。 - いくつかの統計情報は特定のカラムではなくテーブル全体に対して計算されます。
column_name
カラムの:table
によって、これらの統計情報を特定できます。
# プロファイルメトリクステーブルを表示
profile_table = f"{TABLE_NAME}_profile_metrics"
display(spark.sql(f"SELECT * FROM {profile_table}"))
ドリフトメトリクステーブルのオリエンテーション
ドリフトメトリクステーブルにはサフィックス_drift_metrics
が追加されます。テーブルに表示される統計情報の一覧に関しては、ドキュメントを参照ください(AWS|Azure)。
- プライマリテーブルのすべてのカラムに対して、ドリフトテーブルではテーブルの現在の値と以前の分析実行時の値、ベースラインテーブルと比較した一連のメトリクスを表示します。ベースラインテーブルに対するドリフトでは、カラム
drift_type
にBASELINE
が表示され、以前の時間ウィンドウに対するドリフトでは、CONSECUTIVE
が表示されます。プロファイルテーブルと同じように、プライマリテーブルのカラムはカラムcolumn_name
で特定できます。- この時点では、このモニターの最初の実行であるため、比較する以前のウィンドウがありません。このため、
drift_type
がCONSECUTIVE
である行はありません。
- この時点では、このモニターの最初の実行であるため、比較する以前のウィンドウがありません。このため、
-
TimeSeries
タイプの分析では、granularity
カラムには行に対する粒度が表示されます。 - このテーブルでは、それぞれの時間ウィンドウにおけるそれぞれのスライスキーのそれぞれの値に対する統計情報とテーブル全体に対する統計情報が表示されます。テーブル全体の統計情報は
slice_key
=slice_value
=null
で特定することができます。 -
window
カラムには当該行に対応する時間ウィンドウが表示されます。window_cmp
カラムには比較対象のウィンドウが表示されます。ベースラインテーブルとの比較の場合には、window_cmp
はnull
になります。 - いくつかの統計情報は特定のカラムではなくテーブル全体に対して計算されます。
column_name
カラムの:table
によって、これらの統計情報を特定できます。
# ドリフトメトリクステーブルの表示
drift_table = f"{TABLE_NAME}_drift_metrics"
display(spark.sql(f"SELECT * FROM {drift_table}"))
4. テーブルに新規データを追加し、メトリクスをリフレッシュ
テーブルに新規データを追加
以下のセルではプライマリテーブルに、シミュレートした未来のデータts2_df
を追加します。
(ts2_df
.write.format("delta").mode("append")
.option("mergeSchema",True)
.option("delta.enableChangeDataFeed", "true")
.saveAsTable(f"{TABLE_NAME}")
)
行数を確認します。
spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").display()
メトリクスのリフレッシュ
run_info = lm.run_refresh(table_name=TABLE_NAME)
while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
run_info = lm.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
time.sleep(30)
assert(run_info.state == lm.RefreshState.SUCCESS)
リフレッシュが完了したら、変化を確認するためにモニタリングダッシュボードを開きます。
[オプション] モニターの削除
モニターをクリーンアップするには以下のコードのコメントを解除してください。一つのテーブルには単一のモニターのみを追加できます。
# lm.delete_monitor(table_name=TABLE_NAME)
テーブルの統計情報をモニタリングするためには、これまでには手組みで色々なロジックを構成しないといけませんでしたが、例クハウスモニタリングを活用いただくことで簡単にテーブルの状態を監視できるようになります。(日本リージョンで使えるようになったら)是非お試しください!