API examples | Databricks on AWS [2022/12/6時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書には、Databricks REST APIの使用法をデモンストレーションするサンプルが含まれています。
以下のサンプルでは、<databricks-instance>
をお使いのDatabricksデプロイメントのワークスペースURLで置き換えてください。
認証
注意
セキュリティのベストプラクティスとして、自動化ツール、システム、スクリプト、アプリで認証を行う際、ワークスペースのユーザーではなくサービスプリンシパルに属するアクセストークンを使用することをお勧めします。サービスプリンシパルのアクセストークンを作成するには、Manage access tokens for a service principalをご覧ください。
REST APIの認証をどのように行うのかについては、Databricksパーソナルアクセストークンを用いた認証をご確認ください。
本書のサンプルでは、皆様がDatabricksパーソナルアクセストークンを使用していることを前提としています。以下のサンプルでは、<your-token>
をご自身のパーソナルアクセストークンで置き換えてください。curl
のサンプルでは、.netrcにDatabricks APIの資格情報を格納していることを前提としています。PythonのサンプルではBearer認証を使用しています。サンプルではコードにトークンを記述していますが、Databricksで資格情報を安全に活用するにはDatabricksにおけるシークレットの管理のユーザーガイドをご覧ください。
クラスター一覧のgzipを取得
この例では、Databricks REST API version 2.0を使用します。
curl -n -H "Accept-Encoding: gzip" https://<databricks-instance>/api/2.0/clusters/list > clusters.gz
大きなファイルをDBFSにアップロード
単体のAPI呼び出しによるデータアップロードの容量は1MBを超えることができません。DBFSに1MBより大きいファイルをアップロードするには、create
、addBlock
、close
の組み合わせであるstreaming APIを活用します。
Pythonを用いてこのアクションを行うサンプルを示します。このサンプルではDatabricks REST API version 2.0を使用します。
import json
import requests
import base64
DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'
BASE_URL = 'https://%s/api/2.0/dbfs/' % (DOMAIN)
def dbfs_rpc(action, body):
""" A helper function to make the DBFS API request, request/response is encoded/decoded as JSON """
response = requests.post(
BASE_URL + action,
headers={'Authorization': 'Bearer %s' % TOKEN },
json=body
)
return response.json()
# Create a handle that will be used to add blocks
handle = dbfs_rpc("create", {"path": "/temp/upload_large_file", "overwrite": "true"})['handle']
with open('/a/local/file') as f:
while True:
# A block can be at most 1MB
block = f.read(1 << 20)
if not block:
break
data = base64.standard_b64encode(block)
dbfs_rpc("add-block", {"handle": handle, "data": data})
# close the handle to finish uploading
dbfs_rpc("close", {"handle": handle})
Python 3クラスターの作成
以下のサンプルでは、Databricks REST APIとrequests Python HTTPライブラリを用いて、どのようにPython 3のクラスターを作成するのかを示します。このサンプルではDatabricks REST API version 2.0を使用します。
import requests
DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'
response = requests.post(
'https://%s/api/2.0/clusters/create' % (DOMAIN),
headers={'Authorization': 'Bearer %s' % TOKEN},
json={
"cluster_name": "my-cluster",
"spark_version": "5.5.x-scala2.11",
"node_type_id": "i3.xlarge",
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"num_workers": 25
}
)
if response.status_code == 200:
print(response.json()['cluster_id'])
else:
print("Error launching cluster: %s: %s" % (response.json()["error_code"], response.json()["message"]))
ハイコンカレンシークラスターの作成
以下の例では、Databricks REST APIを用いてHigh Concurrencyモードのクラスターをどのように起動するのかを示しています。このサンプルではDatabricks REST API version 2.0を使用します。
curl -n -X POST -H 'Content-Type: application/json' -d '{
"cluster_name": "high-concurrency-cluster",
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"spark_conf":{
"spark.databricks.cluster.profile":"serverless",
"spark.databricks.repl.allowedLanguages":"sql,python,r"
},
"aws_attributes":{
"zone_id":"us-west-2c",
"first_on_demand":1,
"availability":"SPOT_WITH_FALLBACK",
"spot_bid_price_percent":100
},
"custom_tags":{
"ResourceClass":"Serverless"
},
"autoscale":{
"min_workers":1,
"max_workers":2
},
"autotermination_minutes":10
}' https://<databricks-instance>/api/2.0/clusters/create
Jobs APIのサンプル
このセクションでは、Pythonジョブ、spark submitジョブ、JARジョブをどのように作成し、どのようにJARジョブを実行して、そのアウトプットを参照数Rのかを説明します。
Pythonジョブの作成
このサンプルではPythonジョブの作成方法を説明します。Apache Sparkの円周率の推定のサンプルを使用します。このサンプルではDatabricks REST API version 2.0を使用します。
-
サンプルを含むPythonファイルをダウンロードし、Databricks CLIを用いてDBFSにアップロードします。
Bashdbfs cp pi.py dbfs:/docs/pi.py
-
ジョブを作成します。
以下のサンプルではDatabricks RuntimeとDatabricks Lightを用いたジョブの作成方法を示しています。
Databricks Runtime
Bashcurl -n -X POST -H 'Content-Type: application/json' -d \ '{ "name": "SparkPi Python job", "new_cluster": { "spark_version": "7.3.x-scala2.12", "node_type_id": "i3.xlarge", "num_workers": 2 }, "spark_python_task": { "python_file": "dbfs:/docs/pi.py", "parameters": [ "10" ] } }' https://<databricks-instance>/api/2.0/jobs/create
Databricks Light
Bashcurl -n -X POST -H 'Content-Type: application/json' -d \ '{ "name": "SparkPi Python job", "new_cluster": { "spark_version": "apache-spark-2.4.x-scala2.11", "node_type_id": "i3.xlarge", "num_workers": 2 }, "spark_python_task": { "python_file": "dbfs:/docs/pi.py", "parameters": [ "10" ] } }' https://<databricks-instance>/api/2.0/jobs/create
spark-submitジョブの作成
このサンプルではspark-submitジョブの作成方法を説明します。Apache SparkのSparkPiサンプルとDatabricks REST APIバージョン2.0を使用します。
-
サンプルを含むJARをダウンロードし、Databricks CLIを用いてDBFSにアップロードします。
Bashdbfs cp SparkPi-assembly-0.1.jar dbfs:/docs/sparkpi.jar
-
ジョブを作成します。
Bashcurl -n \ -X POST -H 'Content-Type: application/json' -d \ '{ "name": "SparkPi spark-submit job", "new_cluster": { "spark_version": "7.3.x-scala2.12", "node_type_id": "r3.xlarge", "aws_attributes": {"availability": "ON_DEMAND"}, "num_workers": 2 }, "spark_submit_task": { "parameters": [ "--class", "org.apache.spark.examples.SparkPi", "dbfs:/docs/sparkpi.jar", "10" ] } }' https://<databricks-instance>/api/2.0/jobs/create
Rスクリプトのspark-submitジョブの作成および実行
このサンプルでは、Rスクリプトを実行するspark-submitジョブの作成方法を示しています。このサンプルではDatabricks REST API version 2.0を使用しています。
-
Databricks CLIを用いてRファイルをDBFSにアップロードします。
Bashdbfs cp your_code.R dbfs:/path/to/your_code.R
コードでSparkRを使用している場合、最初にパッケージをインストールしなくてはなりません。DatabricksランタイムにはSparkRのソースコードが含まれています。以下のサンプルのように、ローカルディレクトリからSparkRパッケージをインストールします。
Rinstall.packages("/databricks/spark/R/pkg", repos = NULL) library(SparkR) sparkR.session() n <- nrow(createDataFrame(iris)) write.csv(n, "/dbfs/path/to/num_rows.csv")
DatabricksはCRANから最新バージョンのsparklyrをインストールします。コードでsparklyrを使用している場合には、
spark_connect
にSparkマスターのURLを指定する必要があります。SparkマスターURLを構成するには、IPアドレスを取得するためにSPARK_LOCAL_IP
環境変数を使い、デフォルトのポート7077
を使用します。サンプルは以下の通りです。Rlibrary(sparklyr) master <- paste("spark://", Sys.getenv("SPARK_LOCAL_IP"), ":7077", sep="") sc <- spark_connect(master) iris_tbl <- copy_to(sc, iris) write.csv(iris_tbl, "/dbfs/path/to/sparklyr_iris.csv")
-
ジョブを作成します。
Bashcurl -n \ -X POST -H 'Content-Type: application/json' \ -d '{ "name": "R script spark-submit job", "new_cluster": { "spark_version": "7.3.x-scala2.12", "node_type_id": "i3.xlarge", "aws_attributes": {"availability": "SPOT"}, "num_workers": 2 }, "spark_submit_task": { "parameters": [ "dbfs:/path/to/your_code.R" ] } }' https://<databricks-instance>/api/2.0/jobs/create
これはジョブ実行に使用する
job-id
を返却します。 -
job-id
を用いてジョブを実行します。Bashcurl -n \ -X POST -H 'Content-Type: application/json' \ -d '{ "job_id": <job-id> }' https://<databricks-instance>/api/2.0/jobs/run-now
JARジョブの作成および実行
このサンプルではJARジョブの作成、実行方法を説明します。Apache SparkのSparkPiサンプルとDatabricks REST API version 2.0を使用します。
-
サンプルを含むJARをダウンロードします。
-
APIを用いてDatabricksインスタンスにJARをアップロードします。
Bashcurl -n \ -F filedata=@"SparkPi-assembly-0.1.jar" \ -F path="/docs/sparkpi.jar" \ -F overwrite=true \ https://<databricks-instance>/api/2.0/dbfs/put
呼び出しに成功すると
{}
が返ります。そうでない場合にはエラ〜メッセージが表示されます。 -
ジョブを作成する前にすべてのSparkバージョンの一覧を取得します。
Bashcurl -n https://<databricks-instance>/api/2.0/clusters/spark-versions
このサンプルでは
7.3.x-scala2.12
を使用します。Sparkクラスターのバージョンの詳細についてはRuntime version stringsをご覧ください。 -
ジョブを作成します。JARはライブラリとして指定され、Spark JARタスクではメインクラス名が参照されます。
Bashcurl -n -X POST -H 'Content-Type: application/json' \ -d '{ "name": "SparkPi JAR job", "new_cluster": { "spark_version": "7.3.x-scala2.12", "node_type_id": "r3.xlarge", "aws_attributes": {"availability": "ON_DEMAND"}, "num_workers": 2 }, "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}], "spark_jar_task": { "main_class_name":"org.apache.spark.examples.SparkPi", "parameters": "10" } }' https://<databricks-instance>/api/2.0/jobs/create
これは、ジョブの実行に使用する
job-id
を返却します。 -
run now
を用いてジョブを実行します。Bashcurl -n \ -X POST -H 'Content-Type: application/json' \ -d '{ "job_id": <job-id> }' https://<databricks-instance>/api/2.0/jobs/run-now
-
https://<databricks-instance>/#job/<job-id>
に移動すると、実行中のジョブを確認することができます。 -
上のリクエストから返却される情報を用いてAPIからチェックすることもできます。
Bashcurl -n https://<databricks-instance>/api/2.0/jobs/runs/get?run_id=<run-id> | jq
以下のような結果が返却されるはずです。
JSON{ "job_id": 35, "run_id": 30, "number_in_job": 1, "original_attempt_run_id": 30, "state": { "life_cycle_state": "TERMINATED", "result_state": "SUCCESS", "state_message": "" }, "task": { "spark_jar_task": { "jar_uri": "", "main_class_name": "org.apache.spark.examples.SparkPi", "parameters": [ "10" ], "run_as_repl": true } }, "cluster_spec": { "new_cluster": { "spark_version": "7.3.x-scala2.12", "node_type_id": "<node-type>", "enable_elastic_disk": false, "num_workers": 1 }, "libraries": [ { "jar": "dbfs:/docs/sparkpi.jar" } ] }, "cluster_instance": { "cluster_id": "0412-165350-type465", "spark_context_id": "5998195893958609953" }, "start_time": 1523552029282, "setup_duration": 211000, "execution_duration": 33000, "cleanup_duration": 2000, "trigger": "ONE_TIME", "creator_user_name": "...", "run_name": "SparkPi JAR job", "run_page_url": "<databricks-instance>/?o=3901135158661429#job/35/run/1", "run_type": "JOB_RUN" }
-
ジョブのアウトプットを参照するには、job run details pageにアクセスします。
Executing command, time = 1523552263909. Pi is roughly 3.13973913973914
テーブルアクセスコントロールが有効化されたクラスターの作成
テーブルアクセスコントロールが有効化されたクラスターを作成するには、リクエストボディのspark_conf
で以下の内容を指定します。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -X POST https://<databricks-instance>/api/2.0/clusters/create -d'
{
"cluster_name": "my-cluster",
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"spark_conf": {
"spark.databricks.acl.dfAclsEnabled":true,
"spark.databricks.repl.allowedLanguages": "python,sql"
},
"aws_attributes": {
"availability": "SPOT",
"zone_id": "us-west-2a"
},
"num_workers": 1,
"custom_tags":{
"costcenter":"Tags",
"applicationname":"Tags1"
}
}'
クラスターログデリバリーのサンプル
Spark UIでSparkドライバーとエグゼキューターのログを参照できますが、このログをDBFSやS3にデリバリーすることができます。以下の例をご覧ください。
DBFSロケーションにログをデリバリーするクラスターの作成
以下のcurlコマンドはcluster_log_dbfs
というクラスターを作成し、Databricksに対してログをdbfs:/logs
にクラスターIDをプレフィックスとして送信するようにリクエストします。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
"cluster_name": "cluster_log_dbfs",
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 1,
"cluster_log_conf": {
"dbfs": {
"destination": "dbfs:/logs"
}
}
}' https://<databricks-instance>/api/2.0/clusters/create
レスポンスにはクラスターIDが含まれます。
{"cluster_id":"1111-223344-abc55"}
クラスター作成後は、Databricksがログファイルを5分ごとに送信先に同期します。ドライバーログはdbfs:/logs/1111-223344-abc55/driver
、エグゼキューターログはdbfs:/logs/1111-223344-abc55/executor
にアップロードされます。
S3ロケーションにログをデリバリーするクラスターの作成
Databricksは、クラスターのインスタンスプロファイルを用いたS3ロケーションへのログのデリバリーをサポートしています。以下のコマンドはcluster_log_s3
というクラスターを作成し、指定されたインスタンスプロファイルを用いてs3://my-bucket/logs
にログを送信するようにDatabricksにリクエストします。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X POST --H 'Content-Type: application/json' d \
'{
"cluster_name": "cluster_log_s3",
"spark_version": "7.3.x-scala2.12",
"aws_attributes": {
"availability": "SPOT",
"zone_id": "us-west-2c",
"instance_profile_arn": "arn:aws:iam::12345678901234:instance-profile/YOURIAM"
},
"num_workers": 1,
"cluster_log_conf": {
"s3": {
"destination": "s3://my-bucket/logs",
"region": "us-west-2"
}
}
}' https://<databricks-instance>/api/2.0/clusters/create
Databricksは対応するインスタンスプロファイルを用いてS3にログをデリバリーします。Databricksでは、Amazon S3のマネージドキー(SSE-S3)とAWS KMSマネージドキー(SSE-KMS)の両方による暗号化をサポートしています。詳細はEncrypt data in S3 bucketsをご覧ください。
重要!
S3の送信先にログをアップロードし、あとで読み込むためにはインスタンスプロファイルのIAMロールが権限を有している必要があります。デフォルトの権限を変更するにはAPIリクエストでcanned_acl
を使用します。
ログデリバリーのステータスチェック
API経由でログデリバリーのステータスに関するクラスター情報を取得することができます。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -H 'Content-Type: application/json' -d \
'{
"cluster_id": "1111-223344-abc55"
}' https://<databricks-instance>/api/2.0/clusters/get
ログのアップロードの最新のバッチが成功している場合、レスポンスには最後の試行のタイムスタンプのみが含まれます。
{
"cluster_log_status": {
"last_attempted": 1479338561
}
}
エラーがある場合には、レスポンスにエラーメッセージが表示されます。
{
"cluster_log_status": {
"last_attempted": 1479338561,
"last_exception": "Exception: Access Denied ..."
}
}
ワークスペースのサンプル
ここでは、ワークスペースオブジェクトの一覧、情報取得、作成、削除、エクスポート、インポートを行うためにWorkspace APIを用いたサンプルを示します。
ノートブックあるいはフォルダの一覧
以下のcurlコマンドはワークスペースのパスを一覧します。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X GET -H 'Content-Type: application/json' -d \
'{
"path": "/Users/user@example.com/"
}' https://<databricks-instance>/api/2.0/workspace/list
レスポンスにはステータスの一覧が含まれます。
{
"objects": [
{
"object_type": "DIRECTORY",
"path": "/Users/user@example.com/folder"
},
{
"object_type": "NOTEBOOK",
"language": "PYTHON",
"path": "/Users/user@example.com/notebook1"
},
{
"object_type": "NOTEBOOK",
"language": "SCALA",
"path": "/Users/user@example.com/notebook2"
}
]
}
パスがノートブックである場合、レスポンスには入力ノートブックのステータスを含む配列が格納されます
ノートブックあるいはフォルダの情報の取得
以下のcurlコマンドはワークスペースのパスをステータスを取得します。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X GET -H 'Content-Type: application/json' -d \
'{
"path": "/Users/user@example.com/"
}' https://<databricks-instance>/api/2.0/workspace/get-status
レスポンスには入力パスのステータスが含まれます。
{
"object_type": "DIRECTORY",
"path": "/Users/user@example.com"
}
フォルダの作成
以下のcurlコマンドはフォルダを作成します。これは、mkdir -p
のように再帰的にフォルダを作成します。すでにフォルダが存在している場合には何も行わずコマンドは成功します。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
"path": "/Users/user@example.com/new/folder"
}' https://<databricks-instance>/api/2.0/workspace/mkdirs
リクエストが成功すると、空のJSON文字列が返却されます。
ノートブックあるいはフォルダの削除
以下のcurlコマンドはノートブックやフォルダを削除します。空ではないフォルダを再帰的に削除するためにrecursive
を有効化することができます。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
"path": "/Users/user@example.com/new/folder",
"recursive": "false"
}' https://<databricks-instance>/api/2.0/workspace/delete
リクエストが成功すると、空のJSON文字列が返却されます。
ノートブックあるいはフォルダのエクスポート
以下のcurlコマンドはノートブックをエクスポートします。ノートブックはSOURCE
, HTML
, JUPYTER
, DBC
のフォーマットでエクスポートすることができます。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X GET \
-d '{ "path": "/Users/user@example.com/notebook", "format": "SOURCE" }' \
https://<databricks-instance>/api/2.0/workspace/export
レスポンスには、base64エンコードされたノートブックのコンテンツが含まれます。
{
"content": "Ly8gRGF0YWJyaWNrcyBub3RlYm9vayBzb3VyY2UKcHJpbnQoImhlbGxvLCB3b3JsZCIpCgovLyBDT01NQU5EIC0tLS0tLS0tLS0KCg=="
}
あるいは、エクスポートされたノートブックを直接ダウンロードすることができます。
curl -n -X GET "https://<databricks-instance>/api/2.0/workspace/export?format=SOURCE&direct_download=true&path=/Users/user@example.com/notebook"
レスポンスはエクスポートされたノートブックのコンテンツとなります。
ノートブックあるいはディレクトリのインポート
以下のcurlコマンドはノートブックをワークスペースにインポートします。複数のフォーマット(SOURCE
, HTML
, JUPYTER
, DBC
)がサポートされています。format
がSOURCE
の場合、language
を指定しなくてはなりません。content
パラメータはbase64エンコードされたノートブックのコンテンツです。既存のノートブックを上書きするためにoverwrite
を有効化することができます。このサンプルではDatabricks REST API version 2.0を使用しています。
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
"path": "/Users/user@example.com/new-notebook",
"format": "SOURCE",
"language": "SCALA",
"content": "Ly8gRGF0YWJyaWNrcyBub3RlYm9vayBzb3VyY2UKcHJpbnQoImhlbGxvLCB3b3JsZCIpCgovLyBDT01NQU5EIC0tLS0tLS0tLS0KCg==",
"overwrite": "false"
}' https://<databricks-instance>/api/2.0/workspace/import
リクエストが成功すると、空のJSON文字列が返却されます。
あるいは、POSTのマルチパート経由でノートブックをインポートすることができます。
curl -n -X POST https://<databricks-instance>/api/2.0/workspace/import \
-F path="/Users/user@example.com/new-notebook" -F format=SOURCE -F language=SCALA -F overwrite=true -F content=@notebook.scala