1
1

More than 1 year has passed since last update.

API examples | Databricks on AWS [2022/12/6時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

本書には、Databricks REST APIの使用法をデモンストレーションするサンプルが含まれています。

以下のサンプルでは、<databricks-instance>をお使いのDatabricksデプロイメントのワークスペースURLで置き換えてください。

認証

注意
セキュリティのベストプラクティスとして、自動化ツール、システム、スクリプト、アプリで認証を行う際、ワークスペースのユーザーではなくサービスプリンシパルに属するアクセストークンを使用することをお勧めします。サービスプリンシパルのアクセストークンを作成するには、Manage access tokens for a service principalをご覧ください。

REST APIの認証をどのように行うのかについては、Databricksパーソナルアクセストークンを用いた認証をご確認ください。

本書のサンプルでは、皆様がDatabricksパーソナルアクセストークンを使用していることを前提としています。以下のサンプルでは、<your-token>をご自身のパーソナルアクセストークンで置き換えてください。curlのサンプルでは、.netrcにDatabricks APIの資格情報を格納していることを前提としています。PythonのサンプルではBearer認証を使用しています。サンプルではコードにトークンを記述していますが、Databricksで資格情報を安全に活用するにはDatabricksにおけるシークレットの管理のユーザーガイドをご覧ください。

クラスター一覧のgzipを取得

この例では、Databricks REST API version 2.0を使用します。

Bash
curl -n -H "Accept-Encoding: gzip" https://<databricks-instance>/api/2.0/clusters/list > clusters.gz

大きなファイルをDBFSにアップロード

単体のAPI呼び出しによるデータアップロードの容量は1MBを超えることができません。DBFSに1MBより大きいファイルをアップロードするには、createaddBlockcloseの組み合わせであるstreaming APIを活用します。

Pythonを用いてこのアクションを行うサンプルを示します。このサンプルではDatabricks REST API version 2.0を使用します。

Python
import json
import requests
import base64

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'
BASE_URL = 'https://%s/api/2.0/dbfs/' % (DOMAIN)

def dbfs_rpc(action, body):
  """ A helper function to make the DBFS API request, request/response is encoded/decoded as JSON """
  response = requests.post(
    BASE_URL + action,
    headers={'Authorization': 'Bearer %s' % TOKEN },
    json=body
  )
  return response.json()

# Create a handle that will be used to add blocks
handle = dbfs_rpc("create", {"path": "/temp/upload_large_file", "overwrite": "true"})['handle']
with open('/a/local/file') as f:
  while True:
    # A block can be at most 1MB
    block = f.read(1 << 20)
    if not block:
        break
    data = base64.standard_b64encode(block)
    dbfs_rpc("add-block", {"handle": handle, "data": data})
# close the handle to finish uploading
dbfs_rpc("close", {"handle": handle})

Python 3クラスターの作成

以下のサンプルでは、Databricks REST APIとrequests Python HTTPライブラリを用いて、どのようにPython 3のクラスターを作成するのかを示します。このサンプルではDatabricks REST API version 2.0を使用します。

Python
import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

response = requests.post(
  'https://%s/api/2.0/clusters/create' % (DOMAIN),
  headers={'Authorization': 'Bearer %s' % TOKEN},
  json={
    "cluster_name": "my-cluster",
    "spark_version": "5.5.x-scala2.11",
    "node_type_id": "i3.xlarge",
    "spark_env_vars": {
      "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    },
    "num_workers": 25
  }
)

if response.status_code == 200:
  print(response.json()['cluster_id'])
else:
  print("Error launching cluster: %s: %s" % (response.json()["error_code"], response.json()["message"]))

ハイコンカレンシークラスターの作成

以下の例では、Databricks REST APIを用いてHigh Concurrencyモードのクラスターをどのように起動するのかを示しています。このサンプルではDatabricks REST API version 2.0を使用します。

Bash
  curl -n -X POST -H 'Content-Type: application/json' -d '{
    "cluster_name": "high-concurrency-cluster",
    "spark_version": "7.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "spark_conf":{
          "spark.databricks.cluster.profile":"serverless",
          "spark.databricks.repl.allowedLanguages":"sql,python,r"
       },
       "aws_attributes":{
          "zone_id":"us-west-2c",
          "first_on_demand":1,
          "availability":"SPOT_WITH_FALLBACK",
          "spot_bid_price_percent":100
       },
     "custom_tags":{
          "ResourceClass":"Serverless"
       },
         "autoscale":{
          "min_workers":1,
          "max_workers":2
       },
    "autotermination_minutes":10
  }' https://<databricks-instance>/api/2.0/clusters/create

