LoginSignup
1
1

More than 1 year has passed since last update.

dbx by Databricks Labs | Databricks on AWS [2022/10/11時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

注意
Databricks Labsdbxはas-isで提供されており、カスタマーテクニカルサポートチャネルを通じて、Databricksによって正式にはサポートされていません。サポート、質問、機能リクエストに関しては、Github上のdatabrickslabs/dbxリポジトリのIssuesページを通じてコミュニケーションすることができます。

Databricks Labsのdbxは、Databricksコマンドラインインタフェース(Databricks CLI)を拡張し、Databricksプラットフォームにおける迅速な開発ライフサイクルと継続的インテグレーション/継続的デリバリー・デプロイメント(CI/CD)のための機能を提供することを目的として設計されたオープンソースツールです。

dbxは複数環境にまたがるジョブの起動とデプロイメントプロセスをシンプルにします。また、皆様のプロジェクトのパッケージングと、バージョン管理された状態でのDatabricks環境へのデリバリーを支援します。CLIファーストで設計されており、CI/CDパイプラインの内部、あるいはローカルツール(Visual Studio CodeやPyCharmのようなローカルIDEなど)の一部として積極的に使用できるように開発されています。

dbxの典型的な開発ワークフローは以下のようになります。

  1. まだリモートのrepoを持っていないのであれば、DatabricksのDatabricks ReposによるGit連携の要件に合致するGitプロバイダーのリモートリポジトリを作成します。

  2. リモートリポジトリをお使いのDatabricksワークスペースにクローンします。

  3. DatabricksワークスペースにクローンしたリポジトリにDatabricksノートブック作成あるいは移動します。お使いのDatabricksクラスターで実行するコードのプロトタイピングを始めるために、このノートブックを使用します。

  4. 個別のヘルパークラスやヘルパー関数、設定ファイル、テストを追加することでお使いのノートブックをエンハンスし、モジュール化するために、dbxを使ってお気に入りのIDE、Gitがインストールされたローカル開発マシンにスイッチします。

  5. お使いのローカル開発マシンにリモートのリポジトリをクローンします。

  6. お使いのノートブックからコードを1台以上のローカルコードファイルに移動します。

  7. ローカルでコーディングを行い、ローカルリポジトリから作業内容をリモートリポジトリにプッシュします。また、リモートリポジトリをお使いのDatabricksワークスペースに同期します。

    ティップス
    あるいは、リアルタイムでローカルファイルの変更を対応するワークスペースのファイルに同期するdbx syncを使用することができます。

  8. 迅速なプロトタイピングのためにDatabricksワークスペース上でノートブックを使用し続け、ノートブックから検証されたコードをローカルマシンに移動し続けます。コードのモジュール化、コードコンプリート、linting、ユニットテスト、Databricksとのライブ接続を必要としないコード、オブジェクトのステップスルーのデバッギングのようなタスクのためにローカルIDEを使い続け、Databricksへのライブな接続を必要としないオブジェクトやコードのステップスルーのデバッグを行います。

  9. 必要に応じて、お使いのローカルコードをターゲットのクラスターでのバッチ実行を行うためにdbxを使用します(これは、Sparkクラスターでアプリケーションを起動するためにSparkのbinディレクトリでspark-submitスクリプトを実行するものと似ています)。

  10. プロダクションに移行する準備ができたら、クラスターでのリモートリポジトリのコードの実行を自動化するために、GitHub ActionsAzure DevOpsGitLabのようなCI/CDプラットフォームを使用します。

要件

dbxを使うためには、コードでPython、Scala、Javaを使っているのかに関係なく、ローカルの開発マシンに以下をインストールする必要があります。

  • Pythonバージョン3.6以降
    コードでPythonを使用している場合は、ターゲットのクラスターにインストールされているPythonのバージョンと一致させる必要があります。既存のクラスターにインストールされているPythonのバージョンを取得するには、クラスターのwebターミナルを使って、python --versionコマンドを実行します。お使いのクラスターのDatabricksランタイムのバージョンに対しては、Databricks runtime releasesの「System environment」のセクションをご覧ください。

  • pip

  • コードでPythonを使用している場合には、ご自身のdbxプロジェクトで適切なPythonのバージョンを使用し、依存関係をパッケージングすることを確実にするために、Pythonの仮想環境を作成する手段。本書ではpipenvをカバーします。

  • dbxバージョン0.7.0以降。pip install dbxを実行することでPython Package Index(PyPI)からインストールできます。

    dbxがインストールされているかどうかを確認するには以下のコマンドを実行します。

    Bash
    dbx --version
    

    バージョン番号が返って来れば、dbxはインストールされています。

    バージョン番号が0.7.0未満の場合、以下のコマンドを実行してdbxをアップグレードしてバージョン番号を再度確認してください。

    Bash
    pip install dbx --upgrade
    dbx --version
    
    # Or ...
    python -m pip install dbx --upgrade
    dbx --version
    
  • Databricks CLI認証を設定します。dbxをインストールする際に自動でDatabricks CLIがインストールされます。以下の場所のいずれか、あるいは両方でローカル開発マシンの認証を設定することができます。

    • 環境変数DATABRICKS_HOSTDATABRICKS_TOKEN(Databricks CLIバージョン0.8.0以降)
    • .databrickscfgファイル内のプロファイル

    dbxは、これら二つの場所の認証クレディンシャルを検索します。dbxは最初にマッチしたもののみを使用します。

    注意
    dbxは、Databricks CLI version 0.17.2以降を用いる場合、認証で.netrcの使用をサポートしていません。インストールされているDatabricks CLIのバージョンを確認するには、コマンドdatabricks --versionを実行してください。

  • ローカルとリモートの変更をプッシュ、同期するためのgit

以下のIDEごとの指示に従ってください。

注意
Databricksでは、dbxと上述のIDEの利用を検証しています。しかし、dbxは任意のIDEでも動作します。IDEを使わない(ターミナルのみ)という選択肢もあります。

dbxは、単一のPythonコードファイル、コンパイルされたScala、JavaのJARファイルでの動作に最適化されています。dbxは単一のRコードファイルやコンパイルされたRコードパッケージでは動作しません。これは、dbxがJobs API 2.02.1と連携しますが、これらのAPIが単一のRコードファイルやコンパイルされたRコードパッケージをジョブとして実行できないためです。

Visual Studio Code

dbxVisual Studio CodeとPythonを使い始めるには以下の手順を完了してください。

ローカルの開発マシンでは、共通の要件に加えて以下のインストールが必要です。

dbxプロジェクト構成をセットアップするためには、以下のステップを実施します。

  1. お使いのターミナルで空のフォルダーを作成します。これらの手順では、dbx-demoというフォルダーを使用します。dbxプロジェクトのルートフォルダーにはお好きな名前をつけることができます。別な名前を使う際には、以下のステップでフォルダー名を置き換えてください。フォルダーを作成した後にそのフォルダーに移動し、そのフォルダーからVisual Studio Codeを起動します。

    LinuxあるいはmacOS

    Bash
    mkdir dbx-demo
    cd dbx-demo
    code .
    

    ティップ
    code .を実行した後にcommand not found: codeが表示される場合には、MicrosoftWebサイトのLaunching from the command lineをご覧ください。

    Windows

    Powershell
    md dbx-demo
    cd dbx-demo
    code .
    
  2. Visual Studio CodeでプロジェクトのためのPython仮想環境を作成します。

    1. メニューバーでView > Terminalをクリックします。
    2. dbx-demoフォルダーのルートから、以下のオプションを指定してpipenvコマンドを実行します。ここで<version>は、3.7.5のように、すでにローカルにインストールしている(そして、理想的にはターゲットのクラスターのPythonのバージョンにマッチする)Pythonのバージョンとなります。
    Bash
    pipenv --python <version>
    

    次のステップで必要となるので、pipenvコマンドの出力にあるVirtualenv locationの値をメモしておきます。

  3. ターゲットのPythonインタプリタを選択し、Python仮想環境を有効化します。

    1. メニューバーでView > Command Paletteをクリックし、Python: Selectとタイプし、Python: Select Interpreterをクリックします。
    2. 作成したPython仮想環境に対するパスの中でPythonインタプリタを選択します。(このパスはpyenvコマンドの出力のVirtualenv locationとして一覧されます)
    3. メニューバーでView > Command Paletteをクリックし、Terminal: Createとタイプし、Terminal: Create New Terminalをクリックします。

    詳細に関しては、Visual Studio CodeドキュメントのUsing Python environments in VS Codeをご覧ください。

  4. dbxプロジェクトの作成に進みます。

PyCharm

原文を参照ください。

IntelliJ IDEA

原文を参照ください。

Eclipse

原文を参照ください。

IDEなし(ターミナルのみ)

dbxでターミナルとPythonを使い始めるには、以下のステップを完了してください。

dbxプロジェクト構成をセットアップするためにターミナルを使用するには、以下のステップを実行します。

  1. お使いのターミナルで空のフォルダーを作成します。これらの手順では、dbx-demoというフォルダーを使用します。dbxプロジェクトのルートフォルダーにはお好きな名前をつけることができます。別な名前を使う際には、以下のステップでフォルダー名を置き換えてください。フォルダーを作成した後にそのフォルダーに移動します。

    LinuxあるいはmacOS

    Bash
    mkdir dbx-demo
    cd dbx-demo
    

    Windows

    Powershell
    md dbx-demo
    cd dbx-demo
    
  2. dbx-demoフォルダーのルートでpyenvコマンドを実行することで、このプロジェクト向けのPython仮想環境を作成します。ここで<version>は、3.7.5のように、すでにローカルにインストールしている(そして、理想的にはターゲットのクラスターのPythonのバージョンにマッチする)Pythonのバージョンとなります。

    Bash
    pipenv --python <version>
    
  3. pipenv shellを実行してPython仮想環境を有効化します。

    Bash
    pipenv shell
    
  4. dbxプロジェクトの作成に進みます。

dbxプロジェクトの作成

上のセクションでdbxプロジェクト構成を作成したので、以下のいずれかのタイプのプロジェクトを作成することができます。

Python向け最小構成dbxプロジェクトの作成

以下の最小構成のdbxプロジェクトはPythonとdbxを使い始める際の、最もシンプルかつ迅速なアプローチとなります。ここでは、お使いのDatabricksワークスペースの既存のDatabricks all-purposeクラスター上で単一のPythonコードファイルをバッチ実行する様子をデモンストレーションします。

注意
all-purposeクラスター、jobクラスター上でのコードのバッチ実行、リモートコードアーティファクトのデプロイメント、CI/CDプラットフォームのセットアップをデモンストレーションするPytohn向けdbxテンプレートプロジェクトを作成したい場合には、CI/CDサポートとPython向けのdbxテンプレートの作成までスキップしてください。

この手順を完了するには、お使いのワークスペースにall-purposeクラスターが存在している必要があります(クラスターの表示クラスターの作成をご覧ください)。理想的には(必須ではありません)、お使いのPython仮想環境のPythonのバージョンはクラスターのPythonのバージョンと一致すべきです。クラスターのPythonバージョンを取得するには、クラスターのWebターミナルpython --versionコマンドを実行します。

Bash
python --version
  1. ターミナルで、dbxプロジェクトのルートフォルダーから以下のオプションを指定してdbx configureコマンドを実行します。このコマンドによって、dbxプロジェクトのルートフォルダーに.dbx隠しフォルダーが作成されます。.dbxフォルダーにはlock.jsonproject.jsonファイルが格納されます。

    Bash
    dbx configure --profile DEFAULT
    

    注意
    project.jsonファイルには、Databricks CLIの.databrickscfgファイル内のDEFAULTプロファイルへのリファレンスが含まれます。dbxに別のプロファイルを使わせたい場合には、DEFAULTをターゲットのプロファイル名で置き換えてください。
    Databricks CLIの.databrickscfgファイルのプロファイルではなく、dbxに環境変数DATABRICKS_HOSTDATABRICKS_TOKENを使わせたい場合には、project.jsonDEFAULTをそのままにしておきます。dbxはデフォルトでこのリファレンスを使用します。

  2. dbxルートフォルダーにconfというフォルダーを作成します。

    LinuxあるいはmacOS

    Bash
    mkdir conf
    

    Windows

    Powershell
    md conf
    
  3. confディレクトリに、以下の内容のdeployment.yamlというファイルを追加します。

    YAML
    build:
      no_build: true
    environments:
      default:
        workflows:
          - name: "dbx-demo-job"
            spark_python_task:
              python_file: "file://dbx-demo-job.py"
    

    注意
    deployment.yamlには小文字のdefaultが含まれており、これはDatabricks CLIの.databrickscfgファイルのDEFAULTプロファイルへのリファレンスとなっています。dbxに別のプロファイルを使わせたい場合には、DEFAULTをターゲットのプロファイル名で置き換えてください。

    例えば、dbxDEFAULTプロファイルではなく、Databricks CLIの.databrickscfgファイルのDEVというプロファイルがある場合、お使いのdeployment.yamlファイルは以下の様になります。

    YAML
    environments:
      default:
        workflows:
          - name: "dbx-demo-job"
            spark_python_task:
              python_file: "file://dbx-demo-job.py"
      dev:
        workflows:
          - name: "<some-other-job-name>"
            spark_python_task:
              python_file: "file://<some-other-filename>.py"
    

    もし、Databricks CLIの.databrickscfgファイルのプロファイルではなく、dbxに環境変数DATABRICKS_HOSTDATABRICKS_TOKENを使わせたい場合には、project.jsonDEFAULTをそのままにしておきます。dbxはデフォルトでこのリファレンスを使用します。

    ティップス
    ジョブにSpark設定のキーバリューペアを追加するには、以下の様にspark_confフィールドを使用します。

    YAML
    environments:
      default:
        workflows:
          - name: "dbx-demo-job"
            spark_conf:
              spark.speculation: true
              spark.streaming.ui.retainedBatches: 5
              spark.driver.extraJavaOptions: "-verbose:gc -XX:+PrintGCDetails"
            # ...
    

    ジョブにアクセス権を追加するには、以下の様にpermissionsフィールドを使用します。

    YAML
    environments:
      default:
        workflows:
          - name: "dbx-demo-job"
            permissions:
              access_control_list:
                - user_name: "someone@example.com"
                  permission_level: "IS_OWNER"
                - group_name: "some-group"
                  permission_level: "CAN_VIEW"
            # ...
    

    access_control_listフィールドは網羅的である必要があることに注意してください。すなわち、リストには他のユーザーやグループのアクセス権も追加する必要があります。

  4. dbx-demo-job.pyというファイルにクラスターで実行するコードを追加し、ファイルをdbxプロジェクトのルートフォルダーに追加します。(すぐに使えそうなコードがない場合には、本書の後半にあるコードサンプルのPythonコードを使用することができます。)

    注意
    このファイルの名前をdbx-demo-job.pyにする必要はありません。別のファイル名を指定した場合、conf/deployment.yamlファイルのpython_fileフィールドが一致するようにしてください。

  5. 以下のオプションを指定してdbx executeコマンドを実行します。このコマンドでは、<existing-cluster-id>をお使いのワークスペースのターゲットクラスターのIDで置き換えてください。(IDを取得するにはクラスターURLとIDをご覧ください)

    Bash
    dbx execute --cluster-id=<existing-cluster-id> --job=dbx-demo-job --no-rebuild --no-package
    
  6. ローカルで実行結果を参照するには、ターミナルの出力をご覧ください。クラスター上で実行結果を参照するには、お使いのクラスターのDriver logsタブのStandard outputを参照ください。(クラスターのドライバーノード、ワーカーノードのログもご覧ください)

  7. 次のステップに進んでください。

Scala、Java向け最小構成dbxプロジェクトの作成

原文を参照ください。

CI/CDサポートとPython向けのdbxテンプレートの作成

以下のPython向けdbxテンプレートプロジェクトでは、DatabricksワークスペースでDatabricksのall-purposeクラスターやjobsクラスターでPythonコードのバッチ実行、リモートコードアーティファクトのデプロイメント、CI/CDプラットフォームセットアップをデモンストレーションします。(既存のall-purposeクラスターで単一のPythonコードファイルのバッチ実行をデモンストレーションするPython最小構成dbxプロジェクトを作成するには、Python向け最小構成dbxプロジェクトの作成まで戻ってください)

  1. ターミナル上で、dbxプロジェクトルートフォルダーからdbx initコマンドを実行します。

    Bash
    dbx init
    
  2. project_nameでは、プロジェクトの名前を入力するか、デフォルトのプロジェクト名を受け入れるためにEnterを押します。

  3. versionには、お使いのプロジェクトの開始バージョン番号を入力するか、デフォルトのプロジェクトバージョンを受け入れるためにEnterを押します。

  4. cloudには、プロジェクトで使用するDatabricksクラウドバージョンに対応する番号を入力するか、デフォルトを受け入れるためにEnterを押します。

  5. cicd_toolには、プロジェクトで使いたいCI/CDに対応する番号を入力するか、デフォルトを受け入れるためにEnterを押します。

  6. project_slugには、プロジェクトで使用するリソーで使用したいプレフィックスを指定するか、デフォルトを受け入れるためにEnterを押します。

  7. workspace_dirには、プロジェクト向けのワークスペースディレクトリに対するローカルパスを指定するか、デフォルトを受け入れるためにEnterを押します。

  8. artifact_locationには、プロジェクトのアーティファクトが書き込まれるDatabricksワークスペースのパスを入力するか、デフォルトを受け入れるためにEnterを押します。

  9. profileには、プロジェクトで使用したいDatabricks CLI認証プロファイルの名前を入力するか、デフォルトを受け入れるためにEnterを押します。

ティップ
以下のようにパラメーターをハードコードして、dbx initを実行することで上述のステップをスキップすることができます。

Bash
dbx init --template="python_basic" \
-p "project_name=cicd-sample-project" \
-p "cloud=AWS" \
-p "cicd_tool='GitHub Actions'" \
-p "profile=DEFAULT" \
--no-input

dbxは、自動でパラメーターproject_slugworkspace_dirartifact_locationを計算します。これら3つのパラメーターはオプションであり、より高度なユースケースでのみ有用なものとなります。

dbxドキュメントのCLI Referenceinitコマンドをご覧ください。

新たなプロジェクトで使用する際には、dbxドキュメントのBasic Python Templateをご覧ください。

次のステップもご覧ください。

コードサンプル

dbxで使えるようなコードが手元にない場合には、以下のコードをdbxでバッチ実行することで実験することができます。このコードは、お使いのワークスペースに小さいテーブルを作成し、テーブルをクエリーし、テーブルを削除します。

ティップ
テーブルを削除せずにそのままにしておきたい場合は、dbxでバッチ実行する前に、このサンプルコードの最後の行をコメントアウトしてください。

Python
# For testing and debugging of local objects, run
# "pip install pyspark=X.Y.Z", where "X.Y.Z"
# matches the version of PySpark
# on your target clusters.
from pyspark.sql import SparkSession

from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName("dbx-demo").getOrCreate()

# Create a DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
   StructField('AirportCode', StringType(), False),
   StructField('Date', DateType(), False),
   StructField('TempHighF', IntegerType(), False),
   StructField('TempLowF', IntegerType(), False)
])

