投稿背景
Databricks Workspace 上で REST API によりシークレットを登録する際に定義したクラスで実施していたのですが、Databricks SDK for Python によりシンプルに実装できるようになってしまったため不要となってしまいました。とりあえず、こんなことをやっていたことを残すために投稿しておきます。
コード
import json
import time
from typing import Dict, List
import requests
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession
class DatabricksOperations:
"""
A class for performing various operations on Databricks.
"""
def __init__(self):
"""
Initializes a new instance of the DatabricksOperations class.
"""
self.defalut_db_token_comment = "Automatically created by REST API"
self.spark = SparkSession.builder.getOrCreate()
def get_databricks_url(self) -> str:
"""
Gets the URL of the Databricks workspace.
Returns:
str: The URL of the Databricks workspace.
Raises:
Exception: If the Databricks workspace URL cannot be retrieved.
"""
spark = SparkSession.getActiveSession()
dbutils = DBUtils(spark)
try:
browserhostname = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())[
"tags"
]["browserHostName"]
db_url = f"https://{browserhostname}"
return db_url
except:
raise Exception("Can not get The databricks workspace url.")
def create_databricks_token_by_db_rest_api(
self,
db_url: str,
token: str,
lifetime_seconds: int = 60 * 60 * 24 * 30 * 3,
comment: str = "",
should_set_defalut_comment: bool = True,
) -> dict:
"""
Creates a new Databricks token using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
lifetime_seconds (int, optional): The lifetime of the token in seconds. Defaults to 60 * 60 * 24 * 30 * 3.
comment (str, optional): The comment to use for the token. Defaults to ''.
should_set_defalut_comment (bool, optional): Whether to use the default comment for the token. Defaults to True.
Returns:
dict: A dictionary containing the new Databricks token.
Raises:
Exception: If an error occurs while creating the Databricks token.
"""
if should_set_defalut_comment:
comment = self.defalut_db_token_comment
response = requests.post(
f"{db_url}/api/2.0/token/create",
headers={"Authorization": f"Bearer {token}"},
json={
"lifetime_seconds": lifetime_seconds,
"comment": comment,
},
)
if response.status_code == 200:
return response.json()
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def create_secret_scopes_by_db_rest_api(
self,
db_url: str,
token: str,
scope_name: str,
initial_manage_principal: str = "",
scope_backend_type: str = "DATABRICKS",
backend_azure_keyvault_resource_id: str = "",
backend_azure_keyvault_dns_name: str = "",
) -> None:
"""
Creates a new secret scope using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
scope_name (str): The name of the secret scope to create.
initial_manage_principal (str, optional): The initial principal to manage the secret scope. Defaults to "".
scope_backend_type (str, optional): The type of backend for the secret scope. Defaults to "DATABRICKS".
backend_azure_keyvault_resource_id (str, optional): The resource ID of the Azure Key Vault backend. Defaults to "".
backend_azure_keyvault_dns_name (str, optional): The DNS name of the Azure Key Vault backend. Defaults to "".
Raises:
Exception: If an error occurs while creating the secret scope.
"""
# ToDo create `AZURE_KEYVAULT` type logic. current logic is only for DATABRICKS
headers = {
"scope": scope_name,
}
if scope_backend_type.lower() == "azure_keyvault":
headers["backend_azure_keyvault"] = {}
headers["backend_azure_keyvault"]["resource_id"] = backend_azure_keyvault_resource_id
headers["backend_azure_keyvault"]["dns_name"] = backend_azure_keyvault_dns_name
if initial_manage_principal != "":
headers["initial_manage_principal"] = initial_manage_principal
response = requests.post(
f"{db_url}/api/2.0/secrets/scopes/create",
headers={"Authorization": f"Bearer {token}"},
json=headers,
)
if response.status_code == 200:
print(f'Success:create scope "{scope_name}"')
elif response.status_code == 400:
# スコープが存在する場合の分岐
if response.json().get("error_code") == "RESOURCE_ALREADY_EXISTS":
print(f'scope "{scope_name}" already exists')
else:
raise Exception(response.text)
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def delete_secret_scopes_by_db_rest_api(
self,
db_url: str,
token: str,
scope_name: str,
) -> None:
"""
Deletes a secret scope using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
scope_name (str): The name of the secret scope to delete.
Raises:
Exception: If an error occurs while deleting the secret scope.
"""
response = requests.post(
f"{db_url}/api/2.0/secrets/scopes/delete",
headers={"Authorization": f"Bearer {token}"},
json={
"scope": scope_name,
},
)
if response.status_code == 200:
print(f'Success:delete scope "{scope_name}"')
elif response.status_code == 400:
# スコープが存在する場合の分岐
if response.json().get("error_code") == "RESOURCE_ALREADY_EXISTS":
print(f'scope "{scope_name}" already exists')
else:
raise Exception(response.text)
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def list_secret_scopes_by_db_rest_api(
self,
db_url: str,
token: str,
scope_name: str = "",
) -> List[dict]:
"""
Lists the secret scopes using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
scope_name (str, optional): The name of the secret scope to list. Defaults to "".
Returns:
List[dict]: A list of dictionaries containing the secret scopes.
Raises:
Exception: If an error occurs while listing the secret scopes.
"""
header = {}
if scope_name != "":
header[scope] = scope_name
response = requests.get(
f"{db_url}/api/2.0/secrets/scopes/list",
headers={"Authorization": f"Bearer {token}"},
json=header,
)
if response.status_code == 200:
return json.loads(response.text)
elif response.status_code == 400:
raise Exception(response.text)
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def put_secrets_by_db_rest_api(
self,
db_url: str,
token: str,
scope_name: str,
secret_name: str,
secret_string_value: str = "",
bytes_value: bytes = b"",
) -> None:
"""
Creates or replaces a secret using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
scope_name (str): The name of the secret scope to use.
secret_name (str): The name of the secret to create or replace.
secret_string_value (str, optional): The string value of the secret. Defaults to "".
bytes_value (bytes, optional): The bytes value of the secret. Defaults to b"".
Raises:
Exception: If an error occurs while creating or replacing the secret.
"""
header = {
"scope": scope_name,
"key": secrete_name,
}
if secret_string_value != "":
header["string_value"] = secret_string_value
if bytes_value != b"":
header["bytes_value"] = bytes_value
response = requests.post(
f"{db_url}/api/2.0/secrets/put",
headers={"Authorization": f"Bearer {token}"},
json=header,
)
if response.status_code == 200:
tgt_secret = f'{{"scope_name": "{scope_name}", "secrete_name": "{secrete_name}" }}'
print(f"Success:create or replace secrets {tgt_secret}")
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def delete_secrets_by_db_rest_api(
self,
db_url: str,
token: str,
tgt_token_id: str,
) -> None:
"""
Deletes a Databricks token using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
tgt_token_id (str): The ID of the token to delete.
Raises:
Exception: If an error occurs while deleting the Databricks token.
"""
response = requests.post(
f"{db_url}/api/2.0/token/delete",
headers={"Authorization": f"Bearer {token}"},
json={
"token_id": tgt_token_id,
},
)
if response.status_code == 200:
print(f"Success:delte token_id {tgt_token_id}")
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def delete_databricks_token_by_db_rest_api(
self,
db_url: str,
token: str,
tgt_token_id: str,
) -> None:
"""
Deletes a Databricks token using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
tgt_token_id (str): The ID of the token to delete.
Raises:
Exception: If an error occurs while deleting the Databricks token.
"""
response = requests.post(
f"{db_url}/api/2.0/token/delete",
headers={"Authorization": f"Bearer {token}"},
json={
"token_id": tgt_token_id,
},
)
if response.status_code == 200:
print(f"Success:delte token_id {tgt_token_id}")
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def list_databricks_token_by_db_rest_api(
self,
db_url: str,
token: str,
) -> List[dict]:
"""
Lists the Databricks tokens using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
Returns:
List[dict]: A list of dictionaries containing the Databricks tokens.
Raises:
Exception: If an error occurs while listing the Databricks tokens.
"""
response = requests.get(
f"{db_url}/api/2.0/token/list",
headers={"Authorization": f"Bearer {token}"},
)
if response.status_code == 200:
return response.json()
elif response.status_code == 403:
raise Exception(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def delete_databricks_token_by_db_rest_api(
self,
db_url: str,
token: str,
token_id: str,
) -> None:
"""
Deletes a Databricks token using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
token_id (str): The ID of the token to delete.
Raises:
Exception: If an error occurs while deleting the Databricks token.
"""
response = requests.post(
f"{db_url}/api/2.0/token/delete",
headers={"Authorization": f"Bearer {token}"},
json={
"token_id": token_id,
},
)
if response.status_code == 200:
print(f"Success:delete token_id {token_id}")
elif response.status_code == 403:
print(response.text)
else:
raise Exception(
"Error geting the job: {0}: {1}".format(response.json()["error_code"], response.json()["message"])
)
def delete_databricks_token_by_commnet(
self,
db_url: str,
token: str,
tgt_commnet: str,
) -> None:
"""
Deletes a Databricks token by comment using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
tgt_commnet (str): The comment of the token to delete.
Raises:
Exception: If an error occurs while deleting the Databricks token.
"""
db_tokens = self.list_databricks_token_by_db_rest_api(
db_url=db_url,
token=token,
)
tgt_token_id_to_delete = []
for token_info in db_tokens["token_infos"]:
if token_info["comment"] == tgt_commnet:
tgt_token_id_to_delete.append(token_info["token_id"])
for token_id in tgt_token_id_to_delete:
self.delete_databricks_token_by_db_rest_api(
db_url,
token,
token_id,
)
def delete_databricks_token_by_defalut_db_token_comment(
self,
db_url: str,
token: str,
) -> None:
"""
Deletes a Databricks token by default comment using the Databricks REST API.
Args:
db_url (str): The URL of the Databricks workspace.
token (str): The Databricks personal access token to use for authentication.
Raises:
Exception: If an error occurs while deleting the Databricks token.
"""
tgt_commnet = self.defalut_db_token_comment
db_tokens = self.list_databricks_token_by_db_rest_api(
db_url=db_url,
token=token,
)
tgt_token_id_to_delete = []
for token_info in db_tokens["token_infos"]:
if token_info["comment"] == tgt_commnet and token_info["token_id"] != current_token_id:
tgt_token_id_to_delete.append(token_info["token_id"])
for token_id in tgt_token_id_to_delete:
self.delete_databricks_token_by_db_rest_api(
db_url,
token,
token_id,
)
利用例(Databrick Workspace 上で実施する想定)
スコープの登録
# Databricks notebook source
from utilities.common.databricks_operations__v1 import *
# COMMAND ----------
db_url = "https://adb-529218dddd444.4.azuredatabricks.net"
# Set databricks token with `tmp_token_comment` value comment
tmp_token = "dapi3fe0c3c3be0e341a9fafe4c4444445"
tmp_token_comment = "tmp"
# COMMAND ----------
deploy_env = ''
# COMMAND ----------
if deploy_env == "dev":
pass
else:
print('Other configs is set.')
secrets_scopes = [
{
"name": "qiita",
"initial_manage_principal": "",
"scope_backend_type": "DATABRICKS",
"backend_azure_keyvault_resource_id": "",
"backend_azure_keyvault_dns_name": "",
}
]
secrets_of_db_token = [
{
"secrete_scope": "qiita",
"secrete_name": "db_token",
}
]
secrets_of_db_url = [
{
"secrete_scope": "qiita",
"secrete_name": "db_url",
}
]
# COMMAND ----------
operations = DatabricksOperations()
# COMMAND ----------
operations.get_databricks_url()
# COMMAND ----------
# Create databricks token
token = operations.create_databricks_token_by_db_rest_api(
db_url,
tmp_token,
)["token_value"]
# COMMAND ----------
# Delete a secret scope
scope = ""
if scope != "":
operations.delete_secret_scopes_by_db_rest_api(
db_url=db_url,
token=token,
scope_name=scope,
)
# COMMAND ----------
# Create secret scopes
# ToDo create `AZURE_KEYVAULT` type logic. current logic is only for DATABRICKS
for scope in secrets_scopes:
operations.create_secret_scopes_by_db_rest_api(
db_url=db_url,
token=token,
scope_name=scope["name"],
initial_manage_principal=scope.get("initial_manage_principal", ""),
scope_backend_type = scope.get("scope_backend_type","DATABRICKS"),
backend_azure_keyvault_resource_id = scope.get("backend_azure_keyvault_resource_id",""),
backend_azure_keyvault_dns_name = scope.get("backend_azure_keyvault_dns_name",""),
)
# COMMAND ----------
# List secret scopes
current_secret_scope = operations.list_secret_scopes_by_db_rest_api(
db_url=db_url,
token=token,
)
print(current_secret_scope)
# COMMAND ----------
# Put the databricks token secret
for secret in secrets_of_db_token:
operations.put_secrets_by_db_rest_api(
db_url=db_url,
token=token,
scope_name=secret["secrete_scope"],
secrete_name=secret["secrete_name"],
secret_string_value=token,
)
# COMMAND ----------
# Check the databricks token secret
for secret in secrets_of_db_token:
tgt_secret = f"{secret['secrete_scope']}.{secret['secrete_name']}"
sercret_value = dbutils.secrets.get(secret["secrete_scope"], secret["secrete_name"])
print(f"{tgt_secret}: {sercret_value}")
# COMMAND ----------
# Put the databricks workspace secret
for secret in secrets_of_db_url:
operations.put_secrets_by_db_rest_api(
db_url=db_url,
token=token,
scope_name=secret["secrete_scope"],
secrete_name=secret["secrete_name"],
secret_string_value=db_url,
)
# COMMAND ----------
# Check the databricks token secret
for secret in secrets_of_db_url:
tgt_secret = f"{secret['secrete_scope']}.{secret['secrete_name']}"
sercret_value = dbutils.secrets.get(secret["secrete_scope"], secret["secrete_name"])
print(f"{tgt_secret}: {sercret_value}")
# COMMAND ----------
# Delete temp databricks token
db_url = dbutils.secrets.get(
secrets_of_db_url[0]["secrete_scope"], secrets_of_db_url[0]["secrete_name"]
)
token = dbutils.secrets.get(
secrets_of_db_token[0]["secrete_scope"], secrets_of_db_token[0]["secrete_name"]
)
operations.delete_databricks_token_by_commnet(
db_url=db_url,
token=token,
tgt_commnet=tmp_token_comment,
)
# COMMAND ----------
# exit
シークレットの登録
# Databricks notebook source
# MAGIC %run ../../../src/utilities__v1
# COMMAND ----------
deploy_env = ''
# COMMAND ----------
value_01 = "qiita_article_01"
value_02 = "qiita_article_02"
# COMMAND ----------
if deploy_env == "dev":
pass
else:
print('Other configs is set.')
secrets_of_db_url = {
"secrete_scope": "qita",
"secrete_name": "db_url",
}
secrets_of_db_token = {
"secrete_scope": "qita",
"secrete_name": "db_token",
}
secrets = [
{
"secrete_scope": "qita",
"secrete_name": "value_01",
"string_value": value_01,
},
{
"secrete_scope": "qita",
"secrete_name": "value_02",
"string_value": value_02,
},
]
# COMMAND ----------
operations = DatabricksOperations()
# COMMAND ----------
# Get current databricks workspace url and token
db_url = dbutils.secrets.get(
secrets_of_db_url["secrete_scope"], secrets_of_db_url["secrete_name"]
)
db_token = dbutils.secrets.get(
secrets_of_db_token["secrete_scope"], secrets_of_db_token["secrete_name"]
)
# COMMAND ----------
# Put the databricks token secret
for secret in secrets:
operations.put_secrets_by_db_rest_api(
db_url=db_url,
token=db_token,
scope_name=secret["secrete_scope"],
secrete_name=secret["secrete_name"],
secret_string_value=secret["string_value"],
)