Jobs APIのサンプル

このセクションでは、Pythonジョブ、spark submitジョブ、JARジョブをどのように作成し、どのようにJARジョブを実行して、そのアウトプットを参照数Rのかを説明します。

Pythonジョブの作成

このサンプルではPythonジョブの作成方法を説明します。Apache Sparkの円周率の推定のサンプルを使用します。このサンプルではDatabricks REST API version 2.0を使用します。

  1. サンプルを含むPythonファイルをダウンロードし、Databricks CLIを用いてDBFSにアップロードします。

    Bash
    dbfs cp pi.py dbfs:/docs/pi.py
    
  2. ジョブを作成します。

    以下のサンプルではDatabricks RuntimeDatabricks Lightを用いたジョブの作成方法を示しています。

    Databricks Runtime

    Bash
    curl -n -X POST -H 'Content-Type: application/json' -d \
    '{
      "name": "SparkPi Python job",
      "new_cluster": {
        "spark_version": "7.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      },
      "spark_python_task": {
        "python_file": "dbfs:/docs/pi.py",
        "parameters": [
          "10"
        ]
      }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

    Databricks Light

    Bash
    curl -n -X POST -H 'Content-Type: application/json' -d \
    '{
      "name": "SparkPi Python job",
      "new_cluster": {
        "spark_version": "apache-spark-2.4.x-scala2.11",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      },
      "spark_python_task": {
        "python_file": "dbfs:/docs/pi.py",
        "parameters": [
          "10"
        ]
      }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

spark-submitジョブの作成

このサンプルではspark-submitジョブの作成方法を説明します。Apache SparkのSparkPiサンプルとDatabricks REST APIバージョン2.0を使用します。

  1. サンプルを含むJARをダウンロードし、Databricks CLIを用いてDBFSにアップロードします。

    Bash
    dbfs cp SparkPi-assembly-0.1.jar dbfs:/docs/sparkpi.jar
    
  2. ジョブを作成します。

    Bash
    curl -n \
    -X POST -H 'Content-Type: application/json' -d \
    '{
         "name": "SparkPi spark-submit job",
         "new_cluster": {
           "spark_version": "7.3.x-scala2.12",
           "node_type_id": "r3.xlarge",
           "aws_attributes": {"availability": "ON_DEMAND"},
           "num_workers": 2
           },
        "spark_submit_task": {
           "parameters": [
             "--class",
             "org.apache.spark.examples.SparkPi",
             "dbfs:/docs/sparkpi.jar",
             "10"
             ]
           }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

Rスクリプトのspark-submitジョブの作成および実行

このサンプルでは、Rスクリプトを実行するspark-submitジョブの作成方法を示しています。このサンプルではDatabricks REST API version 2.0を使用しています。

  1. Databricks CLIを用いてRファイルをDBFSにアップロードします。

    Bash
    dbfs cp your_code.R dbfs:/path/to/your_code.R
    

    コードでSparkRを使用している場合、最初にパッケージをインストールしなくてはなりません。DatabricksランタイムにはSparkRのソースコードが含まれています。以下のサンプルのように、ローカルディレクトリからSparkRパッケージをインストールします。

    R
    install.packages("/databricks/spark/R/pkg", repos = NULL)
    library(SparkR)
    
    sparkR.session()
    n <- nrow(createDataFrame(iris))
    write.csv(n, "/dbfs/path/to/num_rows.csv")
    

    DatabricksはCRANから最新バージョンのsparklyrをインストールします。コードでsparklyrを使用している場合には、spark_connectにSparkマスターのURLを指定する必要があります。SparkマスターURLを構成するには、IPアドレスを取得するためにSPARK_LOCAL_IP環境変数を使い、デフォルトのポート7077を使用します。サンプルは以下の通りです。

    R
    library(sparklyr)
    
    master <- paste("spark://", Sys.getenv("SPARK_LOCAL_IP"), ":7077", sep="")
    sc <- spark_connect(master)
    iris_tbl <- copy_to(sc, iris)
    write.csv(iris_tbl, "/dbfs/path/to/sparklyr_iris.csv")
    
  2. ジョブを作成します。

    Bash
    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{
         "name": "R script spark-submit job",
         "new_cluster": {
           "spark_version": "7.3.x-scala2.12",
           "node_type_id": "i3.xlarge",
           "aws_attributes": {"availability": "SPOT"},
           "num_workers": 2
           },
        "spark_submit_task": {
           "parameters": [ "dbfs:/path/to/your_code.R" ]
           }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

    これはジョブ実行に使用するjob-idを返却します。

  3. job-idを用いてジョブを実行します。

    Bash
    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{ "job_id": <job-id> }' https://<databricks-instance>/api/2.0/jobs/run-now
    

JARジョブの作成および実行

このサンプルではJARジョブの作成、実行方法を説明します。Apache SparkのSparkPiサンプルとDatabricks REST API version 2.0を使用します。

  1. サンプルを含むJARをダウンロードします。

  2. APIを用いてDatabricksインスタンスにJARをアップロードします。

    Bash
    curl -n \
    -F filedata=@"SparkPi-assembly-0.1.jar" \
    -F path="/docs/sparkpi.jar" \
    -F overwrite=true \
    https://<databricks-instance>/api/2.0/dbfs/put
    

    呼び出しに成功すると{}が返ります。そうでない場合にはエラ〜メッセージが表示されます。

  3. ジョブを作成する前にすべてのSparkバージョンの一覧を取得します。

    Bash
    curl -n https://<databricks-instance>/api/2.0/clusters/spark-versions
    

    このサンプルでは7.3.x-scala2.12を使用します。Sparkクラスターのバージョンの詳細についてはRuntime version stringsをご覧ください。

  4. ジョブを作成します。JARはライブラリとして指定され、Spark JARタスクではメインクラス名が参照されます。

    Bash
    curl -n -X POST -H 'Content-Type: application/json' \
    -d '{
          "name": "SparkPi JAR job",
          "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "r3.xlarge",
            "aws_attributes": {"availability": "ON_DEMAND"},
            "num_workers": 2
            },
         "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}],
         "spark_jar_task": {
            "main_class_name":"org.apache.spark.examples.SparkPi",
            "parameters": "10"
            }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

    これは、ジョブの実行に使用するjob-idを返却します。

  5. run nowを用いてジョブを実行します。

    Bash
    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{ "job_id": <job-id> }' https://<databricks-instance>/api/2.0/jobs/run-now
    
  6. https://<databricks-instance>/#job/<job-id>に移動すると、実行中のジョブを確認することができます。

  7. 上のリクエストから返却される情報を用いてAPIからチェックすることもできます。

    Bash
    curl -n https://<databricks-instance>/api/2.0/jobs/runs/get?run_id=<run-id> | jq
    

    以下のような結果が返却されるはずです。

    JSON
    {
      "job_id": 35,
      "run_id": 30,
      "number_in_job": 1,
      "original_attempt_run_id": 30,
      "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "SUCCESS",
        "state_message": ""
      },
      "task": {
        "spark_jar_task": {
          "jar_uri": "",
          "main_class_name": "org.apache.spark.examples.SparkPi",
          "parameters": [
            "10"
          ],
          "run_as_repl": true
        }
      },
      "cluster_spec": {
        "new_cluster": {
          "spark_version": "7.3.x-scala2.12",
          "node_type_id": "<node-type>",
          "enable_elastic_disk": false,
          "num_workers": 1
        },
        "libraries": [
          {
            "jar": "dbfs:/docs/sparkpi.jar"
          }
        ]
      },
      "cluster_instance": {
        "cluster_id": "0412-165350-type465",
        "spark_context_id": "5998195893958609953"
      },
      "start_time": 1523552029282,
      "setup_duration": 211000,
      "execution_duration": 33000,
      "cleanup_duration": 2000,
      "trigger": "ONE_TIME",
      "creator_user_name": "...",
      "run_name": "SparkPi JAR job",
      "run_page_url": "<databricks-instance>/?o=3901135158661429#job/35/run/1",
      "run_type": "JOB_RUN"
    }
    
  8. ジョブのアウトプットを参照するには、job run details pageにアクセスします。

    Executing command, time = 1523552263909.
    Pi is roughly 3.13973913973914
    

