2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksとAzure Synapse Analyticsの連携

Last updated at Posted at 2021-04-16

Azure Synapse Analytics | Databricks on AWS [2021/3/18]の翻訳です。

Azure Synapse Analytics(以前のSQL Data Warehouse)は、ペタバイトのデータに対する複雑なクエリーを高速するために、大量並列処理(MPP)を活用するクラウドベースのエンタープライズデータウェアハウスです。Azureでのビッグデータにおけるキーコンポーネントとなっています。シンプルなPolyBaseのT-SQLあるいはCOPY文を用いてビッグデータをAzureにインポートし、MPPのパワーを活用して高性能の分析を実行します。統合、分析を進めていくことで、このデータウェアハウスが、ビジネス洞察を得る際に信頼できる単一の真実(the single version of truth)になることでしょう。

Azure Synapseコネクターを用いることで、DatabricksからAzure Synapseにアクセスすることができます。Azure Synapseコネクターは、Azure Blob storage、PolyBase、Azure SynapseにおけるCOPY文を使用するApache Sparkに対するデータソース実装であり、Azure SynapseインスタンスとDatabricksクラスター間で効率的に大量データを転送することができます。

二つのシステム間でデータをやり取りするために、DatabricksクラスターとAzure Synapseインスタンスは、共通のBlobストレージコンテナーにアクセスします。Databricksでは、Blobストレージコンテナーにデータを読み書きするために、Azure SynapseコネクターによってApache Sparkジョブが起動されます。Azure Synapse側では、JDBCを通じてAzure Synapseコネクターによって、PolyBaseによるデータのローディング、アンローディングの操作が起動されます。Databricksランタイム7.0以降では、JDBC経由でのAzure SynapseコネクターによるAzure Synapseへのデータロードには、デフォルトでCOPYが使用されます。

注意
より高い性能を提供するAzure Synapse Gen2インスタンスでのみ、COPYを利用できます。お持ちのデータベースがGen1インスタンスを使用しているのであれば、Gen2に移行することをお勧めします。

Azure Synapseコネクターは、インタラクティブなクエリーよりも、クエリー実行において大量のデータをBlobストレージから抽出するETLに適しています。同じAzure Syapseテーブルに繰り返しクエリーを実行する際には、抽出したデータをParquetなどのフォーマットに保存することをお勧めします。

要件

Azure Synapseに対するdatabase master keyが必要となります。

承認

Azure Synapseコネクターは3種類のネットワークコネクションを使用します。

  • SparkドライバーからAzure Synapseへの接続
  • SparkドライバーとエグゼキューターからAzureストレージアカウントへの接続
  • Azure SynapseからAzureストレージアカウントへの接続
                                 ┌─────────┐
      ┌─────────────────────────>│ STORAGE │<────────────────────────┐
      │   Storage acc key /      │ ACCOUNT │  Storage acc key /      │
      │   Managed Service ID /   └─────────┘  OAuth 2.0 /            │
      │                               │                              │
      │                               │                              │
      │                               │ Storage acc key /            │
      │                               │ OAuth 2.0 /                  │
      │                               │                              │
      v                               v                       ┌──────v────┐
┌──────────┐                      ┌──────────┐                │┌──────────┴┐
│ Synapse  │                      │  Spark   │                ││ Spark     │
│ Analytics│<────────────────────>│  Driver  │<───────────────>│ Executors │
└──────────┘  JDBC with           └──────────┘    Configured   └───────────┘
              username & password /                in Spark

以下の章では、それぞれのコネクションの認証設定方法を説明します。

SparkドライバーからAzure Synapseへの接続

Sparkドライバーは、ユーザー名とパスワードを伴うJDBC接続、あるいは認証のためのサービスプリンシパルを伴うOAuth 2.0で、Azure Synapseに接続することができます。

ユーザー名とパスワード

どちらの認証方式においても、Azureポータルで提供される接続用文字列を使用することをお勧めします。これによって、SparkドライバーとAzure Synapseインスタンスとの間でJDBC経由でやり取りされる全てのデータがSecure Sockets Layer (SSL)で暗号化されます。SSL暗号化が有効化されているかどうかは、接続用文字列にencrypt=trueがあることで確認できます。

