はじめに
Databricksのテーブルを監視する機能であるレイクハウスモニタリング(レイクハウス監視)について、以下のようにUIでモニターを設定することはできるのですが、テーブルごとに設定する必要があるため、監視対象のテーブルが多くなると手間がかかります。なので、APIによる設定を試してみました。
<モニター設定箇所>
また、データに異常が発生した際に通知させるために、モニター設定で作成されたメトリックテーブルの値に基づいた Databricks SQL アラートもAPIで作成してみました。
モニターの作成
指定したテーブルにモニターを作成します。プロファイルはスナップショットで設定しています。
DATABRICKS_INSTANCE = '<ワークスペースURL>'
full_table_name = f"{catalog}.{schema}.{table}"
# ヘッダー
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# ボディ
body = {
"skip_builtin_dashboard": "true",
"assets_dir": "<ディレクトリ>",
"output_schema_name": "<カタログ>.<スキーマ>",
"snapshot": {},
"schedule": {
"quartz_cron_expression": "0 0 12 * * ?",
"timezone_id": "JST"
},
"notifications": {
"on_failure": {
"email_addresses": [
"<メールアドレス>"
]
}
}
}
# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.1/unity-catalog/tables/{full_table_name}/monitor"
# リクエスト送信
response = requests.post(url, headers=headers, json=body)
if response.status_code == 200:
print(response.json())
else:
print(f"Error: {response.status_code}, {response.text}")
- UIで設定する場合、ダッシュボードは必須で作成されてしまうのですが、
"skip_builtin_dashboard": "true"
で作成しないようにできます。 -
"quartz_cron_expression": "0 0 12 * * ?"
は毎日12時に更新する設定となります。
必要な権限や設定値の詳細についてはドキュメントをご確認ください。
クエリの作成
続いてアラートの基となるクエリを作成します。クエリはプロファイルメトリックテーブルまたはドリフトメトリックテーブルに対して作成します。
DATABRICKS_INSTANCE = '<ワークスペースURL>'
# ヘッダー
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# ボディ
body = {
"query": {
"display_name": "<クエリ名>",
"parent_path": "<ディレクトリ>",
"query_text": query_text,
"run_as_mode": "<実行者>"
}
}
# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/queries"
# リクエスト送信
response = requests.post(url, headers=headers, json=body)
if response.status_code == 200:
print(response.json())
else:
print(f"Error: {response.status_code}, {response.text}")
-
query_text
(クエリの中身) について、私が作成したサンプルコードを残しておきます。こちらはドリフトメトリックテーブルに対するクエリで、最新断面で列が増加・減少(全部null)、もしくはスキーマと一致しない列があった場合にフラグが立つようにしているつもりです。
クエリのサンプルコード
query_text =f"""
WITH
last_window_start AS (
SELECT MAX(window.start) AS Window
FROM `{catalog}`.`{schema}`.`{table}_drift_metrics`
),
t01 AS (
SELECT
non_null_columns_delta.added, non_null_columns_delta.missing
FROM `{catalog}`.`{schema}`.`{table}_drift_metrics`
WHERE window.start = (SELECT Window FROM last_window_start)
AND column_name = ":table"
),
t02 AS (
SELECT
count_delta
FROM `{catalog}`.`{schema}`.`{table}_drift_metrics`
WHERE window.start = (SELECT Window FROM last_window_start)
AND column_name = "_rescued_data"
)
select
t01.added as non_null_columns_delta_added,
t01.missing as non_null_columns_delta_missing,
t02.count_delta as _rescued_data_count_delta,
CASE WHEN t01.added >= 1 OR t01.missing >= 1 OR t02.count_delta>= 1 THEN 1 ELSE 0 END as alert_flg
from t01 join t02 on 1=1
"""
必要な権限や設定値の詳細についてはドキュメントをご確認ください。
アラートの作成
次にアラートを作成します。
DATABRICKS_INSTANCE = '<ワークスペースURL>'
# ヘッダー
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# ボディ
body = {
"alert": {
"display_name": "<アラート名>",
"condition": {
"op": "EQUAL",
"operand": {
"column": {
"name": "<列名>"
}
},
"threshold": {
"value": {
"double_value": 1
}
},
"empty_result_state": "UNKNOWN"
},
"query_id": "<クエリID>",
"parent_path": "<ディレクトリ>",
"notify_on_ok": "false"
}
}
# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/alerts"
# リクエスト送信
response = requests.post(url, headers=headers, json=body)
if response.status_code == 200:
print(response.json())
else:
print(f"Error: {response.status_code}, {response.text}")
- クエリ名から
query_id
を取得する方法は以下となります。
クエリIDを取得するAPI
DATABRICKS_INSTANCE = '<ワークスペースURL>'
# ヘッダー
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/queries"
# リクエスト送信
response = requests.get(url, headers=headers)
if response.status_code == 200:
results = response.json()
for query in results['results']:
if query['display_name'] == query_name:
query_id = query['id']
print(query_id)
else:
print(f"Error: {response.status_code}, {response.text}")
https://docs.databricks.com/api/azure/workspace/queries/list
必要な権限や設定値の詳細についてはドキュメントをご確認ください。
ジョブの作成
GUIだと下記のようにアラートからスケジュール設定ができるのですが、APIだとこれを直接設定することはできないようなので、ジョブを作成し、アラートをタスクとして設定してスケジュール実行させます。
DATABRICKS_INSTANCE = '<ワークスペースURL>'
# ヘッダー
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# ボディ
body = {
"name": "alert_metrics",
"schedule": {
"quartz_cron_expression": "0 0 13 * * ?",
"timezone_id": "JST"
},
"tasks": [
{
"task_key": f"alert_{table}",
"sql_task": {
"alert": {
"alert_id": alert_id,
"subscriptions": [
{
"user_name": "<メールアドレス>"
}
]
},
"warehouse_id": "<SQLウェアハウスID>"
}
}
]
}
# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/create"
# リクエスト送信
response = requests.post(url, headers=headers, json=body)
if response.status_code == 200:
print(response.json())
else:
print(f"Error: {response.status_code}, {response.text}")
- アラート名から
alert_id
を取得する方法は以下となります。
アラートIDを取得するAPI
DATABRICKS_INSTANCE = '<ワークスペースURL>'
# ヘッダー
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/alerts"
# リクエスト送信
response = requests.get(url, headers=headers)
if response.status_code == 200:
results = response.json()
for alert in results['results']:
if alert['display_name'] == alert_name:
alert_id = alert['id']
print(alert_id)
else:
print(f"Error: {response.status_code}, {response.text}")
必要な権限や設定値の詳細についてはドキュメントをご確認ください。
おわりに
- テーブル一覧を取得して、一気にモニターを設定することも可能かなと思いました。
- 改善点として、アラートの通知先をTeamsにできると良いかなと思います。(今回なぜか上手く通知されず断念しました。)