テーブルアクセスコントロールが有効化されたクラスターの作成

テーブルアクセスコントロールが有効化されたクラスターを作成するには、リクエストボディのspark_confで以下の内容を指定します。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -X POST https://<databricks-instance>/api/2.0/clusters/create -d'
{
  "cluster_name": "my-cluster",
  "spark_version": "7.3.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "spark_conf": {
    "spark.databricks.acl.dfAclsEnabled":true,
    "spark.databricks.repl.allowedLanguages": "python,sql"
  },
  "aws_attributes": {
    "availability": "SPOT",
    "zone_id": "us-west-2a"
  },
  "num_workers": 1,
  "custom_tags":{
      "costcenter":"Tags",
      "applicationname":"Tags1"
  }
}'

クラスターログデリバリーのサンプル

Spark UIでSparkドライバーとエグゼキューターのログを参照できますが、このログをDBFSやS3にデリバリーすることができます。以下の例をご覧ください。

DBFSロケーションにログをデリバリーするクラスターの作成

以下のcurlコマンドはcluster_log_dbfsというクラスターを作成し、Databricksに対してログをdbfs:/logsにクラスターIDをプレフィックスとして送信するようにリクエストします。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "cluster_name": "cluster_log_dbfs",
  "spark_version": "7.3.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "num_workers": 1,
  "cluster_log_conf": {
    "dbfs": {
      "destination": "dbfs:/logs"
    }
  }
}' https://<databricks-instance>/api/2.0/clusters/create

