はじめに
以下のGitHubリポジトリで、JARタスクを対象として、Databricks CLIを使ったDatabricksジョブの作成と実行のサンプルコードを公開しました。本記事ではこのサンプルコードを使ってDatabricksジョブの作成と実行を行う流れを説明します。
前提
- Databricks on AWSが対象
- DatabricksワークスペースでUnity Catalogが有効になっている
- 利用可能なカタログ、外部ロケーションがある
- Databricks CLIがインストールされていること。本記事は以下バージョンのDatabricks CLIで動作確認を行なっている
% databricks -v
Databricks CLI v0.215.0
GitHubリポジトリのディレクトリ構成
リポジトリのディレクトリ構成は以下の通りです。
% tree
.
├── README.md
├── artifacts
│ └── databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar
├── pom.xml
├── scripts
│ ├── create-job.sh
│ └── run-job.sh
└── src
└── main
└── java
└── com
└── example
└── Main.java
-
artifacts
: 本手順の動作確認ですぐに利用できるよう、ビルド済みのJARファイルを格納 -
scripts/create-job.sh
: JARタスク用のDatabricksジョブ作成用のBashスクリプト -
scripts/run-job.sh
: 作成したDatabricksジョブを実行するBashスクリプト -
src/main/java/com/example/Main.java
: Javaのエントリーポイントのmainメソッドを定義
Main.javaの説明
上記のうち、動作確認用のサンプルコードMain.java
の処理内容について簡単に説明します。
Main.java
は以下の引数を受け取ります。
- args[0]: 待機する秒数(整数)
- args[1]: 終了コード。0なら正常終了、0以外なら異常終了(整数)
指定された秒数分待機した後、指定された終了コードで終了します。終了コードを指定できるようにしているのは、Databricksジョブの異常終了時の挙動を確認できるようにするためです。
Unity Catalogのオブジェクトの説明
続いてUnity Catalogのオブジェクトについて簡単に説明します。DatabricksではUnity Catalogを用いてテーブルなどのメタデータを管理します。
Unity Catalogの主要なオブジェクトは以下の通りです。
- メタストアは最上位の特別なリソースです
- テーブルは三層構造で管理します:カタログ > スキーマ > テーブル
- テーブルは文字通り表形式で表せる構造化データを格納します
- ボリュームは任意の形式のデータを格納できる入れ物で、CSVなどの入力データを格納したり、JARファイルなどのアーティファクトを格納するのに使います
- 外部ロケーションはS3などのクラウドストレージ上のパスとUnity Catalogのカタログのパスをマッピングするためのオブジェクトです
すべてのデータは顧客管理のクラウドストレージ(Amazon S3バケットなど)に格納されます。以下図のように、メタストア、カタログ、スキーマごとにクラウドストレージのパスをマッピングできます。
カタログは既存のものを使う前提として、この後の手順ではスキーマ、ボリュームの作成から始めていきます。
なお、ワークスペースの言語設定で日本語にできますが、この後のキャプチャは英語になっているのはご容赦ください。
1. スキーマの作成
まずはDatabricksワークスペースで、カタログエクスプローラーにアクセスし、使用するカタログを選択します。画面右上の[スキーマを作成]をクリックします。
スキーマ作成画面で任意のスキーマ名を入力して [作成] をクリックします。
ここでは databricks_job_jar_minimal_example
という名前にしています。
2. ボリュームの作成
作成したスキーマで [ボリュームを作成]をクリックします。
ボリューム作成画面で以下を入力し [作成] をクリックします。
- ボリューム名 (Volume name): 任意の名前、ここでは
artifacts
を指定 - ボリュームタイプ (Volume type): 外部ボリューム
- 外部ロケーション (External location): 利用可能な外部ロケーションを指定
- パス (Path): 任意の場所を指定、ここでは
s3://バケット名/external/databricks_job_jar_minimal_example/artifacts
を指定
これでボリューム /Volumes/カタログ名/databricks_job_jar_minimal_example/artifacts
が作成されました。Databricksジョブなどからこのパスでボリュームにアクセスできます。
Unity Catalogにおいて、S3パス s3://バケット名/external/databricks_job_jar_minimal_example/artifacts
と /Volumes/カタログ名/databricks_job_jar_minimal_example/artifacts
がマッピングされたことになります。
3. JARのアップロード
AWSマネジメントコンソールから、Unity Catalogボリュームに指定したS3パスにJARをアップロードします。
Databricksワークスペースのカタログエクスプローラーでボリュームを見ると(要ブラウザリフレッシュ)、JARが確認できます。ボリューム上のJARのパスをコピーしておきます。
4. Databricks CLIの認証
以下のドキュメントを参考に、この後Databricksジョブで使用するDatabricksワークスペースに対するDatabricks CLIの認証(OAuth ユーザー対マシン (U2M) 認証)を行なっておきます。
詳細は上記ドキュメントを見てもらえれば良いですが、以下のコマンドでworkspace-urlを書き換えて認証する形になります。
databricks auth login --host <workspace-url>
5. GitHubリポジトリのクローン
任意のディレクトリにGitHubリポジトリをクローンし、cd
します。
$ git clone https://github.com/nakazax/databricks-job-jar-minimal-example
$ cd databricks-job-jar-minimal-example
6. Databricksジョブの作成
scripts/create-job.sh
を使ってDatabricksジョブを作成します。
スクリプトの説明
このスクリプトは以下の処理を行います。
- JARのパスを引数で受け取る。指定がない場合はエラーで終了
- Databricks CLI経由でDatabricksジョブを作成
- ジョブ名は
databricks_job_jar_minimal_example
- タスクはJARタスク。JARのパスは引数で指定されたもの、メインクラスは
com.example.Main
- ジョブパラメータとして
wait_seconds
,exit_code
を定義。デフォルト値はそれぞれ10, 0 - クラスタは専用のシングルノードクラスタ(ジョブクラスタ)を使用
#!/bin/bash
if [ -z "$1" ]; then
echo "Error: jar_path is required."
echo "Usage: $0 <jar_path>"
exit 1
fi
JAR_PATH=$1
echo Creating job with parameters: jar_path: $JAR_PATH
databricks jobs create --json '{
"name": "databricks_job_jar_minimal_example",
"tasks": [
{
"task_key": "Task1",
"spark_jar_task": {
"main_class_name": "com.example.Main",
"parameters": [
"{{job.parameters.wait_seconds}}",
"{{job.parameters.exit_code}}"
],
"run_as_repl": true
},
"job_cluster_key": "single_node_job_cluster",
"libraries": [
{
"jar": "'"$JAR_PATH"'"
}
]
}
],
"job_clusters": [
{
"job_cluster_key": "single_node_job_cluster",
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"spark_conf": {
"spark.master": "local[*, 4]",
"spark.databricks.cluster.profile": "singleNode"
},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"spot_bid_price_percent": 100,
"ebs_volume_count": 0
},
"node_type_id": "i3.xlarge",
"custom_tags": {
"ResourceClass": "SingleNode"
},
"enable_elastic_disk": false,
"data_security_mode": "SINGLE_USER",
"runtime_engine": "STANDARD",
"num_workers": 0
}
}
],
"parameters": [
{
"name": "exit_code",
"default": "0"
},
{
"name": "wait_seconds",
"default": "10"
}
]
}'
スクリプトの実行
/Volume/path/for/JAR
を3. JARのアップロードでコピーしたボリュームのパスに差し替えて実行します。
$ sh scripts/create-job.sh /Volume/path/for/JAR
以下はリクエストとレスポンスのサンプルです。レスポンスのjob_id
の値をこの後使うのでコピーしておきます。
% sh scripts/create-job.sh /Volumes/hinak_catalog_usw2/databricks_job_jar_minimal_example/artifacts/databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar
Creating job with parameters: jar_path: /Volumes/hinak_catalog_usw2/databricks_job_jar_minimal_example/artifacts/databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar
{
"job_id":1003386274146420
}
実行結果の確認
Databricksワークスペースのワークフロー > ジョブを見ると、ジョブが作成されているはずです。
作成されたジョブのタスクを見ると、以下のようになっています。
7. Databricksジョブの実行
scripts/run-job.sh
を使ってDatabricksジョブを実行します。
スクリプトの説明
このスクリプトは以下の処理を行います。
- ジョブIDを第1引数で受け取る。指定がない場合はエラーで終了
- 待機秒数を第2引数、終了コードを第3引数でオプションとして受け取る。省略時のデフォルト値は0
- ジョブパラメータに待機秒数と終了コードを指定してジョブを実行、同期的に実行されるので終了するまで待機
- ジョブの実行が成功すれば「Job completed successfully.」を出力して正常終了、失敗すれば「Job failed with status: <ステータスコード>」を出力して異常終了
#!/bin/bash
if [ -z "$1" ]; then
echo "Error: job_id is required."
echo "Usage: $0 <job_id> [wait_seconds] [exit_code]"
exit 1
fi
JOB_ID=$1
WAIT_SECONDS=${2:-0}
EXIT_CODE=${3:-0}
cat <<EOM
Triggering job with parameters:
job_id: $JOB_ID
wait_seconds: $WAIT_SECONDS
exit_code: $EXIT_CODE
EOM
databricks jobs run-now --json '{
"job_id": '$JOB_ID',
"job_parameters": {
"wait_seconds": "'$WAIT_SECONDS'",
"exit_code": "'$EXIT_CODE'"
}
}'
job_run_status=$?
if [ $job_run_status -eq 0 ]; then
echo "Job completed successfully."
exit 0
else
echo "Job failed with status: $job_run_status"
exit 1
fi
スクリプトの実行
job_id
を6. Databricksジョブの作成で控えたジョブIDに差し替えて実行します。
sh scripts/run-job.sh job_id
以下はリクエストとレスポンスのサンプルです。ジョブが完了するまで同期的に実行されます。
% sh scripts/run-job.sh 1003386274146420
Triggering job with parameters:
job_id: 1003386274146420
wait_seconds: 0
exit_code: 0
{
"cleanup_duration":0,
"creator_user_name":"hiroyuki.nakazato@databricks.com",
"end_time":1711080183731,
"execution_duration":58000,
"job_clusters": [
{
"job_cluster_key":"single_node_job_cluster",
"new_cluster": {
"aws_attributes": {
"availability":"SPOT_WITH_FALLBACK",
"ebs_volume_count":0,
"first_on_demand":1,
"spot_bid_price_percent":100,
"zone_id":"us-west-2a"
},
"custom_tags": {
"ResourceClass":"SingleNode"
},
"data_security_mode":"SINGLE_USER",
"enable_elastic_disk":false,
"node_type_id":"i3.xlarge",
"num_workers":0,
"runtime_engine":"STANDARD",
"spark_conf": {
"spark.databricks.cluster.profile":"singleNode",
"spark.master":"local[*, 4]"
},
"spark_version":"14.3.x-scala2.12"
}
}
],
"job_id":1003386274146420,
"job_parameters": [
{
"default":"0",
"name":"exit_code",
"value":"0"
},
{
"default":"10",
"name":"wait_seconds",
"value":"0"
}
],
"number_in_job":345529882270737,
"original_attempt_run_id":345529882270737,
"run_id":345529882270737,
"run_name":"databricks_job_jar_minimal_example",
"run_page_url":"https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#job/1003386274146420/run/345529882270737",
"run_type":"JOB_RUN",
"setup_duration":266000,
"start_time":1711079859564,
"state": {
"life_cycle_state":"TERMINATED",
"result_state":"SUCCESS",
"state_message":"",
"user_cancelled_or_timedout":false
},
"tasks": [
{
"attempt_number":0,
"cleanup_duration":0,
"cluster_instance": {
"cluster_id":"0322-035740-tz2odkul",
"spark_context_id":"4779511158103819814"
},
"end_time":1711080183636,
"execution_duration":58000,
"libraries": [
{
"jar":"/Volumes/hinak_catalog_usw2/databricks_job_jar_minimal_example/artifacts/databricks-job-jar-minimal-example-1.0-SNAPSHOT.jar"
}
],
"run_id":506317854362030,
"run_if":"ALL_SUCCESS",
"setup_duration":266000,
"spark_jar_task": {
"jar_uri":"",
"main_class_name":"com.example.Main",
"parameters": [
"{{job.parameters.wait_seconds}}",
"{{job.parameters.exit_code}}"
]
},
"start_time":1711079859572,
"state": {
"life_cycle_state":"TERMINATED",
"result_state":"SUCCESS",
"state_message":"",
"user_cancelled_or_timedout":false
},
"task_key":"Task1"
}
],
"trigger":"ONE_TIME"
}
Job completed successfully.
実行結果の確認
ワークスペースで見てみると、ジョブの実行状況が確認できます。
まとめ
本記事ではDatabricks CLIを使ってJARタスクのDatabricksジョブを作成・実行する手順を紹介しました。ポイントは以下の通りです。
- Databricks CLIの認証
- Unity Catalogのオブジェクト(カタログ、スキーマ、ボリューム)の作成
- JARファイルのボリュームへのアップロード
- Databricks CLIを使ったジョブの作成と実行
サンプルコードを用意したので、ぜひ手を動かして試してみてください。
以上です。Databricksジョブを活用して、データ処理やML/AIのワークロードを自動化していきましょう!