SparkドライバーがAzure Synapseに到達できるようにするためには、AzureポータルからアクセスできるAzure Synapseサーバーのfirewallペインで、Allow access to Azure servicesONに設定します。この設定により、全てのAzure IPアドレスとAzureサブネットからの通信を許可し、SparkドライバーがAzure Synapseインスタンスに到達できるようになります。

サービスプリンシパルとOAuth 2.0

プレビュー
この機能はパブリックプレビューです。

サービスプリンシパルを用いることで、Azure Synapse Analyticsが背後にあるストレージアカウントへのアクセスを許可することができます。Azureストレージアカウントにアクセスするサービスプリンシパルの認証情報の使い方に関しては、Access Azure Data Lake Storage Gen2 using OAuth 2.0 with an Azure service principalを参照ください。コネクターに対するサービスプリンシパルによる承認を有効化するには、enableServicePrincipalAuthオプションをtrueに設定する必要があります。

Azure Synapse Analytics接続に対して、異なるサービスプリンシパルを用いることができます。Synapseにおける、ストレージアカウントに対するサービスプリンシパルの例を以下に示します。

ini

; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>

Scala

// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Python

# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

R

# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")

# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

SparkドライバーとエグゼキューターからAzureストレージアカウントへの接続

Azureストレージコンテナーは、Azure Synapseに対してバルクデータを読み書きする際に、中間層として動作します。Sparkは、ビルトインコネクター:Azure Blob storageAzure Data Lake Storage (ADLS) Gen2の一つを用いてストレージコンテナーに接続します。Azure Data Lake Storage Gen1はサポートされておらず、SSLで暗号化されたHTTPアクセスのみが許可されます。このため、サポートされているURIスキーマはwasbsabfssです。

以下の認証方法を利用できます:

以下の例では、ストレージアカウントキーのアプローチにおける二つの方法を説明します。これはOAuth 2.0においても適用できます。

ノートブックセッションの設定(推奨)

このアプローチでは、コマンドを実行するノートブックに関連づけられたセッション設定にアカウントのアクセスキーが設定されます。この設定は、同じクラスターにアタッチされている他のノートブックには影響を与えません。sparkがノートブックで提供されるSparkSessionオブジェクトとなります。

Python

spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

グローバルHadoop設定

このアプローチでは、全てのノートブックで共有されるSparkContextに関連づけられるHadoop設定を更新します。

Scala

sc.hadoopConfiguration.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

Python

hadoopConfigurationは全てのバージョンのPySparkでは外部に公開されていません。以下のコマンドはある意味Sparkの内部機構に依存しています。このため、これは全てのPySparkで動作するはずですが、動作が将来的に保証されている訳ではありません。

sc._jsc.hadoopConfiguration().set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

Azure SynapseからAzureストレージアカウントへの接続

Azure Synapseは一時データのロード、アンロードの際にもストレージアカウントに接続します。

ストレージアカウントに対するアカウントキーとシークレットを設定した場合には、forwardSparkAzureStorageCredentialstrueに設定することができます。これにより、Azure Synapseはノートブックのセッション設定あるいはグローバルHadoop設定にあるアカウントアクセスキーとシークレットを探し出し、ストレージアカウントのアクセスキーを転送し、Azureのdatabase scoped credentialを作成することで、Azure Synapseインスタンスに接続できるようになります。

ADLS Gen2 + OAuth 2.0認証を使用している場合、Azure SynapseインスタンスがManaged Service Identity(多くの場合、VNet + Service Endpoints setupの組み合わせ)を持つように設定されている場合には、useAzureMSI trueに設定する必要があります。この場合、コネクターは、データベーススコープの認証情報にIDENTITY = 'Managed Service Identity'を設定し、SECRETは設定しません。

ストリーミングのサポート

Azure Synapseインスタンス、Databricksクラスター間で大量データをやり取りするためにPolyBaseやCOPYを用いて一貫性のあるユーザーエクスペリエンスを提供するAzure Synpseに対して、Azure Synapseコネクターは、効率的かつスケーラブルな構造化ストリーミングによる書き込みもサポートしています。バッチによる書き込みと同様、ストリーミングは主にETL向けに設定されており遅延が大きくなるケースがあり、リアルタイムデータ処理には適さないケースがあります。

