DatabricksとAzure Synapse Analyticsの連携 - Qiitaで説明した内容を実際に実行した結果をまとめています。
Synapse Workspaceの作成
ここでは、チュートリアル:Azure Synapse Analytics の使用を開始する - Azure Synapse Analytics | Microsoft Docsを参考にして、ダミーのワークスペースを作成しています。既存のワークスペースが存在する場合には、そちらを参照してください。
ここでは以下の設定を行なっています。
項目 | 設定値 |
---|---|
workspace名 | taka-workspace |
ストレージアカウント名 | takaaccount |
ストレージコンテナー名 | users |
専用のSQLプールを作成
ここでは「mysqlpool」と言うSQLプールを作成しています。
Synapseワークスペースの設定の確認
ファイアウォール設定の確認
DatabricksからSynapseにアクセスできるようにするためには、Synapseのファイアウォールの設定で「Azure サービスおよびリソースに、このワークスペースへのアクセスを許可する」がオンになっていることを確認します。
SQL 管理ユーザー名の確認
ワークスペースの概要に表示される「SQL 管理ユーザー名」をメモしておいてください。また、パスワードもご確認ください。これらはSynapse接続時に指定する必要があります。
SQLプールのマスターパスワードが作成されていることを確認
DatabricksからSQLプールにアクセスして操作を行う場合には、当該SQLプールでマスターパスワードが作成されている必要があります。作成されていない場合には、当該SQLプールで以下のSQLを実行してください。
ストレージアカウントのアクセスキーを設定
ストレージアカウントのアクセスキーは、ホーム > ストレージアカウントで「キーの表示」をクリックし、表示されるKey1を指定します。
以下のPythonの例ではアクセスキーやパスワードを平文で記載していますが、本運用の際にはシークレットの活用をご検討ください。
参考資料
storage_account_key = "<<ストレージアカウントのアクセスキー>>"
spark.conf.set("fs.azure.account.key.takaaccount.blob.core.windows.net", storage_account_key)
JDBCユーザー名とパスワードで接続しデータを読み込む
-
hostname
Workspace SQL endpointを指定 -
database
SQL pool名を指定 -
dbuser
SQL 管理ユーザー名をユーザー名@ワークスペース名
の形式で指定 -
dbpassword
= SQL 管理ユーザーのパスワードを指定 -
storage_account
Workspace作成時に指定したストレージアカウント名を指定 -
container_name
= Workspace作成時に指定したストレージコンテナー名を指定 -
table_name
= 読み取るテーブル名を指定
hostname = "taka-workspace.sql.azuresynapse.net"
database = "mysqlpool"
dbuser = "sqladminuser@taka-workspace"
dbpassword = "<<SQL管理ユーザーのパスワード>>"
storage_account = "takaaccount"
container_name = "users"
table_name = "NYCTaxiTripSmall"
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://{0}:1433;database={1};user={2};password={3};trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;".format(hostname, database, dbuser, dbpassword)) \
.option("tempDir", "wasbs://{0}@{1}.blob.core.windows.net/tempdir".format(container_name, storage_account)) \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", table_name) \
.load()
display(df)
Synapse上にテーブルを作成する
ここでは、Databricksデータセットに格納されているダイアモンドのデータセットをSynapse上に作成します。
dataFrame = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamonds = spark.read.format("csv").option("header","true")\
.option("inferSchema", "true").load(dataFrame)
display(diamonds)
書き込みの際にCOPY文を使用するように強制します。
Azure SynapseコネクターはCOPY文をサポートしています。COPY文は、外部テーブルを作成することなしにデータロードが可能で、データロードに必要な権限が少なくてすみ、Azure Synapseに対して高速なデータ投入を可能とする便利な方法を提供します。
# COPY文の使用を強制
spark.conf.set("spark.databricks.sqldw.writeSemantics", "copy")
# 書き込み先のテーブル名
write_table_name = "diamonds"
diamonds.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://{0}:1433;database={1};user={2};password={3};trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;".format(hostname, database, dbuser, dbpassword)) \
.option("tempDir", "wasbs://{0}@{1}.blob.core.windows.net/tempdir".format(container_name, storage_account)) \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", write_table_name) \
.save()
Synapse Analytics側でデータを確認します。