概要
本記事では、DatabricksのPythonノートブックを使用して、追加コンポーネントを必要とせずにAzure SQL Databaseに対してScalaでSQLを実行する方法を紹介します。以前にPython にて databricks から Azure SQL Database を操作する方法についての記事を共有しましたが、継続的な利用にリスクがあることや追加のコンポーネントが必要となるなどのデメリットがありました。そこで、PythonのノートブックでAzure SQL Databaseへの操作をデフォルトでインストールされているAzure SQL Databaseのjdbc (mssql-jdbc)を使用してScalaで実行する方法を検証しました。
ノートブック上で複数の言語を使用する際には、マジックコマンド(%python
や%scala
)を指定することで実行できますが、変数の値の受け渡し方法を検討する必要があります。Databricksで利用できる言語であれば、Sparkと接続することができ、その設定値(spark.conf
)により変数値の受け渡しが可能です。Pytho nでspark.conf.set
を使用してScalaに渡す変数値を定義し、Scalaでspark.conf.get
を使用してその値を取得します。パスワードなどのシークレットは、Databricksシークレット(dbutils.secrets.get
)を使用してそれぞれの言語で取得することが望ましいです。
本記事では、検証時のコードと実行結果を共有します。
検証コードと実行結果
事前準備
Azure SQL Database 上に検証用のテーブルを作成します。
server = "ads-01" ## サーバー名
database = "ads-01" ## データベース名
username = dbutils.secrets.get(scope="qiita", key="sql_authentificate_user") ## Azure SQL DB のユーザー名
password = dbutils.secrets.get(scope="qiita", key="sql_authentificate_password") ## Azure SQL DB のパスワード
# CREATE TABLE の実行
sql = """
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[create_tble_01]') AND type in (N'U'))
DROP TABLE [dbo].[create_tble_01]
;
CREATE TABLE dbo.create_tble_01
(
str_col varchar(100) COLLATE Japanese_CI_AS
)
;
"""
df = (
spark.read.format("sqlserver")
.option("host", f"{server}.database.windows.net")
.option("user", username)
.option("password", password)
.option("database", database)
.option("prepareQuery", sql)
.option("dbTable", "dbo.create_tble_01")
.load()
)
df.display()
Python から Scala に渡す変数を定義
# Scala に渡す変数を定義
spark.conf.set("PyToScaal.server", server)
spark.conf.set("PyToScaal.database", database)
sql= """
INSERT INTO dbo.create_tble_01
SELECT 'abc' as str_col
UNION ALL
SELECT N'あいう' as str_col
UNION ALL
SELECT NULL as str_col
;
"""
spark.conf.set("PyToScaal.executed_sql", sql)
Scala にて Azure SQL Database に対する SQL を実行
%scala
import java.sql.DriverManager
import java.sql.Connection
var connection: Connection = null
try {
val executed_sql = spark.conf.get("PyToScaal.executed_sql")
// Get jdbcUsername from dbutils secrets
val server = spark.conf.get("PyToScaal.server")
val database = spark.conf.get("PyToScaal.database")
val jdbcUsername = dbutils.secrets.get(scope="qiita", key="sql_authentificate_user")
val jdbcPassword = dbutils.secrets.get(scope="qiita", key="sql_authentificate_password")
val jdbcUrl = s"""jdbc:sqlserver://${server}.database.windows.net:1433;
|databaseName=${database};
|encrypt=true;
|trustServerCertificate=false;
|hostNameInCertificate=*.database.windows.net;
|loginTimeout=300""".stripMargin.replaceAll("\n", "")
connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val statement = connection.createStatement()
statement.execute(executed_sql)
} finally {
if (connection != null) {
connection.close()
}
}
データを確認
# データを確認
df = (
spark.read.format("sqlserver")
.option("host", f"{server}.database.windows.net")
.option("user", username)
.option("password", password)
.option("database", database)
.option("dbTable", "[dbo].[create_tble_01]")
.load()
)
df.display()
検証用に作成したテーブルを Drop
# DROP TABLE の実行
sql = """
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[create_tble_01]') AND type in (N'U'))
DROP TABLE [dbo].[create_tble_01]
;
"""
sql_db_table_01 = (
spark.read.format("sqlserver")
.option("host", f"{server}.database.windows.net")
.option("user", username)
.option("password", password)
.option("database", database)
.option("prepareQuery", sql)
.option("dbTable", "(select 1 as test) src")
.load()
)