フォールトトレランス

デフォルトでは、Azure Synapseストリーミングは、エンドツーエンドでexactly-onceの書き込みを保証します。これには、DBFSに格納されるチェックポイントとAzure Synapseテーブルに格納されるチェックポイント、ロック機構を組み合わせて、あらゆる種類の失敗、リトライ、クエリーの再起動をハンドリングできるようにしています。オプションで、spark.databricks.sqldw.streaming.exactlyOnce.enabledfalseに設定することで、Azure Synapseストリーミングのat-least-onceの制限を緩和することができます。この場合、Azure Synapseに対する切断、予期せぬクエリーの停止などで、データの重複が起こり得ます。

利用方法(バッチ)

Scala、Python、SQL、RのノートブックからデータソースAPIを通じてコネクターを利用することができます。

Scala

// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .load()

// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x, count(*) as cnt from table group by x")
  .load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.

df.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")
  .save()

Python


# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Load data from an Azure Synapse query.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", "select x, count(*) as cnt from table group by x") \
  .load()

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.

df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
  .save()

SQL

-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net=<your-storage-account-access-key>;

-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>'
);

-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:

CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;

R

# Load SparkR
library(SparkR)

# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net", "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")

# Load data from an Azure Synapse query.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   query = "select x, count(*) as cnt from table group by x",
   tempDir = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.