レスポンスにはクラスターIDが含まれます。

JSON
{"cluster_id":"1111-223344-abc55"}

クラスター作成後は、Databricksがログファイルを5分ごとに送信先に同期します。ドライバーログはdbfs:/logs/1111-223344-abc55/driver、エグゼキューターログはdbfs:/logs/1111-223344-abc55/executorにアップロードされます。

S3ロケーションにログをデリバリーするクラスターの作成

Databricksは、クラスターのインスタンスプロファイルを用いたS3ロケーションへのログのデリバリーをサポートしています。以下のコマンドはcluster_log_s3というクラスターを作成し、指定されたインスタンスプロファイルを用いてs3://my-bucket/logsにログを送信するようにDatabricksにリクエストします。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -X POST --H 'Content-Type: application/json' d \
'{
  "cluster_name": "cluster_log_s3",
  "spark_version": "7.3.x-scala2.12",
  "aws_attributes": {
    "availability": "SPOT",
    "zone_id": "us-west-2c",
    "instance_profile_arn": "arn:aws:iam::12345678901234:instance-profile/YOURIAM"
    },
    "num_workers": 1,
    "cluster_log_conf": {
    "s3": {
      "destination": "s3://my-bucket/logs",
    "region": "us-west-2"
    }
  }
}' https://<databricks-instance>/api/2.0/clusters/create

Databricksは対応するインスタンスプロファイルを用いてS3にログをデリバリーします。Databricksでは、Amazon S3のマネージドキー(SSE-S3)とAWS KMSマネージドキー(SSE-KMS)の両方による暗号化をサポートしています。詳細はEncrypt data in S3 bucketsをご覧ください。

重要!
S3の送信先にログをアップロードし、あとで読み込むためにはインスタンスプロファイルのIAMロールが権限を有している必要があります。デフォルトの権限を変更するにはAPIリクエストでcanned_aclを使用します。

ログデリバリーのステータスチェック

API経由でログデリバリーのステータスに関するクラスター情報を取得することができます。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -H 'Content-Type: application/json' -d \
'{
  "cluster_id": "1111-223344-abc55"
}' https://<databricks-instance>/api/2.0/clusters/get

ログのアップロードの最新のバッチが成功している場合、レスポンスには最後の試行のタイムスタンプのみが含まれます。

JSON
{
  "cluster_log_status": {
    "last_attempted": 1479338561
  }
}

エラーがある場合には、レスポンスにエラーメッセージが表示されます。

JSON
{
  "cluster_log_status": {
    "last_attempted": 1479338561,
    "last_exception": "Exception: Access Denied ..."
  }
}

ワークスペースのサンプル

ここでは、ワークスペースオブジェクトの一覧、情報取得、作成、削除、エクスポート、インポートを行うためにWorkspace APIを用いたサンプルを示します。

