LoginSignup
1
0

Databricks CLIを用いたDatabricksジョブ(JARタスク)の作成と実行

Last updated at Posted at 2024-03-22

はじめに

以下のGitHubリポジトリで、JARタスクを対象として、Databricks CLIを使ったDatabricksジョブの作成と実行のサンプルコードを公開しました。本記事ではこのサンプルコードを使ってDatabricksジョブの作成と実行を行う流れを説明します。

前提

  • Databricks on AWSが対象
  • DatabricksワークスペースでUnity Catalogが有効になっている
  • 利用可能なカタログ、外部ロケーションがある
  • Databricks CLIがインストールされていること。本記事は以下バージョンのDatabricks CLIで動作確認を行なっている
Databricks CLI バージョン
% databricks -v
Databricks CLI v0.215.0

GitHubリポジトリのディレクトリ構成

リポジトリのディレクトリ構成は以下の通りです。

GitHubリポジトリのディレクトリ構成
% tree
.
├── README.md
├── artifacts
│   └── databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar
├── pom.xml
├── scripts
│   ├── create-job.sh
│   └── run-job.sh
└── src
    └── main
        └── java
            └── com
                └── example
                    └── Main.java
  • artifacts: 本手順の動作確認ですぐに利用できるよう、ビルド済みのJARファイルを格納
  • scripts/create-job.sh: JARタスク用のDatabricksジョブ作成用のBashスクリプト
  • scripts/run-job.sh: 作成したDatabricksジョブを実行するBashスクリプト
  • src/main/java/com/example/Main.java: Javaのエントリーポイントのmainメソッドを定義

Main.javaの説明

上記のうち、動作確認用のサンプルコードMain.javaの処理内容について簡単に説明します。

Main.javaは以下の引数を受け取ります。

  • args[0]: 待機する秒数(整数)
  • args[1]: 終了コード。0なら正常終了、0以外なら異常終了(整数)

指定された秒数分待機した後、指定された終了コードで終了します。終了コードを指定できるようにしているのは、Databricksジョブの異常終了時の挙動を確認できるようにするためです。

Unity Catalogのオブジェクトの説明

続いてUnity Catalogのオブジェクトについて簡単に説明します。DatabricksではUnity Catalogを用いてテーブルなどのメタデータを管理します。

image.png
(出典) Unity Catalogのベストプラクティス

Unity Catalogの主要なオブジェクトは以下の通りです。

  • メタストアは最上位の特別なリソースです
  • テーブルは三層構造で管理します:カタログ > スキーマ > テーブル
  • テーブルは文字通り表形式で表せる構造化データを格納します
  • ボリュームは任意の形式のデータを格納できる入れ物で、CSVなどの入力データを格納したり、JARファイルなどのアーティファクトを格納するのに使います
  • 外部ロケーションはS3などのクラウドストレージ上のパスとUnity Catalogのカタログのパスをマッピングするためのオブジェクトです

すべてのデータは顧客管理のクラウドストレージ(Amazon S3バケットなど)に格納されます。以下図のように、メタストア、カタログ、スキーマごとにクラウドストレージのパスをマッピングできます。

image.png
(出典) Unity Catalogのベストプラクティス

カタログは既存のものを使う前提として、この後の手順ではスキーマ、ボリュームの作成から始めていきます。
なお、ワークスペースの言語設定で日本語にできますが、この後のキャプチャは英語になっているのはご容赦ください。

1. スキーマの作成

まずはDatabricksワークスペースで、カタログエクスプローラーにアクセスし、使用するカタログを選択します。画面右上の[スキーマを作成]をクリックします。

image.png

スキーマ作成画面で任意のスキーマ名を入力して [作成] をクリックします。
ここでは databricks_job_jar_minimal_example という名前にしています。

image.png

2. ボリュームの作成

作成したスキーマで [ボリュームを作成]をクリックします。

image.png

ボリューム作成画面で以下を入力し [作成] をクリックします。

  • ボリューム名 (Volume name): 任意の名前、ここでは artifacts を指定
  • ボリュームタイプ (Volume type): 外部ボリューム
  • 外部ロケーション (External location): 利用可能な外部ロケーションを指定
  • パス (Path): 任意の場所を指定、ここではs3://バケット名/external/databricks_job_jar_minimal_example/artifacts を指定

image.png

これでボリューム /Volumes/カタログ名/databricks_job_jar_minimal_example/artifacts が作成されました。Databricksジョブなどからこのパスでボリュームにアクセスできます。

image.png

Unity Catalogにおいて、S3パス s3://バケット名/external/databricks_job_jar_minimal_example/artifacts/Volumes/カタログ名/databricks_job_jar_minimal_example/artifacts がマッピングされたことになります。

image.png

3. JARのアップロード

AWSマネジメントコンソールから、Unity Catalogボリュームに指定したS3パスにJARをアップロードします。

image.png