write.df(
  df,
  source = "com.databricks.spark.sqldw",
  url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
  forward_spark_azure_storage_credentials = "true",
  dbTable = "<your-table-name>",
  tempDir = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")

利用方法(ストリーミング)

Scala、Pythonノートブックから構造化ストリーミングを用いてデータを書き込むことができます。

Scala

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

設定

本章では、コネクターに対する書き込み設定、要求されるアクセス権、その他の設定パラメーターをご説明します。

バッチ書き込みにおけるセーブモード

Azure Synapseコネクターは、ErrorIfExistsIgnoreAppendOverwriteセーブモードをサポートしており、デフォルトはErrorIfExistsです。詳細はSpark SQL documentation on Save Modesを参照ください。

ストリーミング書き込みにおけるアウトプットモード

Azure Synapseコネクターは、レコード追加と集計のためのAppendCompleteのアウトプットモードをサポートしています。アウトプットモードと互換性に関する詳細は、Structured Streaming guideを参照ください。

書き込みのセマンティクス

PolyBaseに加えて、Azure SynapseコネクターはCOPY文をサポートしています。COPY文は、外部テーブルを作成することなしにデータロードが可能で、データロードに必要な権限が少なくてすみ、Azure Synapseに対して高速なデータ投入を可能とする便利な方法を提供します。

デフォルトでは、コネクターは自動で適切な書き込みセマンティクスを検出します。しかし、以下の設定で書き込みセマンティクスを強制することができます。

Scala

// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

Python

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

SQL

-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;

R

# Load SparkR
library(SparkR)

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")

ここで、<write-semantics>は、

  • polybase (Azure SynapseへのデータロードにPolyBaseを使用)
  • copy (Databricksランタイム7.0以降で、データロードにCOPYを使用)
  • 指定なし (デフォルト設定に従う:Databricksランタイム7.0以降でADLS Gen2であればCOPY、そうでない場合はpolybase)

PolyBaseに必要なAzure Synapseの権限

PolyBaseを使用する際、Azure Synapseコネクターは、接続先のAzure Synapseインスタンスで以下のコマンドを実行できるユーザーによるJDBC接続を必要とします。

最初のコマンドを実行する前に、既にデータベースマスターキーが特定のAzure Synapseインスタンスに存在している必要があります。そうでない場合には、CREATE MASTER KEYコマンドでキーを作成する必要があります。

加えて、dbTableを通じて設定された、あるいは、queryで参照されるAzure Synapseテーブルを読み込むには、JDBCユーザーは、当該のAzure Synapseテーブルに対する権限を有している必要があります。dbTableを通じてAzure Synapseテーブルにデータを書き戻す場合には、JDBCユーザーはAzure Synapseテーブルに対する書き込み権限を有している必要があります。

以下の表にPolyBaseの全ての操作に関わる権限をまとめています。

オペレーション 権限
バッチ書き込み CONTROL
ストリーミング書き込み CONTROL
読み取り CONTROL

COPYに必要なAzure Synapseの権限

注意
本機能はDatabricksランタイム7.0以上で利用できます。

COPY文を使用する際には、Azure Synapseコネクターは、接続先のAzure Synapseインスタンスで以下のコマンドを実行できるユーザーによるJDBC接続を必要とします。

Azure Synapseにターゲットのテーブルが存在しない場合、上のコマンドの実行権限に加えて、以下のテーブルに記載されている権限が必要となります。

以下のテーブルに、COPYで行うバッチ処理、ストリーミング処理に必要な権限をまとめています。

オペレーション 権限(存在するテーブルへのインサート) 権限(新規テーブルへのインサート)
バッチ書き込み ADMINISTER DATABASE BULK OPERATIONS

INSERT
ADMINISTER DATABASE BULKOPERATIONS

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo
ストリーミング書き込み ADMINISTER DATABASE BULK OPERATIONS

INSERT
ADMINISTER DATABASE BULKOPERATIONS

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo

パラメーター

パラメーター、Spark SQLにおけるOPTIONで設定可能な内容についてはこちらを参照ください。

Azure Synapseへのクエリープッシュダウン

Azure Synapseコネクターは、以下のオペレーターをAzure Synapseにプッシュダウンする最適化ルールを実装しています。

  • Filter
  • Project
  • Limit

ProjectFilterオペレーターは以下の表現をサポートしています。

  • 多くのブールロジックオペレーター
  • 比較
  • 基礎的な計算オペレーション
  • 数値、文字列のキャスト

Limitオペレーターに対しては、ORDERが指定されない場合にのみプッシュダウンがサポートされます。例えば、

SELECT TOP(10) * FROM tableではプッシュダウンが行われますが、SELECT TOP(10) * FROM table ORDER BY colではプッシュダウンが行われません。

注意
Azure Synapseコネクターは、文字列、日付、タイムスタンプに対する条件はプッシュダウンしません。

Azure Synapseコネクターで実装されるクエリープッシュダウンはデフォルトで有効化されています。spark.databricks.sqldw.pushdownfalseに設定することで無効化することができます。

テンポラリーデータの管理

Azure Synapseコネクターは、Blobストレージコンテナーに作成されるテンポラリーファイルを削除しません。このため、ユーザーが指定したtempDir配下にある一時ファイルを定期的に削除することをお勧めします。

データのクリーンアップを容易にするために、Azure SynapseコネクターはtempDir直下にデータファイルを格納しません。代わりに<tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/の形式でサブディレクトリを作成します。定期的ジョブ(Databricksのジョブ機能などを活用)をセットアップして、特定の期間(例えば2日)を経過した古いサブディレクトリを削除することができます。

よりシンプルな代替案としては、定期的にコンテナー全体を削除して、同じ名前のコンテナーを再作成するというものです。これには、Azure Synapseコネクターが生成する一時データ専用のコンテナーと、コネクターが関わるクエリーが実行されていないことを保証できる時間帯が必要となります。

テンポラリーオブジェクトの管理

Azure Synapseコネクターは、Azure SynapseインスタンスとDatabricksクラスター間のデータ転送を自動化します。Azure Synapseテーブルからデータを読み込んだり、Azures Synapseテーブルに書き込んだりする際には、Azure Synapseコネクターは、DATABASE SCOPED CREDENTIALEXTERNAL DATA SOURCEEXTERNAL FILE FORMATEXTERNAL TABLEを含むテンポラリーオブジェクトを生成します。これらのオブジェクトは、対応するSparkジョブが存在している間のみ生存し、その後は自動的に削除されます。

Azure Synapseコネクターを用いたクエリーがクラスターで実行される際、Sparkドライバーがクラッシュ、あるいは、強制的に再起動された場合、または、クラスターが強制的に停止、再起動された場合、テンポラリーオブジェクトは削除されずに残ってしまう場合があります。この場合に、手動での削除を容易にするために、Azure Synapseコネクターは、Azure Synapseインスタンスで作成された全ての中間一時オブジェクトに対して、tmp_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_の形式で名前を付けます。

定期的に以下のようなクエリーを実行することで、削除されていないオブジェクトがないか確認することをお勧めします。

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'

ストリーミングチェックポイントテーブルの管理

Azure Synapseコネクターは、新たなストリーミングクエリーが開始した際に作成されるストリーミングチェックポイントテーブルを削除しません。これはDBFSにおけるcheckpointLocationの挙動と同じです。このため、定期的に将来的に実行する予定が無いクエリーのDBFS上のチェックポイントを削除するのと同じタイミングでチェックポイントテーブルを削除することをお勧めします。

デフォルトでは、チェックポイントテーブルの名称は<prefix>_<query_id>となっています。<prefix>は設定することができ、デフォルトはdatabricks_streaming_checkpointとなっています。query_idはストリーミングクエリーIDから_を除外したものです。古い、あるいは削除されたストリーミングクエリーに対するチェックポイントテーブルを検索するには、以下のクエリーを実行します。

SQL

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

Spark SQLの設定オプションspark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefixでプレフィクスを変更することができます。

FAQ

Azure Synapseコネクターを使っている時にエラーが発生しました。どうすればエラーがAzure Synapseによるものなのか、Databricksによるものなのかを識別できますか?

エラーのデバッグを容易にするために、Azure Synapseコネクター特有のコードからスローされた全ての例外は、SqlDWExceptionトレイトを拡張した例外でラップされています。また、以下のように例外を識別することが可能です。

  • SqlDWConnectorException Azure Synapseコネクターによってスローされた例外
  • SqlDWSideException 接続先のAzure Synapseインスタンスによってスローされた例外

“No access key found in the session conf or the global Hadoop conf”というエラーが発生してクエリーに失敗した時にはどうすればいいですか?

このエラーは、tempDirで指定されたストレージアカウントに対するアクセキーが、ノートブックのセッション設定やHadoop設定に見つからなかったことを意味します。どのようにストレージアカウントへのアクセスを設定するのかに関しては、利用方法(バッチ)を参照ください。Sparkテーブルが、Azure Synapseコネクターで作成された場合には、Sparkテーブルに読み書きするためにストレージアカウントの認証情報を提供する必要があります。

tempDirに指定されたBlobストレージコンテナーにアクセスするために、Shared Access Signature (SAS)を使用することはできますか?

Azure SynapseはSASを用いたBlobストレージへのアクセスをサポートしていません。このため、Azure SynapseコネクターはtempDirで指定されたBlobストレージコンテナーにアクセスするためにSASを使用することはできません。

dbTableオプションを指定してAzure SynapseコネクターでSparkテーブルを作成し、Sparkテーブルに書き込みを行い、Sparkテーブルを削除しました。この場合、Azure Synapse側のテーブルも削除されますか?

いいえ。Azure Synapseは外部データソースと看做されます。dbTableを通じて設定されたAzure Synapseテーブルは、Sparkテーブルが削除されても削除されません。

Azure Synapseにデータフレームを書き込む際に、.saveAsTable(tableName)ではなく.option("dbTable", tableName).save()を指定しなくてはならないのですか?

以下の区別を明確にするためです:

  • .option("dbTable", tableName)はデータベース(Azure Synapse)のテーブルを参照します。
  • .saveAsTable(tableName)はSparkテーブルを参照します。

実際のところは、df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark)のように二つを組み合わせることができます。これによって、Azure SynapseにはtableNameDWを作成し、Sparkの外部テーブルには、Azure SynapseにバックアップがあるtableNameSpark を作成します。

警告!
以下の.save().saveAsTable()の違いに注意してください。

  • df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save()においては、writeMode は予想通りAzure Synapseテーブルに対して作用します。
  • df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark)においては、writeModeはSparkテーブルに作用します。tableNameDWがAzure Synapseに存在する際には、サイレントで上書きされます。

この振る舞いは、異なるデータソースに対して書き込みを行う際にも同様です。これは、SparkのDataFrameWriter APIを使用する際のちょっとしたTipsとなります。

Databricks 無料トライアル

Databricks 無料トライアル

2
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?