タイトルのようなニーズはあると思います。
注意
こちらのアプローチはDatabricks on AWSで動作確認しています。Azure Databricksでは動作しません
Databricksのクエリーはスケジュール実行が可能です。
かつては、これと(レガシーな)ダッシュボードと組み合わせることで最新の結果をダッシュボードに表示できていたのですが、ダッシュボードがAI/BIになってからはあまり使われなくなっているとの認識です。
そうすると、表示する先を持たないスケジュールされた野良クエリーが存在することになります。そういったクエリーを特定し、一括で停止させましょう。
上記のスケジュールされたクエリーは、スケジュールを設定した時点でジョブとして設定されます。
こういうシナリオでは、Databricks SDKを使ってプログラムで特定、停止するのが妥当だと思います。Databricks SDKに関してはこちらをご覧ください。
Databricks SDKを使う際には、対象のオブジェクトが何であり、そのオブジェクトの階層構造を理解することが重要です。ここでの対象となるDatabricksジョブは結構階層構造が複雑です。まず、ジョブは複数のタスクから構成されていることをイメージする必要があります。これらの、構造を読み解くにはこちらのマニュアルが不可欠です。
今回、主に使うのは以下のクラスとメソッドです。
- databricks.sdk.service.jobs.JobsExt.list: ジョブの一覧を取得するメソッド。
- databricks.sdk.service.jobs.BaseJob: ジョブ自身を表現するクラス。
- databricks.sdk.service.jobs.JobSettings: ジョブの設定を表現するクラス。
- databricks.sdk.service.jobs.SqlTask: Databricks SQLのクエリー、ダッシュボード、アラートを表現するクラス。
- databricks.sdk.service.jobs.SqlTaskQuery: クエリーを表現するクラス。
- databricks.sdk.service.jobs.CronSchedule: スケジュール設定を表現するクラス。
- databricks.sdk.service.jobs.PauseStatus: ジョブが一時停止かどうかを表現。
- databricks.sdk.service.jobs.JobsExt.update: ジョブの設定を更新するメソッド。
以下のツリー構造になっています。
BaseJob
└── JobSettings
├── Task
| └── SqlTask
| └── SqlTaskQuery
└── CronSchedule
└── PauseStatus
この構造を踏まえて、タスクにクエリーが含まれていてかつスケジュールされていて一時停止状態にないものを特定します。
こちらがスクリプトです。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import JobSettings, CronSchedule, PauseStatus
w = WorkspaceClient()
# ジョブ一覧
# ジョブ数が100件を超える場合は page_token によるページネーションが必要となります
for target_job in w.jobs.list(expand_tasks=True, limit=100):
# ジョブがスケジュールされているかどうか
scheduled = False
# SQLタスク(クエリー)かどうか
is_sql_task = False
if target_job.creator_user_name == "takaaki.yayoi@databricks.com":
print("########################")
print(f"ジョブ: {target_job.settings.name} ({target_job.job_id})")
print("スケジュール:", target_job.settings.schedule)
# ジョブがスケジューリングされているかチェック
if target_job.settings.schedule != None and target_job.settings.schedule.pause_status != PauseStatus.PAUSED:
print("スケジュールのステータス:", target_job.settings.schedule.pause_status)
scheduled = True
# ジョブに含まれるタスクのSQLタスクかどうかチェック
for task in target_job.settings.tasks:
print("SQLタスク:", task.sql_task)
# クエリーかどうかをチェック
if task.sql_task and task.sql_task.query:
print("クエリー:", task.sql_task.query)
is_sql_task = True
# スケジュールされているSQLタスクかどうか
if scheduled and is_sql_task:
print("#### このジョブにはスケジュールされており、クエリーが含まれています。停止を試みます。")
# ジョブのスケジュールの pause_status を PAUSED に更新
new_job_settings = JobSettings(
schedule=CronSchedule(
quartz_cron_expression=target_job.settings.schedule.quartz_cron_expression,
timezone_id=target_job.settings.schedule.timezone_id,
pause_status=PauseStatus.PAUSED
)
)
print(new_job_settings)
w.jobs.update(job_id=target_job.job_id, new_settings=new_job_settings)
print("停止しました。")
上のスクリプトは他に影響を与えないように自分のジョブだけに限定しています。以下のようにスケジュール済みのクエリーscheduled query...
が2つある状態です。肝は以下の2点です:
-
w.jobs.list
でexpand_tasks=True
を指定することで、タスクの詳細情報を取得しています。 - 特定したジョブに対して、
PauseStatus.PAUSED
を指定してジョブの設定を更新します。w.jobs.update
は設定されたパラメーターのみを用いて、指定された場所だけを部分的に更新をかけます。
これで、上のスクリプトを実行します。
########################
ジョブ: scheduled query_2 2025-02-10 20:53:03 (1106867414717513)
スケジュール: CronSchedule(quartz_cron_expression='3 53 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.UNPAUSED: 'UNPAUSED'>)
スケジュールのステータス: PauseStatus.UNPAUSED
SQLタスク: SqlTask(warehouse_id='bec52b183a4cfe2a', alert=None, dashboard=None, file=None, parameters=None, query=SqlTaskQuery(query_id='e178cd0a-ad3f-4d02-9d57-f31f8bddb869'))
クエリー: SqlTaskQuery(query_id='e178cd0a-ad3f-4d02-9d57-f31f8bddb869')
#### このジョブにはスケジュールされており、クエリーが含まれています。停止を試みます。
JobSettings(budget_policy_id=None, continuous=None, deployment=None, description=None, edit_mode=None, email_notifications=None, environments=None, format=None, git_source=None, health=None, job_clusters=None, max_concurrent_runs=None, name=None, notification_settings=None, parameters=None, queue=None, run_as=None, schedule=CronSchedule(quartz_cron_expression='3 53 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.PAUSED: 'PAUSED'>), tags=None, tasks=None, timeout_seconds=None, trigger=None, webhook_notifications=None)
停止しました。
########################
ジョブ: scheduled query 2025-02-10 20:17:34 (1119167818978975)
スケジュール: CronSchedule(quartz_cron_expression='34 17 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.UNPAUSED: 'UNPAUSED'>)
スケジュールのステータス: PauseStatus.UNPAUSED
SQLタスク: SqlTask(warehouse_id='585073fb24f9f86c', alert=None, dashboard=None, file=None, parameters=None, query=SqlTaskQuery(query_id='d385b3fe-dffa-443b-b040-dd0ee3900693'))
クエリー: SqlTaskQuery(query_id='d385b3fe-dffa-443b-b040-dd0ee3900693')
#### このジョブにはスケジュールされており、クエリーが含まれています。停止を試みます。
JobSettings(budget_policy_id=None, continuous=None, deployment=None, description=None, edit_mode=None, email_notifications=None, environments=None, format=None, git_source=None, health=None, job_clusters=None, max_concurrent_runs=None, name=None, notification_settings=None, parameters=None, queue=None, run_as=None, schedule=CronSchedule(quartz_cron_expression='34 17 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.PAUSED: 'PAUSED'>), tags=None, tasks=None, timeout_seconds=None, trigger=None, webhook_notifications=None)
停止しました。
########################
ジョブ: 新規ジョブ 2024-08-21 11:14:02 (604864710468504)
スケジュール: None
SQLタスク: None
########################
ジョブ: job with looping (5149234770534)
スケジュール: None
SQLタスク: None
SQLタスク: None
########################
ジョブ: サンプルジョブ (1065290150917006)
スケジュール: None
SQLタスク: None
SQLタスク: None
########################
ジョブ: dbdemos_lakehouse_iot_turbine_init_takaaki_yayoi (535101618310127)
スケジュール: None
SQLタスク: None
SQLタスク: None
SQLタスク: None
SQLタスク: None
SQLタスク: None
########################
ジョブ: terminate_model_serving (397311256284478)
スケジュール: CronSchedule(quartz_cron_expression='26 14 11 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.PAUSED: 'PAUSED'>)
SQLタスク: None
########################
ジョブ: test (3021)
スケジュール: None
SQLタスク: None
狙った通りにスケジュール済みのクエリーだけを一時停止しました。