こちらの続編です。
Pythonからマルチタスクジョブを構成できるのは便利ですが、GUIではクラスターを再利用することができます。では、Databricks SDK for Pythonからできるのでしょうか?
ということで調べました。結論できました。
まず、ジョブを作成するメソッドを確認すると、パラメータにjob_clusters
があります。
job_clusters – List[JobCluster] (optional) A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in task settings.
このジョブのタスクで共有、再利用できるジョブクラスターの仕様のリストです。
まさにこれです。でも、どうやって指定するのか?よく見るとJobCluster
のリストとなっています。こちらを確認します。
class databricks.sdk.service.jobs.JobCluster(job_cluster_key: 'str', new_cluster: 'Optional[compute.ClusterSpec]' = None)
引数にjob_cluster_key
とnew_cluster
がありますのでピンときました。ジョブのタスクの仕様で、パラメータにjob_cluster_key
があります。
job_cluster_key: str | None = None
If job_cluster_key, this task is executed reusing the cluster specified in job.settings.job_clusters.
job_cluster_keyが指定されている場合には、
job.settings.job_clusters
で指定されているクラスターを再利用してこのタスクが実行されます。
まさしくこれでした。job.settings.job_clusters
がなんだろうと思いましたが、jobのインスタンス作成時にしている上記のパラメータjob_clusters
ということでした。
図にします。
上の構成を実装するコードが以下となります。
%pip install databricks-sdk --upgrade
dbutils.library.restartPython()
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source, TaskDependency
from databricks.sdk.service.compute import ClusterSpec, RuntimeEngine
w = WorkspaceClient()
こちらが肝になります。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source, JobSettings, JobCluster
from databricks.sdk.service.compute import ClusterSpec, RuntimeEngine
w = WorkspaceClient()
job_name = "cluster re-use job"
description = "タスクのクラスターを再利用"
notebook_path = (
"/Users/takaaki.yayoi@databricks.com/20231210_workflow/ml-quickstart-training-jpn"
)
task_key = "first-task"
# 最新MLランタイム
latest = w.clusters.select_spark_version(ml=True)
print("ジョブの作成を試みます。お待ちください...\n")
j = w.jobs.create(
name=job_name,
# ジョブで共有するジョブクラスターのリスト
job_clusters=[
JobCluster(
job_cluster_key="first_job_cluster",
new_cluster = ClusterSpec(num_workers=1, spark_version=latest, node_type_id="i3.xlarge"),
),
JobCluster(
job_cluster_key="second_job_cluster",
new_cluster = ClusterSpec(num_workers=1, spark_version=latest, node_type_id="i3.xlarge"),
)
],
# ジョブを構成するタスクのリスト、タスクから上記ジョブクラスターを job_cluster_key で指定
tasks=[
Task(
description="first-task",
job_cluster_key="first_job_cluster", # first_job_clusterを使用
notebook_task=NotebookTask(
base_parameters={"n_estimators": 50},
notebook_path=notebook_path,
source=Source("WORKSPACE"),
),
task_key="first-task",
),
Task(
description="second-task",
job_cluster_key="second_job_cluster", # second_job_clusterを使用
notebook_task=NotebookTask(
base_parameters={"n_estimators": 50},
notebook_path=notebook_path,
source=Source("WORKSPACE"),
),
task_key="second-task",
),
Task(
description="third-task",
job_cluster_key="first_job_cluster", # first_job_clusterを使用
notebook_task=NotebookTask(
base_parameters={"n_estimators": 50},
notebook_path=notebook_path,
source=Source("WORKSPACE"),
),
task_key="third-task",
),
],
)
print(f"View the job at {w.config.host}/#job/{j.job_id}\n")
ご活用ください!