概要
DatabricksにてJobsの実行結果をエクスポートするにはREST APIを利用する必要があり、その実施手順を紹介します。
この手順を理解することでバッチ処理によりちょっとしたレポートを作成できるようになり、Databricksの活用方法が広がります。
詳細は下記のGithub pagesのページをご確認ください。
コードを実行したい方は、下記のdbcファイルを取り込んでください。
https://github.com/manabian-/databricks_tecks_for_qiita/blob/main/tecks/get_notebook_results_by_restapi/dbc/get_notebook_results_by_restapi.dbc
検証環境
databricks runtime: 8.1.x-scala2.12
Python version: 3.8.8
pyspark version: 3.1.1.dev0
手順
1. 変数へ基本的な情報をセット
import json
# DatabricksのワークスペースURLを設定
browserHostName = json.loads(dbutils.notebook.entry_point
.getDbutils()
.notebook()
.getContext()
.toJson()
)['tags']['browserHostName']
# プロトコルを追加
db_url = f"https://{browserHostName}" # DatabricksのURL
# トークン。本来は、dbutils.secrets.get(scope = <SOCPE_NAME>, key = <TOKEN>)
token = "dapia5ac9cadd5988cd3874cef7113a92b41"
2. ノートブックをJobsとして実行
notebook_name = "code" # 実行するノートブック名を指定
notebook_path = "./includes/" + notebook_name # 実行するノートブックの相対パスと名称を指定
output_dir_notebookresult = "/dbfs/FileStore/qiita/get_notebook_results_by_restapi" # 実行結果の保存先を指定
# ノートブックをJobsとして実行
run_id = dbutils.notebook.run(notebook_path, 0)
3. Databricks REST APIにより、Jobsの実行結果を取得
# Databricks REST APIにより、Jobsの実行結果を取得
## 実行済みのJobsのデータをエクスポートする場合には、Job IDを設定
## run_id = "50809"
import requests
import os
response = requests.get(
f'{db_url}/api/2.0/jobs/runs/export?run_id={run_id}',
headers={'Authorization': f'Bearer {token}'}
)
if response.status_code == 200:
data = response.json().get("views")
elif response.status_code == 403:
print(response.text)
else:
print("Error geting the job: {0}: {1}".format(response.json()["error_code"],response.json()["message"]))
4. Databricks REST APIにより取得したHTMLデータを整形してファイルとして書き込む
# Databricks REST APIにより取得したHTMLデータを整形してファイルとして書き込む
output_file_names = set()
os.makedirs(output_dir_notebookresult, exist_ok=True)
html_file_path = f'{output_dir_notebookresult}/{output_file}.html'
for element in data:
if element.get("type", None).lower() != "notebook":
continue
output_file = element.get("name")
counter = 0
while output_file in output_file_names:
counter += 1
output_file = f'{output_file}_{counter}'
output_file_names.add(output_file)
with open(html_file_path, "w") as writer:
writer.write(str(element.get("content", "")))
output_file_path = ", ".join([html_file_path for f in output_file_names])