0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksのレイクハウスモニタリングの一連の設定をAPIで実行

Posted at

はじめに

Databricksのテーブルを監視する機能であるレイクハウスモニタリング(レイクハウス監視)について、以下のようにUIでモニターを設定することはできるのですが、テーブルごとに設定する必要があるため、監視対象のテーブルが多くなると手間がかかります。なので、APIによる設定を試してみました。
<モニター設定箇所>
image.png

また、データに異常が発生した際に通知させるために、モニター設定で作成されたメトリックテーブルの値に基づいた Databricks SQL アラートもAPIで作成してみました。

モニターの作成

指定したテーブルにモニターを作成します。プロファイルはスナップショットで設定しています。

create_monitor
DATABRICKS_INSTANCE = '<ワークスペースURL>'
full_table_name = f"{catalog}.{schema}.{table}"

# ヘッダー
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'Content-Type': 'application/json'
}

# ボディ
body = {
    "skip_builtin_dashboard": "true",
    "assets_dir": "<ディレクトリ>",
    "output_schema_name": "<カタログ>.<スキーマ>",
    "snapshot": {},
    "schedule": {
      "quartz_cron_expression": "0 0 12 * * ?",
      "timezone_id": "JST"
    },
    "notifications": {
      "on_failure": {
        "email_addresses": [
          "<メールアドレス>"
        ]
      }
    }
}

# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.1/unity-catalog/tables/{full_table_name}/monitor"

# リクエスト送信
response = requests.post(url, headers=headers, json=body)

if response.status_code == 200:
  print(response.json())
else:
  print(f"Error: {response.status_code}, {response.text}")
  • UIで設定する場合、ダッシュボードは必須で作成されてしまうのですが、"skip_builtin_dashboard": "true" で作成しないようにできます。
  • "quartz_cron_expression": "0 0 12 * * ?" は毎日12時に更新する設定となります。

必要な権限や設定値の詳細についてはドキュメントをご確認ください。

クエリの作成

続いてアラートの基となるクエリを作成します。クエリはプロファイルメトリックテーブルまたはドリフトメトリックテーブルに対して作成します。

create_query
DATABRICKS_INSTANCE = '<ワークスペースURL>'

# ヘッダー
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'Content-Type': 'application/json'
}

# ボディ
body = {
  "query": {
    "display_name": "<クエリ名>",
    "parent_path": "<ディレクトリ>",
    "query_text": query_text,
    "run_as_mode": "<実行者>"
  }
}

# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/queries"

# リクエスト送信
response = requests.post(url, headers=headers, json=body)

if response.status_code == 200:
  print(response.json())
else:
  print(f"Error: {response.status_code}, {response.text}")
  • query_text(クエリの中身) について、私が作成したサンプルコードを残しておきます。こちらはドリフトメトリックテーブルに対するクエリで、最新断面で列が増加・減少(全部null)、もしくはスキーマと一致しない列があった場合にフラグが立つようにしているつもりです。
クエリのサンプルコード
query_text
query_text =f"""
WITH 
last_window_start AS (
  SELECT MAX(window.start) AS Window
  FROM `{catalog}`.`{schema}`.`{table}_drift_metrics`
),
t01 AS (
SELECT 
    non_null_columns_delta.added, non_null_columns_delta.missing
  FROM `{catalog}`.`{schema}`.`{table}_drift_metrics`
  WHERE window.start = (SELECT Window FROM last_window_start)
  AND column_name = ":table"
),
t02 AS (
SELECT 
    count_delta
  FROM `{catalog}`.`{schema}`.`{table}_drift_metrics`
  WHERE window.start = (SELECT Window FROM last_window_start)
  AND column_name = "_rescued_data"
)
select 
 t01.added as non_null_columns_delta_added, 
 t01.missing as non_null_columns_delta_missing, 
 t02.count_delta as _rescued_data_count_delta, 
 CASE WHEN t01.added >= 1 OR t01.missing >= 1 OR t02.count_delta>= 1 THEN 1 ELSE 0 END as alert_flg
from t01 join t02 on 1=1
"""

