2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks on AWSでスケジュールされているクエリーの特定および一時停止

Last updated at Posted at 2025-02-10

タイトルのようなニーズはあると思います。

注意
こちらのアプローチはDatabricks on AWSで動作確認しています。Azure Databricksでは動作しません

Databricksのクエリーはスケジュール実行が可能です。

かつては、これと(レガシーな)ダッシュボードと組み合わせることで最新の結果をダッシュボードに表示できていたのですが、ダッシュボードがAI/BIになってからはあまり使われなくなっているとの認識です。

Screenshot 2025-02-10 at 21.02.36.png

そうすると、表示する先を持たないスケジュールされた野良クエリーが存在することになります。そういったクエリーを特定し、一括で停止させましょう。

上記のスケジュールされたクエリーは、スケジュールを設定した時点でジョブとして設定されます。

Screenshot 2025-02-10 at 21.02.50.png

こういうシナリオでは、Databricks SDKを使ってプログラムで特定、停止するのが妥当だと思います。Databricks SDKに関してはこちらをご覧ください。

Databricks SDKを使う際には、対象のオブジェクトが何であり、そのオブジェクトの階層構造を理解することが重要です。ここでの対象となるDatabricksジョブは結構階層構造が複雑です。まず、ジョブ複数のタスクから構成されていることをイメージする必要があります。これらの、構造を読み解くにはこちらのマニュアルが不可欠です。

今回、主に使うのは以下のクラスとメソッドです。

以下のツリー構造になっています。

BaseJob
└── JobSettings
    ├── Task
    |   └── SqlTask
    |       └── SqlTaskQuery
    └── CronSchedule
        └── PauseStatus

この構造を踏まえて、タスクにクエリーが含まれていてかつスケジュールされていて一時停止状態にないものを特定します。

こちらがスクリプトです。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import JobSettings, CronSchedule, PauseStatus

w = WorkspaceClient()

# ジョブ一覧
# ジョブ数が100件を超える場合は page_token によるページネーションが必要となります
for target_job in w.jobs.list(expand_tasks=True, limit=100):

  # ジョブがスケジュールされているかどうか
  scheduled = False
  # SQLタスク(クエリー)かどうか
  is_sql_task = False

  if target_job.creator_user_name == "takaaki.yayoi@databricks.com":
    print("########################")
    print(f"ジョブ: {target_job.settings.name} ({target_job.job_id})")
    print("スケジュール:", target_job.settings.schedule)

    # ジョブがスケジューリングされているかチェック
    if target_job.settings.schedule != None and target_job.settings.schedule.pause_status != PauseStatus.PAUSED:
      print("スケジュールのステータス:", target_job.settings.schedule.pause_status)
      scheduled = True

    # ジョブに含まれるタスクのSQLタスクかどうかチェック
    for task in target_job.settings.tasks:
      print("SQLタスク:", task.sql_task)
      # クエリーかどうかをチェック
      if task.sql_task and task.sql_task.query:
        print("クエリー:", task.sql_task.query)
        is_sql_task = True

        # スケジュールされているSQLタスクかどうか
        if scheduled and is_sql_task:
          print("#### このジョブにはスケジュールされており、クエリーが含まれています。停止を試みます。")

          # ジョブのスケジュールの pause_status を PAUSED に更新
          new_job_settings = JobSettings(
            schedule=CronSchedule(
              quartz_cron_expression=target_job.settings.schedule.quartz_cron_expression,
              timezone_id=target_job.settings.schedule.timezone_id,
              pause_status=PauseStatus.PAUSED
            )
          )
          print(new_job_settings)

          w.jobs.update(job_id=target_job.job_id, new_settings=new_job_settings)
          print("停止しました。")

上のスクリプトは他に影響を与えないように自分のジョブだけに限定しています。以下のようにスケジュール済みのクエリーscheduled query...が2つある状態です。肝は以下の2点です:

  • w.jobs.listexpand_tasks=Trueを指定することで、タスクの詳細情報を取得しています。
  • 特定したジョブに対して、PauseStatus.PAUSEDを指定してジョブの設定を更新します。w.jobs.updateは設定されたパラメーターのみを用いて、指定された場所だけを部分的に更新をかけます。

