Databricksクイックスタートガイドのコンテンツです。
Databricksにおけるジョブ管理で説明されている、Databricksジョブ機能をウォークスルーします。Jobs CLIを用いてローカルマシンからジョブを起動する方法、Jobs REST APIも説明します。
ジョブ用ノートブック
Databricksのジョブは、複数のノートブック、データパイプラインを組み合わせることができるマルチタスクジョブです。ここではシンプルにするために一つのノートブックを用いてジョブを構成します。
ノートブックはPython、R、SQLなどを用いて実装することができますが、ここではPythonノートブックを使用します。データはサンプルデータとしてDatabricks環境に格納されているflowersデータセットを使用します。パラメーターとして花の種類を受け取り、指定された花の種類のデータを抽出し、書き込みを行う処理を行います。
パラメーターをノートブックで受け取るには、Databricksウィジェットを使用します。
flower = dbutils.widgets.get('flower')
これにより、ノートブックはflower
というパラメーターを受け取れるようになります。このパラメーターを用いてデータを絞り込みます。
flowers = spark.read.format("delta").load("dbfs:/databricks-datasets/flowers/delta/").filter(f"label='{flower}'")
読み込んだデータをDelta形式で書き込みます。
import uuid
# 画像データは圧縮済みなのでParquetの圧縮をオフにします
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
path = "/tmp/flowers/" + str(uuid.uuid4()) # ユニークなIDでDBFS上の保存場所のパスを作成します
flowers.write.format("delta").mode("overwrite").save(path)
書き込んだデータを確認します。
saved_flowers = spark.read.format("delta").load(path)
display(saved_flowers)
ジョブの設定
上述のノートブックは、ノートブック単体でも処理できますが、ジョブとして登録することで定期実行を行うことや、後述するCLIを用いてジョブを起動することが可能となります。
- Task nameにタスク名を入力します。
- TypeでNotebookを選択し、上で作成したノートブックを選択します。
- ClusterはNew Job Clusterのままとします。
-
ParametersのAddをクリックし、Keyに
flower
を入力します。 - Createをクリックします。
右上のRun nowをクリックして動作確認をすることもできます。Scheduleで、スケジュールを組むこともできます。
ジョブ名の下のRunsタブで、実行結果を確認することができます。
Jobs CLIの活用
上で説明した手順はDatabricks上での設定・実行手順となりますが、ローカルマシン、別システムからジョブを起動したい場合には、Jobs CLIあるいはJobs APIを活用することができます。
Jobs CLIを使用するには、ローカルマシンにDatabricks CLIをインストールします。
pip install databricks-cli
CLIがDatabricksにアクセスするために、パーソナルアクセストークンを生成して、設定を行います。パーソナルアクセストークンは、サイドメニューのSettings > User Settingsを開き、Access TokensでGenerate New Tokenをクリックします。トークンが表示されるのでコピーしておきます。
以下のコマンドを実行すると、アクセス先のDatabricksホスト名、上で生成したトークンを指定します。
databricks configure --token
Jobs CLIはDatabricks CLIに含まれています。
Jobs CLIを実行する際には、上で定義したジョブのIDが必要となります。ジョブの詳細画面でジョブIDを確認することができます。
databricks jobs run-now --job-id <ジョブID> --notebook-params '{"flower": "<パラメーターの値>"}'
上のジョブIDを指定し、花の種類がroses
としてパラメーターを指定してジョブを実行するには以下を実行します。
databricks jobs run-now --job-id 157442 --notebook-params '{"flower": "roses"}'
Jobs APIの活用
Jobs API 2.1を用いることで、REST API経由でジョブを操作することができます。
REST APIを用いる際にもパーソナルアクセストークンが必要になります。HTTPリクエストヘッダーにBearerトークンとして埋め込みます。
以下ではノートブックでパーソナルアクセストークンを指定するケースを説明するので、シークレットを活用します。
# トークンは機密性の高い情報なので、ノートブックに直接記載しません。事前にCLIでシークレットとして登録しておいたトークンを呼び出します
token = dbutils.secrets.get("demo-token-takaaki.yayoi", "token")
os.environ["DATABRICKS_TOKEN"] = token
以下のような関数を用意しておきます。
import requests
import json
def call_api(job_id, flower):
# 使用しているワークスペースのホスト名を指定します
instance_id = '<Databricksホスト名>'
api_version = '/api/2.0'
api_command = '/jobs/run-now'
url = f"https://{instance_id}{api_version}{api_command}"
headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}'}
params = {
"job_id": job_id,
"notebook_params": {
"flower": flower
}
}
#print(params)
response = requests.post(
url = url,
headers = headers,
json = params
)
print(json.dumps(json.loads(response.text), indent = 2))
以下のようにジョブIDとパラメーターを指定してREST APIを呼び出すと、ジョブが起動します。
call_api(157442, "sunflowers")