Databricksワークスペースのカタログエクスプローラーでボリュームを見ると(要ブラウザリフレッシュ)、JARが確認できます。ボリューム上のJARのパスをコピーしておきます。

image.png

4. Databricks CLIの認証

以下のドキュメントを参考に、この後Databricksジョブで使用するDatabricksワークスペースに対するDatabricks CLIの認証(OAuth ユーザー対マシン (U2M) 認証)を行なっておきます。

詳細は上記ドキュメントを見てもらえれば良いですが、以下のコマンドでworkspace-urlを書き換えて認証する形になります。

Databricks CLIの認証
databricks auth login --host <workspace-url>

5. GitHubリポジトリのクローン

任意のディレクトリにGitHubリポジトリをクローンし、cdします。

GitHubリポジトリのクローン
$ git clone https://github.com/nakazax/databricks-job-jar-minimal-example
$ cd databricks-job-jar-minimal-example

6. Databricksジョブの作成

scripts/create-job.shを使ってDatabricksジョブを作成します。

スクリプトの説明

このスクリプトは以下の処理を行います。

  1. JARのパスを引数で受け取る。指定がない場合はエラーで終了
  2. Databricks CLI経由でDatabricksジョブを作成
  • ジョブ名は databricks_job_jar_minimal_example
  • タスクはJARタスク。JARのパスは引数で指定されたもの、メインクラスは com.example.Main
  • ジョブパラメータとして wait_seconds, exit_code を定義。デフォルト値はそれぞれ10, 0
  • クラスタは専用のシングルノードクラスタ(ジョブクラスタ)を使用
scripts/create-job.sh
#!/bin/bash

if [ -z "$1" ]; then
  echo "Error: jar_path is required."
  echo "Usage: $0 <jar_path>"
  exit 1
fi

JAR_PATH=$1
echo Creating job with parameters: jar_path: $JAR_PATH

databricks jobs create --json '{
  "name": "databricks_job_jar_minimal_example",
  "tasks": [
    {
      "task_key": "Task1",
      "spark_jar_task": {
        "main_class_name": "com.example.Main",
        "parameters": [
          "{{job.parameters.wait_seconds}}",
          "{{job.parameters.exit_code}}"
        ],
        "run_as_repl": true
      },
      "job_cluster_key": "single_node_job_cluster",
      "libraries": [
        {
          "jar": "'"$JAR_PATH"'"
        }
      ]
    }
  ],
  "job_clusters": [
    {
      "job_cluster_key": "single_node_job_cluster",
      "new_cluster": {
        "spark_version": "14.3.x-scala2.12",
        "spark_conf": {
          "spark.master": "local[*, 4]",
          "spark.databricks.cluster.profile": "singleNode"
        },
        "aws_attributes": {
          "first_on_demand": 1,
          "availability": "SPOT_WITH_FALLBACK",
          "spot_bid_price_percent": 100,
          "ebs_volume_count": 0
        },
        "node_type_id": "i3.xlarge",
        "custom_tags": {
          "ResourceClass": "SingleNode"
        },
        "enable_elastic_disk": false,
        "data_security_mode": "SINGLE_USER",
        "runtime_engine": "STANDARD",
        "num_workers": 0
      }
    }
  ],
  "parameters": [
    {
      "name": "exit_code",
      "default": "0"
    },
    {
      "name": "wait_seconds",
      "default": "10"
    }
  ]
}'

スクリプトの実行

/Volume/path/for/JAR3. JARのアップロードでコピーしたボリュームのパスに差し替えて実行します。

Databricksジョブ作成
$ sh scripts/create-job.sh /Volume/path/for/JAR

以下はリクエストとレスポンスのサンプルです。レスポンスのjob_idの値をこの後使うのでコピーしておきます。

Databricksジョブ作成 サンプル実行
% sh scripts/create-job.sh /Volumes/hinak_catalog_usw2/databricks_job_jar_minimal_example/artifacts/databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar
Creating job with parameters: jar_path: /Volumes/hinak_catalog_usw2/databricks_job_jar_minimal_example/artifacts/databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar
{
  "job_id":1003386274146420
}

実行結果の確認

Databricksワークスペースのワークフロー > ジョブを見ると、ジョブが作成されているはずです。
image.png

作成されたジョブのタスクを見ると、以下のようになっています。
image.png

7. Databricksジョブの実行

scripts/run-job.shを使ってDatabricksジョブを実行します。

スクリプトの説明

このスクリプトは以下の処理を行います。

  1. ジョブIDを第1引数で受け取る。指定がない場合はエラーで終了
  2. 待機秒数を第2引数、終了コードを第3引数でオプションとして受け取る。省略時のデフォルト値は0
  3. ジョブパラメータに待機秒数と終了コードを指定してジョブを実行、同期的に実行されるので終了するまで待機
  4. ジョブの実行が成功すれば「Job completed successfully.」を出力して正常終了、失敗すれば「Job failed with status: <ステータスコード>」を出力して異常終了
scripts/run-job.sh
#!/bin/bash