Screenshot 2025-02-10 at 21.06.12.png

これで、上のスクリプトを実行します。

########################
ジョブ: scheduled query_2 2025-02-10 20:53:03 (1106867414717513)
スケジュール: CronSchedule(quartz_cron_expression='3 53 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.UNPAUSED: 'UNPAUSED'>)
スケジュールのステータス: PauseStatus.UNPAUSED
SQLタスク: SqlTask(warehouse_id='bec52b183a4cfe2a', alert=None, dashboard=None, file=None, parameters=None, query=SqlTaskQuery(query_id='e178cd0a-ad3f-4d02-9d57-f31f8bddb869'))
クエリー: SqlTaskQuery(query_id='e178cd0a-ad3f-4d02-9d57-f31f8bddb869')
#### このジョブにはスケジュールされており、クエリーが含まれています。停止を試みます。
JobSettings(budget_policy_id=None, continuous=None, deployment=None, description=None, edit_mode=None, email_notifications=None, environments=None, format=None, git_source=None, health=None, job_clusters=None, max_concurrent_runs=None, name=None, notification_settings=None, parameters=None, queue=None, run_as=None, schedule=CronSchedule(quartz_cron_expression='3 53 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.PAUSED: 'PAUSED'>), tags=None, tasks=None, timeout_seconds=None, trigger=None, webhook_notifications=None)
停止しました。
########################
ジョブ: scheduled query 2025-02-10 20:17:34 (1119167818978975)
スケジュール: CronSchedule(quartz_cron_expression='34 17 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.UNPAUSED: 'UNPAUSED'>)
スケジュールのステータス: PauseStatus.UNPAUSED
SQLタスク: SqlTask(warehouse_id='585073fb24f9f86c', alert=None, dashboard=None, file=None, parameters=None, query=SqlTaskQuery(query_id='d385b3fe-dffa-443b-b040-dd0ee3900693'))
クエリー: SqlTaskQuery(query_id='d385b3fe-dffa-443b-b040-dd0ee3900693')
#### このジョブにはスケジュールされており、クエリーが含まれています。停止を試みます。
JobSettings(budget_policy_id=None, continuous=None, deployment=None, description=None, edit_mode=None, email_notifications=None, environments=None, format=None, git_source=None, health=None, job_clusters=None, max_concurrent_runs=None, name=None, notification_settings=None, parameters=None, queue=None, run_as=None, schedule=CronSchedule(quartz_cron_expression='34 17 20 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.PAUSED: 'PAUSED'>), tags=None, tasks=None, timeout_seconds=None, trigger=None, webhook_notifications=None)
停止しました。
########################
ジョブ: 新規ジョブ 2024-08-21 11:14:02 (604864710468504)
スケジュール: None
SQLタスク: None
########################
ジョブ: job with looping (5149234770534)
スケジュール: None
SQLタスク: None
SQLタスク: None
########################
ジョブ: サンプルジョブ (1065290150917006)
スケジュール: None
SQLタスク: None
SQLタスク: None
########################
ジョブ: dbdemos_lakehouse_iot_turbine_init_takaaki_yayoi (535101618310127)
スケジュール: None
SQLタスク: None
SQLタスク: None
SQLタスク: None
SQLタスク: None
SQLタスク: None
########################
ジョブ: terminate_model_serving (397311256284478)
スケジュール: CronSchedule(quartz_cron_expression='26 14 11 * * ?', timezone_id='Asia/Tokyo', pause_status=<PauseStatus.PAUSED: 'PAUSED'>)
SQLタスク: None
########################
ジョブ: test (3021)
スケジュール: None
SQLタスク: None

狙った通りにスケジュール済みのクエリーだけを一時停止しました。

Screenshot 2025-02-10 at 21.31.29.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?