LoginSignup
5
2

More than 1 year has passed since last update.

Databricks Connect | Databricks on AWS [2021/11/10時点]の翻訳です。

Databricksクイックスタートガイドのコンテンツです。

Databricks Connectを用いることで、お使いのIDE(Eclipse、IntelliJ、PyCharm、RStudio、Visual Studio Code)、ノートブックサーバー(Jupyter Notebook、Zeppelin)、他のカスタムアプリケーションからDatabricksクラスターに接続できます。

本書では、Databricks Connectがどのように動作するのか、Databricks Connectのセットアップ方法のウォークスルー、Databricks Connectを利用している際の問題への対処法、Databricks ConnectとDatabricks notebookでの処理実行の違いを説明します。

概要

Databricks Connectは、Databricksランタイムのクライアントライブラリです。これによって、ローカルのSparkセッションの代わりに、Spark APIを用いてジョブを記述し、リモートのDatabricksクラスター上で処理を実行することができます。

例えば、Databricks Connectを用いて、spark.read.parquet(...).groupBy(...).agg(...).show()のデータフレームコマンドを実行した場合、ジョブ実行に対するパーシングと実行計画はお使いのローカルマシン上で実行されます。そして、クラスターで処理を行うために、Databricksで稼働しているSparkサーバーにジョブの論理表現が送信されます。

Databricks Connectを使用することで、以下のことが可能となります。

  • あらゆるPython、Java、Scala、RアプリケーションからSparkの大規模処理を実行できます。どこでもimport pysparkimport org.apache.sparkrequire(SparkR)を実行でき、IDEのプラグインやSparkサブミットスクリプトを利用することなしに、お使いのアプリケーションから直接Sparkジョブを実行することができます。
  • リモートのクラスターを利用しつつも、お使いのIDEでコードをデバッグすることができます。
  • ライブラリを開発する際に迅速にイテレーションできます。クラスターにおける個々のクライアントセッションは分離されているので、Databricks ConnectにおけるPythonやJavaライブラリの依存関係を変更した後にクラスターを再起動する必要がありません。
  • 作業状態を失うことなしにアイドル状態のクラスターをシャットダウンできます。クライアントアプリケーションはクラスターと分離されているので、通常はノートブックで定義されたすべての変数、RDD、データフレームオブジェクトが失われてしまう、クラスターの再起動、アップグレードの影響を受けません。

要件

  • 以下のDatabricksランタイムバージョンがサポートしています。
    • Databricks Runtime 9.1 ML、Databricks Runtime 9.1
    • Databricks Runtime 7.3 LTS ML、Databricks Runtime 7.3 LTS
    • Databricks Runtime 6.4 ML、Databricks Runtime 6.4
    • Databricks Runtime 5.5 LTS ML、Databricks Runtime 5.5 LTS
  • お使いのPythonクライアントのマイナーバージョンは、DatabricksクラスターのPythonのマイナーバージョンと一致する必要があります。以下のテーブルでは、それぞれのDatabricksランタイムのPythonバージョンを示しています。

    Databricksランタイムバージョン Pythonバージョン
    9.1 ML, 9.1 3.8
    7.3 LTS ML, 7.3 LTS 3.7
    6.4 ML, 6.4 3.7
    5.5 LTS ML 3.6
    5.5 LTS 3.5

    例えば、お使いのローカル環境でCondaを使用しており、クラスターがPython 3.5で動作するのであれば、同じバージョンで環境を構築する必要があります。例えば、

    Bash
    conda create --name dbconnect python=3.7
    conda
    
  • Databricks Connectのメジャー、マイナーバージョンは常にDatabricksランタイムバージョンと一致する必要があります。Databricksランタイムバージョンと一致する最新のDatabricks Connectの最新パッケージを使用することをお勧めします。例えば、Databricks Runtime 7.3 LTSクラスターを使用している場合には、databricks-connect==7.3.*パッケージをお使いください。

    注意
    利用可能なDatabricks Connectリリースとメンテナンスのアップデートに関しては、Databricks Connect release notesを参照ください。

  • Java Runtime Environment (JRE) 8が必要です。クライアントはOpenJDK 8 JREでテストされています。クライアントはJava 11をサポートしていません。

    訳者注
    Java SE Development Kit 8 - Downloadsで動作確認しました。
    注意
    Windowsで、Databricks Connectがwinutils.exeを見つけられないというエラーに遭遇した場合には、Windowsでwinutils.exeが見つからないを参照ください。