if [ -z "$1" ]; then
  echo "Error: job_id is required."
  echo "Usage: $0 <job_id> [wait_seconds] [exit_code]"
  exit 1
fi

JOB_ID=$1
WAIT_SECONDS=${2:-0}
EXIT_CODE=${3:-0}

cat <<EOM
Triggering job with parameters:
  job_id: $JOB_ID
  wait_seconds: $WAIT_SECONDS
  exit_code: $EXIT_CODE
EOM

databricks jobs run-now --json '{
  "job_id": '$JOB_ID',
  "job_parameters": {
    "wait_seconds": "'$WAIT_SECONDS'",
    "exit_code": "'$EXIT_CODE'"
  }
}'

job_run_status=$?
if [ $job_run_status -eq 0 ]; then
  echo "Job completed successfully."
  exit 0
else
  echo "Job failed with status: $job_run_status"
  exit 1
fi

スクリプトの実行

job_id6. Databricksジョブの作成で控えたジョブIDに差し替えて実行します。

Databricksジョブ実行
sh scripts/run-job.sh job_id

以下はリクエストとレスポンスのサンプルです。ジョブが完了するまで同期的に実行されます。

Databricksジョブ実行 サンプル
% sh scripts/run-job.sh 1003386274146420   
Triggering job with parameters:
  job_id: 1003386274146420
  wait_seconds: 0
  exit_code: 0
{
  "cleanup_duration":0,
  "creator_user_name":"hiroyuki.nakazato@databricks.com", 
  "end_time":1711080183731,
  "execution_duration":58000,
  "job_clusters": [
    {
      "job_cluster_key":"single_node_job_cluster",
      "new_cluster": {
        "aws_attributes": {
          "availability":"SPOT_WITH_FALLBACK",
          "ebs_volume_count":0,
          "first_on_demand":1,
          "spot_bid_price_percent":100,
          "zone_id":"us-west-2a"
        },
        "custom_tags": {
          "ResourceClass":"SingleNode"  
        },
        "data_security_mode":"SINGLE_USER",
        "enable_elastic_disk":false,
        "node_type_id":"i3.xlarge",
        "num_workers":0,
        "runtime_engine":"STANDARD",
        "spark_conf": {
          "spark.databricks.cluster.profile":"singleNode",
          "spark.master":"local[*, 4]"
        },
        "spark_version":"14.3.x-scala2.12"
      }
    }
  ],
  "job_id":1003386274146420,
  "job_parameters": [
    {
      "default":"0",
      "name":"exit_code",
      "value":"0"  
    },
    {
      "default":"10",
      "name":"wait_seconds",
      "value":"0"
    }
  ],
  "number_in_job":345529882270737,
  "original_attempt_run_id":345529882270737,
  "run_id":345529882270737,
  "run_name":"databricks_job_jar_minimal_example",
  "run_page_url":"https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#job/1003386274146420/run/345529882270737",
  "run_type":"JOB_RUN",
  "setup_duration":266000,
  "start_time":1711079859564,
  "state": {
    "life_cycle_state":"TERMINATED",
    "result_state":"SUCCESS",
    "state_message":"",
    "user_cancelled_or_timedout":false
  },
  "tasks": [
    {
      "attempt_number":0,
      "cleanup_duration":0,
      "cluster_instance": {
        "cluster_id":"0322-035740-tz2odkul",
        "spark_context_id":"4779511158103819814"
      },  
      "end_time":1711080183636,
      "execution_duration":58000,
      "libraries": [
        {
          "jar":"/Volumes/hinak_catalog_usw2/databricks_job_jar_minimal_example/artifacts/databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar"
        }
      ],
      "run_id":506317854362030,
      "run_if":"ALL_SUCCESS",
      "setup_duration":266000,
      "spark_jar_task": {
        "jar_uri":"",
        "main_class_name":"com.example.Main",
        "parameters": [
          "{{job.parameters.wait_seconds}}",
          "{{job.parameters.exit_code}}"  
        ]
      },
      "start_time":1711079859572,
      "state": {
        "life_cycle_state":"TERMINATED", 
        "result_state":"SUCCESS",
        "state_message":"",
        "user_cancelled_or_timedout":false
      },
      "task_key":"Task1"
    }
  ],
  "trigger":"ONE_TIME"
}
Job completed successfully.

実行結果の確認

ワークスペースで見てみると、ジョブの実行状況が確認できます。
image.png

まとめ

本記事ではDatabricks CLIを使ってJARタスクのDatabricksジョブを作成・実行する手順を紹介しました。ポイントは以下の通りです。

  • Databricks CLIの認証
  • Unity Catalogのオブジェクト(カタログ、スキーマ、ボリューム)の作成
  • JARファイルのボリュームへのアップロード
  • Databricks CLIを使ったジョブの作成と実行

サンプルコードを用意したので、ぜひ手を動かして試してみてください。
以上です。Databricksジョブを活用して、データ処理やML/AIのワークロードを自動化していきましょう!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0