SQL databases using JDBC | Databricks on AWS [2021/8/13時点]の翻訳です。
JDBCドライバーを用いて様々なSQLデータベースにクエリーを行うためにDatabricksを利用できます。
Databricksランタイムには、MySQLドライバーとしてorg.mariadb.jdbc
ドライバーが含まれています。
Databricksランタイムには、Microsoft SQL Server、Azure SQL Database向けのJDBCドライバーが含まれています。
PostgreSQL、Oracleのような他のSQLデータベースを使用することも可能です。Databricksでドライバーが提供されていないデータベースに対してい、JDBCライブラリをインストールする方法に関しては、ライブラリをご覧ください。
本書では、JDBCを用いてSQLデータベースに接続するためにどのようにDataFrame APIを使うのか、JDBCインタフェースを通じてどのように並列性を制御するのかを説明します。本書の最後には、Scala API、Python、Spark SQLのサンプルを示しています。JDBC経由でSQLデータベースに接続する際にサポートされている引数に関しては、JDBC To Other Databasesをご覧ください。
重要!
本書のサンプルでは、JDBCのURLにユーザー名、パスワードを含めていません。データベースの認証情報をシークレットとして保存し、ノートブック上でjava.util.Properties
オブジェクトに認証情報を埋め込むようにシークレット管理ユーザーガイドに従うことをお勧めします。以下に例を示します。
Scala
val jdbcUsername = dbutils.secrets.get(scope = "jdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "jdbc", key = "password")
> シークレット管理の完全なサンプルに関しては、[Databricksシークレットのワークフロー例](https://qiita.com/taka_yayoi/items/b9bdaa63e7c991d95092)をご覧ください。
# クラウドとの接続の確立
DatabricksのVPCはSparkクラスターのみの接続を許可しています。他のインフラストラクチャにアクセスする際のベストプラクティスは、[VPCピアリング](https://docs.databricks.com/administration-guide/cloud-configurations/aws/vpc-peering.html)を使用することです。VPCピアリング確立後に、クラスターで`netcat`ユーティリティを用いて確認することできます。
```bash:Bash
%sh nc -vz <jdbcHostname> <jdbcPort>
MySQLとの接続の確立
このサンプルでは、JDBCドライバーを使ってMySQLにクエリーを実行します。
ステップ1: JDBCドライバーが利用できることを確認する
以下のステートメントでは、お使いのclasspathにドライバークラスが存在しているかどうかを確認しています。Pythonノートブックのように他の言語のノートブックにおいても、%scala
マジックコマンドを用いてテストすることができます。
Class.forName("org.mariadb.jdbc.Driver")
ステップ2: JDBC URLを作成する
val jdbcHostname = "<hostname>"
val jdbcPort = 3306
val jdbcDatabase = "<database>"
// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
ステップ3: MySQLデータベースとの接続を確認する
import java.sql.DriverManager
val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
connection.isClosed()
SQL Serverとの接続の確立
このサンプルでは、JDBCドライバーを使ってSQL Serverにクエリーを実行します。
ステップ1: JDBCドライバーが利用できることを確認する
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
ステップ2: JDBC URLを作成する
val jdbcHostname = "<hostname>"
val jdbcPort = 1433
val jdbcDatabase = "<database>"
// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"
// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
ステップ3: SQL Serverデータベースとの接続を確認する
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
SSL経由でのPostgreSQLデータベースとの接続
JDBCを用いて、SSL経由でPostgreSQLデータベースに接続するためには、以下の手順を踏みます。
- PEMではなくPK8、DERフォーマットで証明書と鍵を指定する必要があります。
- 全てのノードが読み取りを行えるように、DBFSの
/dbfs
フォルダーに証明書と鍵を格納する必要があります。
以下のPythonサンプルノートブックでは、一連のPEMファイルからPK8、DERファイルを作成し、これらのファイルからデータフレームを作成する方法を説明しています。このサンプルでは以下のPEMファイルが存在することを想定しています。
- クライアント公開鍵証明書として
client_cert.pem
- クライアント秘密鍵として
client_key.pem
- サーバ証明書として
server_ca.pem
%sh
# Copy the PEM files to a folder within /dbfs so that all nodes can read them.
mkdir -p <target-folder>
cp <source-files> <target-folder>
%sh
# Convert the PEM files to PK8 and DER format.
cd <target-folder>
openssl pkcs8 -topk8 -inform PEM -in client_key.pem -outform DER -out client_key.pk8 -nocrypt
openssl x509 -in server_ca.pem -out server_ca.der -outform DER
openssl x509 -in client_cert.pem -out client_cert.der -outform DER
# Create the DataFrame.
df = (spark
.read
.format("jdbc")
.option("url", <connection-string>)
.option("dbtable", <table-name>)
.option("user", <username>)
.option("password", <password>)
.option("ssl", True)
.option("sslmode", "require")
.option("sslcert", <target-folder>/client_cert.der)
.option("sslkey", <target-folder>/client_key.pk8)
.option("sslrootcert", <target-folder>/server_ca.der)
.load()
)
以下の通り置き換えてください。
-
<source-files>
を/dbfs/FileStore/Users/someone@example.com/*
のようなソースディレクトリにある.pem
ファイルのリストで置き換えてください。 -
<target-folder>
を/dbfs/databricks/driver/ssl
のように生成したPK8、DERファイルを格納するターゲットディレクトリの名前で置き換えてください。 -
<connection-string>
をデータベースに対するJDBC URL接続文字列で置き換えてください。 -
<table-name>
をデータベースで使用するテーブル名で置き換えてください。 -
<username>
と<password>
をデータベースにアクセスするために使用するユーザー名、パスワードで置き換えてください。
JDBCからのデータ読み込み
このセクションではデータベーステーブルからデータをロードします。ここでは、Spark環境にテーブルを読み込む際に単一のJDBC接続を用います。並列での読み取りに関しては、並列性の管理をご覧ください。
val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
Sparkはデータベーステーブルからスキーマを自動で読み込み、データベースの型をSpark SQL型にマッピングします。
employees_table.printSchema
JDBCテーブルに対してクエリーを実行することができます。
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
JDBCへのデータ書き込み
ここでは、既存のSpark SQLテーブルdiamonds
からデータベースにデータを書き込む方法を説明します。
select * from diamonds limit 5
以下のコードは、データをdiamonds
というデータベーステーブルに書き込みます。予約されているキーワードのカラム名は例外を引き起こします。サンプルテーブルにはtable
というカラムがあるので、JDBC APIにプッシュする前にwithColumnRenamed()
を用いてカラム名を変更することができます。
spark.table("diamonds").withColumnRenamed("table", "table_number")
.write
.jdbc(jdbcUrl, "diamonds", connectionProperties)
Sparkは自動でDataFrameスキーマから適切なスキーマを決定し、データベースのテーブルを作成します。
デフォルトの挙動は、新規にテーブルを作成し、すでに同じ名前のテーブルが存在する場合にはエラーメッセージをスローするというものです。この挙動を変更するために、Spark SQLのSaveMode
機能を使用することができます。例えば、以下にテーブルに行を追加するサンプルを示します。
import org.apache.spark.sql.SaveMode
spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
.write
.mode(SaveMode.Append) // <--- Append to the existing table
.jdbc(jdbcUrl, "diamonds", connectionProperties)
既存テーブルを上書きすることもできます。
spark.table("diamonds").withColumnRenamed("table", "table_number")
.write
.mode(SaveMode.Overwrite) // <--- Overwrite the existing table
.jdbc(jdbcUrl, "diamonds", connectionProperties)
データベースエンジンへのクエリープッシュダウン
全てのクエリーをデータベースにプッシュダウンし、結果のみを受け取るようにすることができます。table
パラメーターは読み取りを行うJDBCテーブルを特定します。SQLクエリーのFROM
句で妥当なものは何でも使用できます。
// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)
プッシュダウンの最適化
テーブル全体を取り込むことに加え、データベースにクエリーをプッシュダウンし処理を行わせ、結果のみを取得することが可能です。
// Explain plan with no column selection returns all columns
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).explain(true)
DataFrame
メソッドを用いることで、カラムを削減し、クエリーの述語をデータベースにプッシュダウンすることができます。
// Explain plan with column selection will prune columns and just return the ones specified
// Notice that only the 3 specified columns are in the explain plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").explain(true)
// You can push query predicates down too
// Notice the filter at the top of the physical plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").where("cut = 'Good'").explain(true)
並列性の管理
Spark UIでは、起動さえたタスクの数を意味するパーティション数を確認することができます。それぞれのタスクはエグゼキューターに分配され、JDBCインタフェースを介した読み書きの並列性を増加することができます。パフォーマンス改善に寄与するfetchsize
のような他のパラメーターに関してはSpark SQL programming guideを参照ください。
パーティションを指定するために2つのDataFrameReader APIを使用することができます。
-
jdbc(url:String,table:String,columnName:String,lowerBound:Long,upperBound:Long,numPartitions:Int,...)
は数値カラムの名前(columnName
)、2つの範囲の終端(lowerBound
,upperBound
)、ターゲットのnumPartitions
を受け取り、指定した範囲をnumPartitions
個のタスクに均等に分割します。データベースのテーブルが、オートインクリメントの主キーのように均等に分布している数値カラムのインデックスを持っていれば、この処理はうまく動作します。数値カラムが極端に偏っていると、タスクのバランスが悪くなるため、あまりうまく動作しません。 -
jdbc(url:String,table:String,predicates:Array[String],...)
は、カスタムパーティションを定義するために使用するWHERE
条件の配列を受け取ります。偏りに対応するために非数値のカラムでパーティショニングを行う際に有用です。カスタムパーティションを定義する際、パーティションカラムがNULLを許容する場合、NULL
を考慮するようにしてください。境界の述語の記述はより複雑なロジックを必要とするため、2つ以上のカラムを用いてマニュアルでパーティションを定義しないでください。
JDBCの読み込み
データセットのカラムの値に基づいて分割境界を指定することができます。
これらのオプションは読み込み並列性を指定します。これらのオプションはどれか一つを指定する際には、全て指定する必要があることに注意してください。lowerBound
とupperBound
はパーティションの幅を定義しますが、テーブルの行をフィルタリングしません。このため、Sparkはテーブルの全ての行からパーティションを作成して返却します。
以下の例では、columnName
、lowerBound
、upperBound
、numPartitions
パラメーターを用いて、emp_no
カラムに対して、エグゼキューターでテーブル読み込みを分割しています。
val df = (spark.read.jdbc(url=jdbcUrl,
table="employees",
columnName="emp_no",
lowerBound=1L,
upperBound=100000L,
numPartitions=100,
connectionProperties=connectionProperties))
display(df)
JDBCの書き込み
Sparkのパーティションの数は、JDBC APIを通じてデータをプッシュするのに使用する接続数を意味します。既存のパーティション数に基づいてcoalesce(<N>)
やrepartition(<N>)
をコールすることで、並列性をコントロールすることができます。パーティション数を減らす場合にはcoalesce
をコールし、パーティション数を増やしたい場合にはrepartition
をコールします。
import org.apache.spark.sql.SaveMode
val df = spark.table("diamonds")
println(df.rdd.partitions.length)
// Given the number of partitions above, you can reduce the partition value by calling coalesce() or increase it by calling repartition() to manage the number of connections.
df.repartition(10).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)
Pythonのサンプル
以下のPythonサンプルでは、Scalaのサンプルと同じタスクのいくつかをカバーしています。
JDBC URLの作成
jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 3306
jdbcUrl = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)
上述したScalaのサンプルと同様に、認証情報とドライバークラスを含むディクショナリーを指定することができます。
jdbcUrl = "jdbc:mysql://{0}:{1}/{2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.mysql.jdbc.Driver"
}
データベースエンジンへのクエリープッシュダウン
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)
複数ワーカーでのJDBCコネクションからの読み込み
df = spark.read.jdbc(url=jdbcUrl, table="employees", column="emp_no", lowerBound=1, upperBound=100000, numPartitions=100)
display(df)
Spark SQLのサンプル
JDBCコネクションを使用するSpark SQLのテーブルやビューを定義するには、最初にJDBCテーブルをSparkのデータソース、一時ビューとして登録する必要があります。
詳細に関しては、以下を参照ください。
- Databricksランタイム 7.x以降:CREATE TABLE USING、CREATE VIEW
- Databricksランタイム 5.5 LTS、6.x:Create Table、Create View
CREATE TABLE <jdbcTable>
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:<databaseServerType>://<jdbcHostname>:<jdbcPort>",
dbtable "<jdbcDatabase>.atable",
user "<jdbcUsername>",
password "<jdbcPassword>"
)
テーブルへのデータ追加
Spark SQLを用いてテーブルにデータを追加します。
INSERT INTO diamonds
SELECT * FROM diamonds LIMIT 10 -- append 10 records to the table
SELECT count(*) record_count FROM diamonds --count increased by 10
テーブルのデータの上書き
Spark SQLを用いてテーブルのデータを上書きます。これによって、データベースでdiamonds
テーブルのdrop、createが行われます。
INSERT OVERWRITE TABLE diamonds
SELECT carat, cut, color, clarity, depth, TABLE AS table_number, price, x, y, z FROM diamonds
SELECT count(*) record_count FROM diamonds --count returned to original value (10 less)
テーブルに対するビューの作成
Spark SQLを用いてテーブルに対するビューを作成します。
CREATE OR REPLACE VIEW pricey_diamonds
AS SELECT * FROM diamonds WHERE price > 5000;
データ読み取りにおけるパフォーマンスの最適化
外部のJDBCデータベースからデータを読み込む際に処理が遅い場合、このセクションで説明されているパフォーマンス改善のテクニックが役立つかもしれません。
JDBC読み取りが並列実行されていることを確認する
並列でデータを読み込むためには、外部データベースに対して同時に複数のクエリーを発行できるようにSpark JDBCデータソースが適切なパーティション情報で設定されている必要があります。パーティショニングを設定しないと、単一のJDBCクエリーを用いて全てのデータをドライバーにフェッチすることになり、ドライバーがOOM例外を引き起こすリスクが高まります。
パーティションが設定されない状態でのJDBCの読み込みのサンプルを以下に示します。
SQL databases using JDBC | Databricks on AWS
columnName
として引き渡されたカラムpartitionColumn
、範囲の終端(lowerBound
, upperBound
)、パーティションの最大数numPartitions
を用いてパーティションが設定されたJDBC読み込みのサンプルを示します。
JDBC fetchSize
パラメーターをチューニングする
JDBCドライバーには、リモートのJDBCデータベースから一度にフェッチする行数をコントロールするfetchSize
パラメーターがあります。この値を小さく設定してしまうと、全てのリザルトセットを取得するために外部データベースとSparkの間で大量のリクエストのやり取りが発生し、ワークロードがレーテンシーを引き起こす可能性が高まります。あまりに大きい値にするとOOM例外のリスクが高まります。最適値はワークロードに依存(結果のスキーマ、結果の文字列サイズなどに依存)しますが、デフォルト値から徐々に増やしていくことで、大きなパフォーマンス改善を行うことができます。
OracleのデフォルトのfetchSize
は10です。徐々に100まで増やすことで、劇的にパフォーマンスを改善することができ、以下のように2000のような大きな値に増やすことでさらに性能を改善することができます。
PreparedStatement stmt = null;
ResultSet rs = null;
try {
stmt = conn. prepareStatement("select a, b, c from table");
stmt.setFetchSize(100);
rs = stmt.executeQuery();
while (rs.next()) {
...
}
}
Oracle JDBCドライバーにおけるパラメーターチューニングの一般的な議論については、Make your java run fasterをご覧ください。
インデックスのインパクトを考慮する
(パーティションテクニックの一つを用いて)並列読み込みを行なっている際、SparkはJDBCデータベースに同時にクエリーを発行します。これらのクエリーがテーブルのフルスキャンを要求することになると、リモートデータベースにおけるボトルネックとなり、処理が非常に遅くなります。このため、パーティションカラムを選択する際にはインデックスのインパクトを考慮し、それぞれのパーティションのクエリーが適切活効率的に並列処理されるように、カラムを選択すべきです。
重要!
データベースにパーティションカラムのインデックスがあることを確認してください。
ソーステーブルにシングルカラムのインデックスが定義されていない場合、パーティションカラムとして、複合インデックスの先頭(左端)カラムを選択することができます。複合インデックスのみが利用できる場合、多くのデータベースは先頭カラムを用いた検索を行う際に結合インデックスを使用することができます。このため、マルチカラムインデックスの先頭カラムをパーティションカラムとして使用することができます。
パーティション数が適切かどうかを検討する
外部データベースからの読み込みにおいてあまりに多くのパーティションを使用すると、データベースがあまりに多くのクエリーで圧倒されてしまうリスクを抱えることになります。多くのDBMSシステムでは同時接続数の制限を設けています。スタート地点として、並列性を最大にしつつもクエリーの総数を合理的な制限に収めるように、お使いのクラスターのコア数、タスクスロット数に近い数のパーティションから始めます。JDBCのレコードをフェッチした後に高い並列性を必要とするが、データベースにあまりに多くの同軸エリーを発行したくないのであれば、JDBC読み込みにおいては、少ないnumPartitions
を使用し、Sparkで明示的にrepartition()
を実行することを検討してください。
データベース固有のチューニング技術を検討する
データベースベンダーがETL、バルクアクセスのワークロードにおけるパフォーマンスチューニングのガイドを提供している場合があります。