クライアントのセットアップ

ステップ1:クライアントのインストール

  1. PySparkをアンインストールします。これはdatabricks-connectがPySparkと競合するためです。詳細に関しては、PySparkインストールの競合を参照してください。

    Bash
    pip uninstall pyspark
    
  2. Databricks Connectクライアントをインストールします。

    Bash
    pip install -U "databricks-connect==7.3.*"  # or X.Y.* to match your cluster version.
    

    注意
    最新のパッケージがインストールされていることを確実にするために、databricks-connect=X.Yの代わりにdatabricks-connect==X.Y.*を常に指定するようにしてください。

ステップ2:接続プロパティの設定

  1. 以下の設定プロパティを確認します。
    • DatabricksのワークスペースURL
    • Databricksのパーソナルアクセストークン
    • 作成したクラスターのID。URLからクラスターIDを取得できます。以下の例では、クラスターIDは0304-201045-xxxxxxxxとなります。
    • Databricks Connectが接続するポート。15001に設定します。
  2. 接続設定を行います。CLI、SQL設定、環境変数を利用できます。設定方法の優先度が高い順から低い順に、SQL設定キー、CLI、環境変数となります。

    • CLI

      • databricks-connectを実行します。
      Bash
      databricks-connect configure
      

      ライセンスが表示されます。

      Copyright (2018) Databricks, Inc.
      
      This library (the "Software") may not be used except in connection with the
      Licensee's use of the Databricks Platform Services pursuant to an Agreement
        ...
      
      • ライセンスを承諾し、設定値を指定します。Databricks HostDatabricks Tokenに対しては、ステップ1で取得したワークスペースURLとパーソナルアクセストークンを指定します。
      Do you accept the above agreement? [y/N] y
      Set new config values (leave input empty to accept default):
      Databricks Host [no current value, must start with https://]: <databricks-url>
      Databricks Token [no current value]: <databricks-token>
      Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
      Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
      Port [15001]: <port>
      
    • SQL設定あるいは環境変数。以下のテーブルでは、ステップ1で取得した接続プロパティに対応するSQL設定キーと環境変数を示しています。SQL設定キーを指定するには、sql("set config=value")を使用します。例えば、sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")

    パラメーター SQL設定キー 環境変数
    Databricksのホスト spark.databricks.service.address DATABRICKS_ADDRESS
    Databricksのトークン spark.databricks.service.token DATABRICKS_API_TOKEN
    クラスターID spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID
    組織ID spark.databricks.service.orgId DATABRICKS_ORG_ID
    ポート spark.databricks.service.port DATABRICKS_PORT

    重要!
    トークンをSQL設定に記述することはお勧めしません。

  3. Databricksへの接続テストを行います。

    Bash
    databricks-connect test
    

    設定したクラスターが稼働していない場合には、このテストはクラスターを起動し、指定された自動停止時間まで稼働し続けます。出力は以下のようになります。

* PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
* Checking java version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
* Testing scala command
18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(100).reduce(_ + _)
Spark context Web UI available at https://10.8.5.214:4040
Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
Spark session available as 'spark'.
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
res0: Long = 4950

scala> :quit

* Testing python command
18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi

IDEあるいはノートブックサーバーのセットアップ

このセクションでは、Databricks Connectクライアントを使うために、お使いのIDEやノートブックサーバーをどのように設定するのかを説明します。

このセクションでは以下のツールを説明します。

Jupyter notebook

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

Databricks Connectの設定スクリプトは、お使いのプロジェクト設定に自動でパッケージを追加します。Pythonカーネルで始めるには、以下を実行します。

Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

SQLクエリーを実行し、可視化するマジックコマンド%sqlを有効にするには、以下のスニペットを実行します。

Python
from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)

PyCharm

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

Databricks Connectの設定スクリプトは、お使いのプロジェクト設定に自動でパッケージを追加します。