data = [
   [ 'BLI', date(2021, 4, 3), 52, 43],
   [ 'BLI', date(2021, 4, 2), 50, 38],
   [ 'BLI', date(2021, 4, 1), 52, 41],
   [ 'PDX', date(2021, 4, 3), 64, 45],
   [ 'PDX', date(2021, 4, 2), 61, 41],
   [ 'PDX', date(2021, 4, 1), 66, 39],
   [ 'SEA', date(2021, 4, 3), 57, 43],
   [ 'SEA', date(2021, 4, 2), 54, 39],
   [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')

# Query the table on the cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
   "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
   "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
   "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the cluster.
spark.sql('DROP TABLE demo_temps_table')
Scala
package com.example.demo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object SampleApp {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").getOrCreate()

    val schema = StructType(Array(
      StructField("AirportCode", StringType, false),
      StructField("Date", DateType, false),
      StructField("TempHighF", IntegerType, false),
      StructField("TempLowF", IntegerType, false)
    ))

    val data = List(
      Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
      Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
      Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
      Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
      Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
      Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
      Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
      Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
      Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
    )

    val rdd = spark.sparkContext.makeRDD(data)
    val temps = spark.createDataFrame(rdd, schema)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS demo_temps_table")
    temps.write.saveAsTable("demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01. Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    //
    // +-----------+----------+---------+--------+
    // |AirportCode|      Date|TempHighF|TempLowF|
    // +-----------+----------+---------+--------+
    // |        PDX|2021-04-03|       64|      45|
    // |        PDX|2021-04-02|       61|      41|
    // |        SEA|2021-04-03|       57|      43|
    // |        SEA|2021-04-02|       54|      39|
    // +-----------+----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE demo_temps_table")
  }
}
Java
package com.example.demo;

import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class SampleApp {
  public static void main(String[] args) {
    SparkSession spark = SparkSession
      .builder()
      .appName("Temps Demo")
      .config("spark.master", "local")
      .getOrCreate();

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    StructType schema = new StructType(new StructField[] {
      new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
      new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
      new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
      new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
    });

    List<Row> dataList = new ArrayList<Row>();
    dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
    dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
    dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
    dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
    dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
    dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
    dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
    dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
    dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

    Dataset<Row> temps = spark.createDataFrame(dataList, schema);

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default");
    spark.sql("DROP TABLE IF EXISTS demo_temps_table");
    temps.write().saveAsTable("demo_temps_table");

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01. Group the results and order by high
    // temperature in descending order.
    Dataset<Row> df_temps = spark.sql("SELECT * FROM demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC");
    df_temps.show();

    // Results:
    //
    // +-----------+----------+---------+--------+
    // |AirportCode|      Date|TempHighF|TempLowF|
    // +-----------+----------+---------+--------+
    // |        PDX|2021-04-03|       64|      45|
    // |        PDX|2021-04-02|       61|      41|
    // |        SEA|2021-04-03|       57|      43|
    // |        SEA|2021-04-02|       54|      39|
    // +-----------+----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE demo_temps_table");
  }
}

次のステップ

その他のリソース

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1