必要な権限や設定値の詳細についてはドキュメントをご確認ください。

アラートの作成

次にアラートを作成します。

create_alert
DATABRICKS_INSTANCE = '<ワークスペースURL>'

# ヘッダー
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'Content-Type': 'application/json'
}

# ボディ
body = {
  "alert": {
    "display_name": "<アラート名>",
    "condition": {
      "op": "EQUAL",
      "operand": {
        "column": {
          "name": "<列名>"
        }
      },
      "threshold": {
        "value": {
          "double_value": 1
        }
      },
      "empty_result_state": "UNKNOWN"
    },
    "query_id": "<クエリID>",
    "parent_path": "<ディレクトリ>",
    "notify_on_ok": "false"
  }
}

# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/alerts"

# リクエスト送信
response = requests.post(url, headers=headers, json=body)

if response.status_code == 200:
  print(response.json())
else:
  print(f"Error: {response.status_code}, {response.text}")
  • クエリ名から query_id を取得する方法は以下となります。
クエリIDを取得するAPI
get_query_id
DATABRICKS_INSTANCE = '<ワークスペースURL>'

# ヘッダー
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'Content-Type': 'application/json'
}

# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/queries"

# リクエスト送信
response = requests.get(url, headers=headers)

if response.status_code == 200:
  results = response.json()
  for query in results['results']:
    if query['display_name']  == query_name:
      query_id = query['id']
      print(query_id)
else:
  print(f"Error: {response.status_code}, {response.text}")

https://docs.databricks.com/api/azure/workspace/queries/list

必要な権限や設定値の詳細についてはドキュメントをご確認ください。

ジョブの作成

GUIだと下記のようにアラートからスケジュール設定ができるのですが、APIだとこれを直接設定することはできないようなので、ジョブを作成し、アラートをタスクとして設定してスケジュール実行させます。

image.png

create_alert_job
DATABRICKS_INSTANCE = '<ワークスペースURL>'

# ヘッダー
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'Content-Type': 'application/json'
}

# ボディ
body = {
  "name": "alert_metrics",
  "schedule": {
      "quartz_cron_expression": "0 0 13 * * ?",
      "timezone_id": "JST"
    },
  "tasks": [
    {
        "task_key": f"alert_{table}",
        "sql_task": {
            "alert": {
                "alert_id": alert_id,
                "subscriptions": [
                    {
                        "user_name": "<メールアドレス>"
                    }
                ]
            },
            "warehouse_id": "<SQLウェアハウスID>"
        }
    }
  ]
}

# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/create"

# リクエスト送信
response = requests.post(url, headers=headers, json=body)

if response.status_code == 200:
  print(response.json())
else:
  print(f"Error: {response.status_code}, {response.text}")
  • アラート名から alert_id を取得する方法は以下となります。
アラートIDを取得するAPI
get_alert_id
DATABRICKS_INSTANCE = '<ワークスペースURL>'

# ヘッダー
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'Content-Type': 'application/json'
}

# APIエンドポイント
url = f"{DATABRICKS_INSTANCE}/api/2.0/sql/alerts"

# リクエスト送信
response = requests.get(url, headers=headers)

if response.status_code == 200:
  results = response.json()
  for alert in results['results']:
    if alert['display_name']  == alert_name:
      alert_id = alert['id']
      print(alert_id)
else:
  print(f"Error: {response.status_code}, {response.text}")

https://docs.databricks.com/api/azure/workspace/alerts/list

  • "quartz_cron_expression": "0 0 13 * * ?" は毎日13時に実行する設定となります。
  • タスクに設定すると以下のようになります。
    image.png

必要な権限や設定値の詳細についてはドキュメントをご確認ください。

おわりに

  • テーブル一覧を取得して、一気にモニターを設定することも可能かなと思いました。
  • 改善点として、アラートの通知先をTeamsにできると良いかなと思います。(今回なぜか上手く通知されず断念しました。)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?