Python3のクラスター

  1. PyCharmプロジェクトを作成する際に、Existing Interpreterを選択します。ドロップダウンメニューから、作成したConda環境を選択します(要件を参照ください)。
  2. Run > Edit Configurationsに移動します。
  3. 環境変数としてPYSPARK_PYTHON=python3を追加します。

SparkRおよびRStudio Desktop

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

  1. お使いのローカルマシン上にオープンソースのSparkをダウンロードし解凍します。お使いのDatabircksクラスターと同じバージョン(Hadoop 2.7)を選択します。
  2. databricks-connect get-jar-dirを実行します。このコマンドは/usr/local/lib/python3.5/dist-packages/pyspark/jarsのようなパスを返します。JARディレクトリのファイルパスの一つ上位のディレクトリのファイルパスをコピーします。例えば、SPARK_HOMEディレクトリである、/usr/local/lib/python3.5/dist-packages/pysparkとなります。
  3. お使いのRスクリプトに追加することで、SparkライブラリパスとSparkホームを設定します。ステップ1でオープンソースSparkを解凍したディレクトリを<spark-lib-path>に設定します。ステップ2で取得したディレクトリを<spark-home-path>に設定します。

    R
    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. Sparkセッションを初期化し、SparkRコマンドを実行します。

    R
    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyrおよびRStudio Desktop

プレビュー
この機能はパブリックプレビューです。

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

Databricks Connectを用いてローカルで開発したsparklyr依存のコードをコピーし、最低限のコード変更のみで、Databricksノートブック、あるいはDatabricksワークスペース上にホストされたRStudioサーバーで実行することができます。

要件

  • sparklyr 1.2以降
  • Databricks Runtime 6.4以降およびマッチするDatabricks Connect。

インストール、設定、sparklyrの利用

  1. RStudioデスクトップで、CRANあるいはGitHubから最新のマスターバージョンからsparklyr 1.2以降をインストールします。

    R
    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. Databricks ConnectがインストールされたPython環境をアクティベートし、<spark-home-path>を取得するためにターミナルで以下のコマンドを実行します。

    Bash
    databricks-connect get-spark-home
    
  3. スパークセッションを初期化し、sparklyrコマンドを実行します。

    R
    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. 接続をクローズします。

    R
    spark_disconnect(sc)
    

リソース

詳細に関しては、sparklyrのGitHubのREADMEを参照ください。コードサンプルはsparklyrを参照ください。

sparklyrとRStudioデスクトップの制限

以下の機能はサポートされていません。

  • sparklyrストリーミングAPI
  • sparklyr ML API
  • broom API
  • csv_fileのシリアライゼーションモード
  • spark submit

IntelliJ (Scala、Java)

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

  1. databricks-connect get-jar-dirを実行します。
  2. コマンドで返却されるディレクトリへの依存関係を指定します。File > Project Structure > Modules > Dependencies > ‘+’ sign > JARs or Directoriesに移動します。 競合を回避をするためには、お使いのクラスパスから他のSparkインストレーションを削除することを強くお勧めします。これが不可能な場合、追加したJARがクラスパスの先頭にあることを確認してください。特に、これらはインストールされた他のSparkのバージョン先頭にある必要があります(さもないと、これら他のSparkバージョンを使用しローカルで実行するか、ClassDefNotFoundErrorをスローすることになります)。
  3. IntelliJのブレイクアウトオプションの設定をチェックしてください。デフォルトはAllであり、デバッグのためにブレークポイントを設定した場合には、ネットワークのタイムアウトが発生するかもしれません。バックグラウンドのネットワークスレッドの停止を回避するために、Threadに設定してください。

Eclipse

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

  1. databricks-connect get-jar-dirを実行します。
  2. コマンドで返却されるディレクトリへの依存関係を指定します。Project menu > Properties > Java Build Path > Libraries > Add External Jarsに移動します。 競合を回避をするためには、お使いのクラスパスから他のSparkインストレーションを削除することを強くお勧めします。これが不可能な場合、追加したJARがクラスパスの先頭にあることを確認してください。特に、これらはインストールされた他のSparkのバージョン先頭にある必要があります(さもないと、これら他のSparkバージョンを使用しローカルで実行するか、ClassDefNotFoundErrorをスローすることになります)。