ノートブックあるいはフォルダの一覧

以下のcurlコマンドはワークスペースのパスを一覧します。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -X GET -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/"
}' https://<databricks-instance>/api/2.0/workspace/list

レスポンスにはステータスの一覧が含まれます。

JSON
{
  "objects": [
    {
     "object_type": "DIRECTORY",
     "path": "/Users/user@example.com/folder"
    },
    {
     "object_type": "NOTEBOOK",
     "language": "PYTHON",
     "path": "/Users/user@example.com/notebook1"
    },
    {
     "object_type": "NOTEBOOK",
     "language": "SCALA",
     "path": "/Users/user@example.com/notebook2"
    }
  ]
}

パスがノートブックである場合、レスポンスには入力ノートブックのステータスを含む配列が格納されます

ノートブックあるいはフォルダの情報の取得

以下のcurlコマンドはワークスペースのパスをステータスを取得します。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n  -X GET -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/"
}' https://<databricks-instance>/api/2.0/workspace/get-status

レスポンスには入力パスのステータスが含まれます。

JSON
{
  "object_type": "DIRECTORY",
  "path": "/Users/user@example.com"
}

フォルダの作成

以下のcurlコマンドはフォルダを作成します。これは、mkdir -pのように再帰的にフォルダを作成します。すでにフォルダが存在している場合には何も行わずコマンドは成功します。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/new/folder"
}' https://<databricks-instance>/api/2.0/workspace/mkdirs

リクエストが成功すると、空のJSON文字列が返却されます。

ノートブックあるいはフォルダの削除

以下のcurlコマンドはノートブックやフォルダを削除します。空ではないフォルダを再帰的に削除するためにrecursiveを有効化することができます。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/new/folder",
  "recursive": "false"
}' https://<databricks-instance>/api/2.0/workspace/delete

リクエストが成功すると、空のJSON文字列が返却されます。

ノートブックあるいはフォルダのエクスポート

以下のcurlコマンドはノートブックをエクスポートします。ノートブックはSOURCE, HTML, JUPYTER, DBCのフォーマットでエクスポートすることができます。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n  -X GET \
-d '{ "path": "/Users/user@example.com/notebook", "format": "SOURCE" }' \
https://<databricks-instance>/api/2.0/workspace/export

レスポンスには、base64エンコードされたノートブックのコンテンツが含まれます。

JSON
{
  "content": "Ly8gRGF0YWJyaWNrcyBub3RlYm9vayBzb3VyY2UKcHJpbnQoImhlbGxvLCB3b3JsZCIpCgovLyBDT01NQU5EIC0tLS0tLS0tLS0KCg=="
}

あるいは、エクスポートされたノートブックを直接ダウンロードすることができます。

Bash
curl -n -X GET "https://<databricks-instance>/api/2.0/workspace/export?format=SOURCE&direct_download=true&path=/Users/user@example.com/notebook"

レスポンスはエクスポートされたノートブックのコンテンツとなります。

ノートブックあるいはディレクトリのインポート

以下のcurlコマンドはノートブックをワークスペースにインポートします。複数のフォーマット(SOURCE, HTML, JUPYTER, DBC)がサポートされています。formatSOURCEの場合、languageを指定しなくてはなりません。contentパラメータはbase64エンコードされたノートブックのコンテンツです。既存のノートブックを上書きするためにoverwriteを有効化することができます。このサンプルではDatabricks REST API version 2.0を使用しています。

Bash
curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/new-notebook",
  "format": "SOURCE",
  "language": "SCALA",
  "content": "Ly8gRGF0YWJyaWNrcyBub3RlYm9vayBzb3VyY2UKcHJpbnQoImhlbGxvLCB3b3JsZCIpCgovLyBDT01NQU5EIC0tLS0tLS0tLS0KCg==",
  "overwrite": "false"
}' https://<databricks-instance>/api/2.0/workspace/import

リクエストが成功すると、空のJSON文字列が返却されます。

あるいは、POSTのマルチパート経由でノートブックをインポートすることができます。

Bash
curl -n -X POST https://<databricks-instance>/api/2.0/workspace/import \
       -F path="/Users/user@example.com/new-notebook" -F format=SOURCE -F language=SCALA -F overwrite=true -F content=@notebook.scala

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1