dbx by Databricks Labs | Databricks on AWS [2022/10/11時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
注意
Databricks Labsのdbxはas-isで提供されており、カスタマーテクニカルサポートチャネルを通じて、Databricksによって正式にはサポートされていません。サポート、質問、機能リクエストに関しては、Github上のdatabrickslabs/dbxリポジトリのIssuesページを通じてコミュニケーションすることができます。
Databricks Labsのdbxは、Databricksコマンドラインインタフェース(Databricks CLI)を拡張し、Databricksプラットフォームにおける迅速な開発ライフサイクルと継続的インテグレーション/継続的デリバリー・デプロイメント(CI/CD)のための機能を提供することを目的として設計されたオープンソースツールです。
dbx
は複数環境にまたがるジョブの起動とデプロイメントプロセスをシンプルにします。また、皆様のプロジェクトのパッケージングと、バージョン管理された状態でのDatabricks環境へのデリバリーを支援します。CLIファーストで設計されており、CI/CDパイプラインの内部、あるいはローカルツール(Visual Studio CodeやPyCharmのようなローカルIDEなど)の一部として積極的に使用できるように開発されています。
dbx
の典型的な開発ワークフローは以下のようになります。
-
まだリモートのrepoを持っていないのであれば、DatabricksのDatabricks ReposによるGit連携の要件に合致するGitプロバイダーのリモートリポジトリを作成します。
-
リモートリポジトリをお使いのDatabricksワークスペースにクローンします。
-
DatabricksワークスペースにクローンしたリポジトリにDatabricksノートブックを作成あるいは移動します。お使いのDatabricksクラスターで実行するコードのプロトタイピングを始めるために、このノートブックを使用します。
-
個別のヘルパークラスやヘルパー関数、設定ファイル、テストを追加することでお使いのノートブックをエンハンスし、モジュール化するために、
dbx
を使ってお気に入りのIDE、Gitがインストールされたローカル開発マシンにスイッチします。 -
お使いのローカル開発マシンにリモートのリポジトリをクローンします。
-
お使いのノートブックからコードを1台以上のローカルコードファイルに移動します。
-
ローカルでコーディングを行い、ローカルリポジトリから作業内容をリモートリポジトリにプッシュします。また、リモートリポジトリをお使いのDatabricksワークスペースに同期します。
ティップス
あるいは、リアルタイムでローカルファイルの変更を対応するワークスペースのファイルに同期するdbx syncを使用することができます。 -
迅速なプロトタイピングのためにDatabricksワークスペース上でノートブックを使用し続け、ノートブックから検証されたコードをローカルマシンに移動し続けます。コードのモジュール化、コードコンプリート、linting、ユニットテスト、Databricksとのライブ接続を必要としないコード、オブジェクトのステップスルーのデバッギングのようなタスクのためにローカルIDEを使い続け、Databricksへのライブな接続を必要としないオブジェクトやコードのステップスルーのデバッグを行います。
-
必要に応じて、お使いのローカルコードをターゲットのクラスターでのバッチ実行を行うために
dbx
を使用します(これは、Sparkクラスターでアプリケーションを起動するためにSparkのbin
ディレクトリでspark-submitスクリプトを実行するものと似ています)。 -
プロダクションに移行する準備ができたら、クラスターでのリモートリポジトリのコードの実行を自動化するために、GitHub ActionsやAzure DevOps、GitLabのようなCI/CDプラットフォームを使用します。
要件
dbx
を使うためには、コードでPython、Scala、Javaを使っているのかに関係なく、ローカルの開発マシンに以下をインストールする必要があります。
-
Pythonバージョン3.6以降
コードでPythonを使用している場合は、ターゲットのクラスターにインストールされているPythonのバージョンと一致させる必要があります。既存のクラスターにインストールされているPythonのバージョンを取得するには、クラスターのwebターミナルを使って、python --version
コマンドを実行します。お使いのクラスターのDatabricksランタイムのバージョンに対しては、Databricks runtime releasesの「System environment」のセクションをご覧ください。 -
コードでPythonを使用している場合には、ご自身の
dbx
プロジェクトで適切なPythonのバージョンを使用し、依存関係をパッケージングすることを確実にするために、Pythonの仮想環境を作成する手段。本書ではpipenvをカバーします。 -
dbxバージョン0.7.0以降。
pip install dbx
を実行することでPython Package Index(PyPI)からインストールできます。dbx
がインストールされているかどうかを確認するには以下のコマンドを実行します。Bashdbx --version
バージョン番号が返って来れば、
dbx
はインストールされています。バージョン番号が
0.7.0
未満の場合、以下のコマンドを実行してdbx
をアップグレードしてバージョン番号を再度確認してください。Bashpip install dbx --upgrade dbx --version # Or ... python -m pip install dbx --upgrade dbx --version
-
Databricks CLI、認証を設定します。
dbx
をインストールする際に自動でDatabricks CLIがインストールされます。以下の場所のいずれか、あるいは両方でローカル開発マシンの認証を設定することができます。- 環境変数
DATABRICKS_HOST
とDATABRICKS_TOKEN
(Databricks CLIバージョン0.8.0以降) -
.databrickscfg
ファイル内のプロファイル
dbx
は、これら二つの場所の認証クレディンシャルを検索します。dbx
は最初にマッチしたもののみを使用します。注意
dbx
は、Databricks CLI version 0.17.2以降を用いる場合、認証で.netrcの使用をサポートしていません。インストールされているDatabricks CLIのバージョンを確認するには、コマンドdatabricks --version
を実行してください。 - 環境変数
- ローカルとリモートの変更をプッシュ、同期するためのgit
以下のIDEごとの指示に従ってください。
注意
Databricksでは、dbx
と上述のIDEの利用を検証しています。しかし、dbx
は任意のIDEでも動作します。IDEを使わない(ターミナルのみ)という選択肢もあります。
dbx
は、単一のPythonコードファイル、コンパイルされたScala、JavaのJARファイルでの動作に最適化されています。dbx
は単一のRコードファイルやコンパイルされたRコードパッケージでは動作しません。これは、dbx
がJobs API 2.0と2.1と連携しますが、これらのAPIが単一のRコードファイルやコンパイルされたRコードパッケージをジョブとして実行できないためです。
Visual Studio Code
dbx
でVisual Studio CodeとPythonを使い始めるには以下の手順を完了してください。
ローカルの開発マシンでは、共通の要件に加えて以下のインストールが必要です。
-
Visual Studio Code向けのPython拡張。詳細はVisual Studio CodeウェブサイトのExtension Marketplaceをご覧ください。
詳細はVisual Studio CodeドキュメントのGetting Started with Python in VS Codeをご覧ください。
dbx
プロジェクト構成をセットアップするためには、以下のステップを実施します。
-
お使いのターミナルで空のフォルダーを作成します。これらの手順では、
dbx-demo
というフォルダーを使用します。dbx
プロジェクトのルートフォルダーにはお好きな名前をつけることができます。別な名前を使う際には、以下のステップでフォルダー名を置き換えてください。フォルダーを作成した後にそのフォルダーに移動し、そのフォルダーからVisual Studio Codeを起動します。LinuxあるいはmacOS
Bashmkdir dbx-demo cd dbx-demo code .
ティップ
code .
を実行した後にcommand not found: code
が表示される場合には、MicrosoftWebサイトのLaunching from the command lineをご覧ください。Windows
Powershellmd dbx-demo cd dbx-demo code .
-
Visual Studio CodeでプロジェクトのためのPython仮想環境を作成します。
- メニューバーでView > Terminalをクリックします。
-
dbx-demo
フォルダーのルートから、以下のオプションを指定してpipenv
コマンドを実行します。ここで<version>
は、3.7.5
のように、すでにローカルにインストールしている(そして、理想的にはターゲットのクラスターのPythonのバージョンにマッチする)Pythonのバージョンとなります。
Bashpipenv --python <version>
次のステップで必要となるので、
pipenv
コマンドの出力にあるVirtualenv location
の値をメモしておきます。 -
ターゲットのPythonインタプリタを選択し、Python仮想環境を有効化します。
- メニューバーでView > Command Paletteをクリックし、
Python: Select
とタイプし、Python: Select Interpreterをクリックします。 - 作成したPython仮想環境に対するパスの中でPythonインタプリタを選択します。(このパスは
pyenv
コマンドの出力のVirtualenv location
として一覧されます) - メニューバーでView > Command Paletteをクリックし、
Terminal: Create
とタイプし、Terminal: Create New Terminalをクリックします。
詳細に関しては、Visual Studio CodeドキュメントのUsing Python environments in VS Codeをご覧ください。
- メニューバーでView > Command Paletteをクリックし、
-
dbxプロジェクトの作成に進みます。
PyCharm
原文を参照ください。
IntelliJ IDEA
原文を参照ください。
Eclipse
原文を参照ください。
IDEなし(ターミナルのみ)
dbx
でターミナルとPythonを使い始めるには、以下のステップを完了してください。
dbx
プロジェクト構成をセットアップするためにターミナルを使用するには、以下のステップを実行します。
-
お使いのターミナルで空のフォルダーを作成します。これらの手順では、
dbx-demo
というフォルダーを使用します。dbx
プロジェクトのルートフォルダーにはお好きな名前をつけることができます。別な名前を使う際には、以下のステップでフォルダー名を置き換えてください。フォルダーを作成した後にそのフォルダーに移動します。LinuxあるいはmacOS
Bashmkdir dbx-demo cd dbx-demo
Windows
Powershellmd dbx-demo cd dbx-demo
-
dbx-demo
フォルダーのルートでpyenv
コマンドを実行することで、このプロジェクト向けのPython仮想環境を作成します。ここで<version>
は、3.7.5
のように、すでにローカルにインストールしている(そして、理想的にはターゲットのクラスターのPythonのバージョンにマッチする)Pythonのバージョンとなります。Bashpipenv --python <version>
-
pipenv shell
を実行してPython仮想環境を有効化します。Bashpipenv shell
-
dbxプロジェクトの作成に進みます。
dbxプロジェクトの作成
上のセクションでdbx
プロジェクト構成を作成したので、以下のいずれかのタイプのプロジェクトを作成することができます。
Python向け最小構成dbxプロジェクトの作成
以下の最小構成のdbx
プロジェクトはPythonとdbx
を使い始める際の、最もシンプルかつ迅速なアプローチとなります。ここでは、お使いのDatabricksワークスペースの既存のDatabricks all-purposeクラスター上で単一のPythonコードファイルをバッチ実行する様子をデモンストレーションします。
注意
all-purposeクラスター、jobクラスター上でのコードのバッチ実行、リモートコードアーティファクトのデプロイメント、CI/CDプラットフォームのセットアップをデモンストレーションするPytohn向けdbx
テンプレートプロジェクトを作成したい場合には、CI/CDサポートとPython向けのdbxテンプレートの作成までスキップしてください。
この手順を完了するには、お使いのワークスペースにall-purposeクラスターが存在している必要があります(クラスターの表示、クラスターの作成をご覧ください)。理想的には(必須ではありません)、お使いのPython仮想環境のPythonのバージョンはクラスターのPythonのバージョンと一致すべきです。クラスターのPythonバージョンを取得するには、クラスターのWebターミナルでpython --version
コマンドを実行します。
python --version
-
ターミナルで、
dbx
プロジェクトのルートフォルダーから以下のオプションを指定してdbx configure
コマンドを実行します。このコマンドによって、dbx
プロジェクトのルートフォルダーに.dbx
隠しフォルダーが作成されます。.dbx
フォルダーにはlock.json
とproject.json
ファイルが格納されます。Bashdbx configure --profile DEFAULT
注意
project.json
ファイルには、Databricks CLIの.databrickscfg
ファイル内のDEFAULT
プロファイルへのリファレンスが含まれます。dbx
に別のプロファイルを使わせたい場合には、DEFAULT
をターゲットのプロファイル名で置き換えてください。
Databricks CLIの.databrickscfg
ファイルのプロファイルではなく、dbx
に環境変数DATABRICKS_HOST
、DATABRICKS_TOKEN
を使わせたい場合には、project.json
のDEFAULT
をそのままにしておきます。dbx
はデフォルトでこのリファレンスを使用します。 -
dbx
ルートフォルダーにconf
というフォルダーを作成します。LinuxあるいはmacOS
Bashmkdir conf
Windows
Powershellmd conf
-
conf
ディレクトリに、以下の内容のdeployment.yaml
というファイルを追加します。YAMLbuild: no_build: true environments: default: workflows: - name: "dbx-demo-job" spark_python_task: python_file: "file://dbx-demo-job.py"
注意
deployment.yaml
には小文字のdefault
が含まれており、これはDatabricks CLIの.databrickscfg
ファイルのDEFAULT
プロファイルへのリファレンスとなっています。dbx
に別のプロファイルを使わせたい場合には、DEFAULT
をターゲットのプロファイル名で置き換えてください。例えば、
dbx
でDEFAULT
プロファイルではなく、Databricks CLIの.databrickscfg
ファイルのDEV
というプロファイルがある場合、お使いのdeployment.yaml
ファイルは以下の様になります。YAMLenvironments: default: workflows: - name: "dbx-demo-job" spark_python_task: python_file: "file://dbx-demo-job.py" dev: workflows: - name: "<some-other-job-name>" spark_python_task: python_file: "file://<some-other-filename>.py"
もし、Databricks CLIの
.databrickscfg
ファイルのプロファイルではなく、dbx
に環境変数DATABRICKS_HOST
、DATABRICKS_TOKEN
を使わせたい場合には、project.json
のDEFAULT
をそのままにしておきます。dbx
はデフォルトでこのリファレンスを使用します。ティップス
ジョブにSpark設定のキーバリューペアを追加するには、以下の様にspark_conf
フィールドを使用します。YAMLenvironments: default: workflows: - name: "dbx-demo-job" spark_conf: spark.speculation: true spark.streaming.ui.retainedBatches: 5 spark.driver.extraJavaOptions: "-verbose:gc -XX:+PrintGCDetails" # ...
ジョブにアクセス権を追加するには、以下の様に
permissions
フィールドを使用します。YAMLenvironments: default: workflows: - name: "dbx-demo-job" permissions: access_control_list: - user_name: "someone@example.com" permission_level: "IS_OWNER" - group_name: "some-group" permission_level: "CAN_VIEW" # ...
access_control_list
フィールドは網羅的である必要があることに注意してください。すなわち、リストには他のユーザーやグループのアクセス権も追加する必要があります。 -
dbx-demo-job.py
というファイルにクラスターで実行するコードを追加し、ファイルをdbx
プロジェクトのルートフォルダーに追加します。(すぐに使えそうなコードがない場合には、本書の後半にあるコードサンプルのPythonコードを使用することができます。)注意
このファイルの名前をdbx-demo-job.py
にする必要はありません。別のファイル名を指定した場合、conf/deployment.yaml
ファイルのpython_file
フィールドが一致するようにしてください。 -
以下のオプションを指定して
dbx execute
コマンドを実行します。このコマンドでは、<existing-cluster-id>
をお使いのワークスペースのターゲットクラスターのIDで置き換えてください。(IDを取得するにはクラスターURLとIDをご覧ください)Bashdbx execute --cluster-id=<existing-cluster-id> --job=dbx-demo-job --no-rebuild --no-package
-
ローカルで実行結果を参照するには、ターミナルの出力をご覧ください。クラスター上で実行結果を参照するには、お使いのクラスターのDriver logsタブのStandard outputを参照ください。(クラスターのドライバーノード、ワーカーノードのログもご覧ください)
-
次のステップに進んでください。
Scala、Java向け最小構成dbxプロジェクトの作成
原文を参照ください。
CI/CDサポートとPython向けのdbxテンプレートの作成
以下のPython向けdbx
テンプレートプロジェクトでは、DatabricksワークスペースでDatabricksのall-purposeクラスターやjobsクラスターでPythonコードのバッチ実行、リモートコードアーティファクトのデプロイメント、CI/CDプラットフォームセットアップをデモンストレーションします。(既存のall-purposeクラスターで単一のPythonコードファイルのバッチ実行をデモンストレーションするPython最小構成dbx
プロジェクトを作成するには、Python向け最小構成dbxプロジェクトの作成まで戻ってください)
-
ターミナル上で、
dbx
プロジェクトルートフォルダーからdbx init
コマンドを実行します。Bashdbx init
-
project_nameでは、プロジェクトの名前を入力するか、デフォルトのプロジェクト名を受け入れるためにEnterを押します。
-
versionには、お使いのプロジェクトの開始バージョン番号を入力するか、デフォルトのプロジェクトバージョンを受け入れるためにEnterを押します。
-
cloudには、プロジェクトで使用するDatabricksクラウドバージョンに対応する番号を入力するか、デフォルトを受け入れるためにEnterを押します。
-
cicd_toolには、プロジェクトで使いたいCI/CDに対応する番号を入力するか、デフォルトを受け入れるためにEnterを押します。
-
project_slugには、プロジェクトで使用するリソーで使用したいプレフィックスを指定するか、デフォルトを受け入れるためにEnterを押します。
-
workspace_dirには、プロジェクト向けのワークスペースディレクトリに対するローカルパスを指定するか、デフォルトを受け入れるためにEnterを押します。
-
artifact_locationには、プロジェクトのアーティファクトが書き込まれるDatabricksワークスペースのパスを入力するか、デフォルトを受け入れるためにEnterを押します。
-
profileには、プロジェクトで使用したいDatabricks CLI認証プロファイルの名前を入力するか、デフォルトを受け入れるためにEnterを押します。
ティップ
以下のようにパラメーターをハードコードして、dbx init
を実行することで上述のステップをスキップすることができます。
dbx init --template="python_basic" \
-p "project_name=cicd-sample-project" \
-p "cloud=AWS" \
-p "cicd_tool='GitHub Actions'" \
-p "profile=DEFAULT" \
--no-input
dbx
は、自動でパラメーターproject_slug
、workspace_dir
、artifact_location
を計算します。これら3つのパラメーターはオプションであり、より高度なユースケースでのみ有用なものとなります。
dbx
ドキュメントのCLI Referenceのinit
コマンドをご覧ください。
新たなプロジェクトで使用する際には、dbx
ドキュメントのBasic Python Templateをご覧ください。
次のステップもご覧ください。
コードサンプル
dbx
で使えるようなコードが手元にない場合には、以下のコードをdbx
でバッチ実行することで実験することができます。このコードは、お使いのワークスペースに小さいテーブルを作成し、テーブルをクエリーし、テーブルを削除します。
ティップ
テーブルを削除せずにそのままにしておきたい場合は、dbx
でバッチ実行する前に、このサンプルコードの最後の行をコメントアウトしてください。
# For testing and debugging of local objects, run
# "pip install pyspark=X.Y.Z", where "X.Y.Z"
# matches the version of PySpark
# on your target clusters.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName("dbx-demo").getOrCreate()
# Create a DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')
# Query the table on the cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the cluster.
spark.sql('DROP TABLE demo_temps_table')
package com.example.demo
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object SampleApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().master("local").getOrCreate()
val schema = StructType(Array(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
))
val data = List(
Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
)
val rdd = spark.sparkContext.makeRDD(data)
val temps = spark.createDataFrame(rdd, schema)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS demo_temps_table")
temps.write.saveAsTable("demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE demo_temps_table")
}
}
package com.example.demo;
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class SampleApp {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Temps Demo")
.config("spark.master", "local")
.getOrCreate();
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
StructType schema = new StructType(new StructField[] {
new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
});
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));
Dataset<Row> temps = spark.createDataFrame(dataList, schema);
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default");
spark.sql("DROP TABLE IF EXISTS demo_temps_table");
temps.write().saveAsTable("demo_temps_table");
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
Dataset<Row> df_temps = spark.sql("SELECT * FROM demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC");
df_temps.show();
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE demo_temps_table");
}
}
次のステップ
- 様々なタイプのall-purpose、jobsクラスター定義をサポートするために、お使いのconf/deployment.yamlファイルを拡張します。
-
conf/deployment.yaml
ファイルでマルチタスクジョブを宣言します。 -
conf/deployment.yaml
ファイルで名前付きプロパティを参照します。 - dbx executeコマンドで、クラスター上で新規ジョブとしてコードをバッチ実行します。
その他のリソース
- dbx deployコマンドによるDatabricksワークスペースストレージへのコードアーティファクトのバッチデプロイ。
- dbx launchコマンドによるクラスター上での既存ジョブのバッチ実行。
- ジョブレスデプロイメントを行うためにdbxの活用
- ワークフローデプロイメントをスケジュールする
- dbxとCI/CDの詳細
- dbxドキュメント
- GitHub上のdatabrickslabs/dbxリポジトリ
- dbxの制限