Visual Studio Code

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

  1. Python拡張がインストールされていることを確認してください。
  2. コマンドパレットを開きます(MacOSならCommand+Shift+P、Windows/LinuxならCtrl+Shift+P)。
  3. Pythonインタプリタを選択します。Code > Preferences > Settingsに移動し、python settingsを選択します。
  4. databricks-connect get-jar-dirを実行します。
  5. コマンドから返却されるディレクトリをpython.venvPath配下のUser SettingsのJSONに追加します。これはPython設定に追加される必要があります。
  6. Linterを無効化します。edit json settingsの右側の...をクリックします。変更された設定は以下の通りとなります。
  7. VS CodeでPythonを開発する際に推奨している、仮想環境で実行をしている場合には、コマンドパレットで、select python interpreterとタイプし、お使いのクラスターPythonバージョンにマッチする環境を指定します。 例えば、クラスターがPython 3.5の場合には、お使いのローカル環境はPython 3.5である必要があります。

SBT

注意
始める前に、要件に合致しているか確認の上、Databricks Connectのクライアントをセットアップしてください。

SBTを使うためには、通常のSpark依存ライブラリではなく、Databricks ConnectのJARを指定するようにbuild.sbtを設定する必要があります。以下のビルドファイルサンプルにあるように、unmanagedBaseディレクティブを用いて設定を行います。こちらのサンプルでは、Scalaアプリケーションがcom.example.Testメインオブジェクトを有していることを想定しています。

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

IDEでの実行例

Java
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS demo_temps_table");
        temps.write().saveAsTable("demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE demo_temps_table");
    }
}
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE demo_temps_table')
Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object Demo {
  def main(args: Array[String]) {
      val spark = SparkSession.builder.master("local").getOrCreate()

      // Create a Spark DataFrame consisting of high and low temperatures
      // by airport code and date.
      val schema = StructType(Array(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      ))

      val data = List(
        Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
        Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
        Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
        Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
        Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
        Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
        Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
        Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
        Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
      )

      val rdd = spark.sparkContext.makeRDD(data)
      val temps = spark.createDataFrame(rdd, schema)

      // Create a table on the Databricks cluster and then fill
      // the table with the DataFrame's contents.
      // If the table already exists from a previous run,
      // delete it first.
      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS demo_temps_table")
      temps.write.saveAsTable("demo_temps_table")

      // Query the table on the Databricks cluster, returning rows
      // where the airport code is not BLI and the date is later
      // than 2021-04-01. Group the results and order by high
      // temperature in descending order.
      val df_temps = spark.sql("SELECT * FROM demo_temps_table " +
        "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
        "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
        "ORDER BY TempHighF DESC")
      df_temps.show()

      // Results:
      //
      // +-----------+----------+---------+--------+
      // |AirportCode|      Date|TempHighF|TempLowF|
      // +-----------+----------+---------+--------+
      // |        PDX|2021-04-03|       64|      45|
      // |        PDX|2021-04-02|       61|      41|
      // |        SEA|2021-04-03|       57|      43|
      // |        SEA|2021-04-02|       54|      39|
      // +-----------+----------+---------+--------+

      // Clean up by deleting the table from the Databricks cluster.
      spark.sql("DROP TABLE demo_temps_table")
  }
}

依存関係への対応

通常、メインクラスやPythonファイルは他のJARやファイルへの依存関係を有します。sparkContext.addJar("path-to-the-jar")sparkContext.addPyFile("path-to-the-file")を呼び出すことで、このようなJARやフィルに対する依存関係を追加することができます。また、addPyFile()インタフェースで、Eggファイルやzipファイルを追加することもできます。IDEでコードを実行する際は常に、クラスター上に依存するJARやファイルがインストールされます。

Python
from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

class Foo(object):
  def __init__(self, x):
    self.x = x
Scala
package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.parquet("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

DBUtilsへのアクセス

Databricksユーティリティモジュールであるdbutils.fsdbutils.secretsを使用することができます。サポートされているコマンドは、dbutils.fs.cpdbutils.fs.headdbutils.fs.lsdbutils.fs.mkdirsdbutils.fs.mvdbutils.fs.putdbutils.fs.rmdbutils.secrets.getdbutils.secrets.getBytesdbutils.secrets.listdbutils.secrets.listScopesとなります。Credentials utility (dbutils.credentials)Secrets utility (dbutils.secrets)を参照いただくか、dbutils.fs.help()dbutils.secrets.help()を実行してください。

Python

Python
pip install six
Python
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

Databricksランタイム7.3 LTS以降を使用しており、ローカルと同じような手順でDatabricksクラスターのDBUtilsモジュールにアクセスするには、以下のget_dbutils()を使用してください。

Python
def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

あるいは、以下のget_dbutils()を使用してください。

Python
def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

Scala

Scala
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

ローカル、リモートファイルシステム間でのファイルコピー

クライアントとリモートファイルシステム間でファイルをコピーするために、dbutils.fsを使用できます。file:/スキームはクライアントのローカルファイルシステムを参照します。

Python
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

転送できるファイル最大サイズは250MBです。

dbutils.secrets.getの有効化

セキュリティ上の制約から、デフォルトではdbutils.secrets.getの呼び出しは無効化されています。お使いのワークスペースでこの機能を有効化するにはDatabricksのサポートに問い合わせください。

Hadoopファイルシステムへのアクセス

標準的なHadoopファイルシステムインタフェースを用いて、DBFSに直接アクセスすることができます。

Scala
> import org.apache.hadoop.fs._

// get new DBFS connection
> val dbfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
dbfs: org.apache.hadoop.fs.FileSystem = com.databricks.backend.daemon.data.client.DBFS@2d036335

// list files
> dbfs.listStatus(new Path("dbfs:/"))
res1: Array[org.apache.hadoop.fs.FileStatus] = Array(FileStatus{path=dbfs:/$; isDirectory=true; ...})

// open file
> val stream = dbfs.open(new Path("dbfs:/path/to/your_file"))
stream: org.apache.hadoop.fs.FSDataInputStream = org.apache.hadoop.fs.FSDataInputStream@7aa4ef24

// get file contents as string
> import org.apache.commons.io._
> println(new String(IOUtils.toByteArray(stream)))

Hadoopの設定

SQL、データフレームの操作に適用されるspark.conf.setAPIを用いて、クライアント上にHadoop設定を行うことができます。sparkContextに設定されたHadoop設定は、ノートブックを使用しているクラスターに設定される必要があります。これは、sparkContext上の設定は、ユーザーのセッションではなく、クラスター全体適用されるためです。

トラブルシューティング

接続に関する問題を確認するには、databricks-connect testを実行します。このセクションでは、遭遇するであろう一般的な問題と解決法を説明します。

Pythonバージョンのミスマッチ

ローカルで使用しているPythonのバージョンが、クラスターのバージョンとマイナーバージョンまで一致していることを確認してください。(例えば、3.5.13.5.2はOKですが、3.6はNGです)

ローカルに複数のPythonバージョンがある場合には、Databricks Connectが、PYSPARK_PYTHONで指定されたもの(例えば、PYSPARK_PYTHON=python3)を使用していることを確認してください。

サーバーが有効化されていない

お使いのクラスターでspark.databricks.service.server.enabled trueが指定され、Sparkサーバーが有効化されていることを確認してください。有効化されている場合、ドライバーログに以下のような出力が確認できます。

18/10/25 21:39:18 INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
18/10/25 21:39:21 INFO SparkContext: Loading Spark Service RPC Server
18/10/25 21:39:21 INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
18/10/25 21:39:21 INFO Server: jetty-9.3.20.v20170531
18/10/25 21:39:21 INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
18/10/25 21:39:21 INFO Server: Started @5879ms

PySparkインストールの競合

databricks-connectパッケージはPySparkと競合します。両方をインストールすると、PythonでSparkコンテキストを初期化する際にエラーが発生します。これは「stream corrupted」や「class not found」エラーとなりえます。Python環境にPySparkをインストールしている場合には、databricks-connectをインストールする前にアンインストールしてください。PySparkをアンインストールした後は、Databricks Connectを完全に再インストールしたことを確認んしてください。

Bash
pip uninstall pyspark
pip uninstall databricks-connect
pip install -U "databricks-connect==5.5.*"  # or X.Y.* to match your cluster version.

SPARK_HOMEの競合

お使いのマシンで以前Sparkを使ったことがある場合には、IDEはDatabricks ConnectのSparkではなく、以前のバージョンのSparkを使うように設定されているかもしれません。これは、「stream corrupted」や「class not found」といったエラーを引き起こします。SPARK_HOMEの値をチェックすることで、お使いのSparkのバージョンを確認できます。

Java
System.out.println(System.getenv("SPARK_HOME"));
Python
import os
print(os.environ['SPARK_HOME'])
Scala
println(sys.env.get("SPARK_HOME"))

解決方法

SPARK_HOMEがクライアントと異なるバージョンのSparkを示している場合には、SPARK_HOMEをアンセットし、再度トライする必要があります。

お使いのIDEの環境変数設定、.bashrc.zshrc.bash_profileファイル、他に環境変数が設定されているところを確認してください。古い状態を廃棄するためにIEDを再起動する必要があるかもしれません、そして、問題が継続する場合には新規プロジェクトを作成する必要があるかもしれません。

SPARK_HOMEに新たな値を指定する必要はありません。アンセットするだけで十分です。

バイナリーのPATHエントリーの競合あるいは欠如

spark-shellのようなコマンドがDatabricks Connectによるものではなく、別にインストールされたバイナリーで実行されるようにPATHが設定されている可能性があります。この場合、databricks-connect testは失敗します。Databricks Connectのバイナリーの優先度が高いことを確認するか、以前インストールされたものを削除してください。

spark-shellのようなコマンドを実行できない場合、pip installによってPATHが自動的に設定されておらず、手動でPATHにbinディレクトリを追加する必要があるかもしれません。sっとアップされていなかったとしても、IDEからDatabricks Connectを利用できる可能性はありますが、databricks-connect testコマンドは失敗します。

クラスターにおけるシリアライゼーション設定の競合

databricks-connect testコマンドを実行した際に「stream corrupted」エラーに装具数する場合は、クラスターのシリアライゼーション設定に互換性がない可能性があります。例えば、spark.io.compression.codecの設定が問題を引き起こす場合があります。この問題を解決するためには、クラスター設定からこれらの設定を削除するか、Databricks Connectクライアントにも同じ設定を行います。

Windowsにおいてwinutils.exeが見つからない

WindowsでDatabricks Connectを使用している際に、以下のエラーに遭遇するかもしれません。

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

WindowsにおけるHadoopパスの設定の指示に従ってください。

Windowsにおいて、ファイル名、ディレクトリ名、ボリュームラベルの文法が誤っている

WindowsでDatabricks Connectを使用している際に、以下のエラーに遭遇するかもしれません。

The filename, directory name, or volume label syntax is incorrect.

Java、Databricks Connectがパスに空白を含むディレクトリにインストールされています。パスに空白を含まないディレクトリにインストールするか、ショートネームフォームを用いてパスを設定することで回避します。

制限

以下のDatabricks、サードパーティのプラットフォームの機能はサポートされていません。

  • 構造化ストリーミング
  • リモートクラスターにおいて、Sparkジョブの一部ではない任意のコードの実行
  • Deltaテーブル操作のためのScala、Python、RのネイティブAPI(例えば、DeltaTable.forPath)はサポートされていません。しかし、Delta Lakeに対するオペレーションのためのSQL API(spark.sql(...))や、Deltaテーブルに対するSpark API(例えば、spark.read.load)はサポートされています。
  • Apache Zeppelin 0.7.x以下
  • テーブルアクセスコントロールがあるクラスターへの接続
  • プロセス分類が有効化された(spark.databricks.pyspark.enableProcessIsolationtrueに設定されている) クラスターへの接続
  • DeltaのCLONESQLコマンド
  • グローバル一時ビュー
  • Koalas
  • 以下のDatabricksユーティリティ
  • AWS Glueカタログ
  • IAMクレディンシャルパススルーはDatabricksランタイム6.4以降が動作しているスタンダードクラスターでのみサポートされます。

Databricks 無料トライアル

Databricks 無料トライアル

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2