LoginSignup
2
1

More than 3 years have passed since last update.

Azure Data Factory でAzure Databricks Delta Lakeをソース、シンクにする際の利用方法と挙動の考察

Last updated at Posted at 2020-12-14

Azure Data Factory でAzure Databricks Delta Lakeをソース、シンクにする際の利用方法と挙動の考察

はじめに

Azure Data Factory (以下、ADF)で Azure Databrikcsで作成したDelta Lake テーブルをソース、シンクにする際の利用方法について、注意ポイントや、データアクセスの流れを考察します。

Azure Databricks Delta Lake との間でデータをコピーする - Azure Data Factory | Microsoft Docs

ADFはMapping DataflowというSpark環境からDelta Lakeのデータにアクセス可能ですが、今回の記事では対象外

(2020/12時点の情報です。)

Delta Lakeについて参考リンク

今注目のストレージ層技術で、データレイク中心のデータ基盤でデータの管理・統合を行うためのAPIが提供されるフォーマットです。
※ストレージをDBMSのように扱える。

Delta Lake - 次世代型データレイク・データウェアハウス - Databricks

Delta Lake概要

Delta Lakeを含む、データレイク上のSW技術に関してこちらを一読するのをおすすめ

大規模データ活用向けストレージレイヤソフトのこれまでとこれから(NTTデータ テクノロジーカンファレンス 2019 講演資料、2019/09/05)

ADF Copyの前提条件から考えるデータアクセス

以下のような記載になっており、ADFのDelta LakeコネクタでのCopyはAzure IRではなく、Azure Databrikcsがソースシンクに直接アクセスして実行されることがわかります。

この Azure Databricks Delta Lake コネクタを使用するには、Azure Databricks でクラスターを設定する必要があります。

  • Delta Lake にデータをコピーする場合、コピー アクティビティは Azure Databricks クラスターを呼び出して、Azure Storage からデータを読み取ります。これは元のソースまたはステージング領域で、Data Factory が組み込みのステージング コピーを介してソース データを最初に書き込みます。 詳細については、「ソースとしての Delta Lake」を参照してください。

  • 同様に、Delta Lake からデータをコピーする場合、コピー アクティビティは Azure Databricks クラスターを呼び出して、Azure Storage にデータを書き込みます。これは元のシンクまたはステージング領域で、Data Factory が引き続き組み込みのステージング コピーを介して最終的なシンクにデータを書き込みます。 詳細については、「シンクとしての Delta Lake」を参照してください。

Databricks クラスターは、Azure Blob または Azure Data Lake Storage Gen2 アカウントにアクセスできる必要があります。これは、ソース/シンク/ステージングに使用されるストレージ コンテナー/ファイル システムと、Data Lake テーブルを書き込むコンテナー/ファイル システムの両方です。

  • Azure Data Lake Storage Gen2 を使用するには、Apache Spark 構成の一部として、Databricks クラスターで サービス プリンシパル または ストレージ アカウント アクセス キー を構成します。 「サービス プリンシパルを使用した直接アクセス」または「ストレージ アカウント アクセス キーを使用した直接アクセス」の手順に従います。

  • Azure Blob Storage を使用するには、Apache Spark 構成の一部として、Databricks クラスターで ストレージ アカウント アクセス キー または SAS トークン を構成します。 「RDD API を使用した Azure Blob Storage へのアクセス」の手順に従います。

コピー アクティビティの実行中、構成したクラスターが終了した場合は、Data Factory によって自動的に開始されます。 Data Factory オーサリング UI を使用してパイプラインを作成する場合、データのプレビューなどの操作にはライブ クラスターが必要ですが、Data Factory によってクラスターが起動されることはありません。

イメージ

datamovement.png

また、このアクセスの際にはマウントによるアクセスはテーブルを参照する場合にのみ実施されると考えられます。

実際に、準備の中で、直接アクセス用のsparkConfを設定しない場合、エラーが発生します。

error.png

検証

準備

ADFとストレージアカウントとDatabricksを利用します。

ストレージアカウント

ストレージアカウントは二つのコンテナを用意しておきます。

  • adf: ADFで出力、読み取りするコンテナ
  • databricks: Databricksで出力、読み取りするコンテナ

container.png

datafactory にはadfコンテナのみ権限をあたえます。

スクリーンショット 2020-12-14 213359.png

Azure Databricks

クイック スタート - Azure portal を使用して Azure Databricks ワークスペースで Spark ジョブを実行する | Microsoft Docsを参考にクラスタを作成します。

このとき、後述のクラスター構成、およびRDD APIでの直接アクセス設定に従って、spark configを設定します。

RDD APIでの直接アクセス設定

クラスター構成でsecretを利用する方法は不明でした。。


spark.databricks.delta.optimizeWrite.enabled true
spark.databricks.delta.autoCompact.enabled true
fs.azure.sas.<コンテナ名>.<ストレージアカウント名>.dfs.core.windows.net sasurl例:https://<ストレージアカウント名>.dfs.core.windows.net/コンテナ名/?sv=xxxx

※blob.coreはADLSが相手先の場合はdfs.coreに変更

cluster.png

ストレージアカウントをマウント

pyspark
# 対象
storageAccount = "ストレージアカウント名"
containerName = "databricks"
# key 取得
accountKey = "アカウントキー"


try:
  dbutils.fs.unmount("/mnt/databricks")#すでにマウントされていればアンマウント
  print('Unmount!')

except:
  print('Already Unmount!')

result=dbutils.fs.mount(
  source = "wasbs://" + containerName + "@" + storageAccount + ".blob.core.windows.net/",
  mount_point = "/mnt/databricks",
  extra_configs = {"fs.azure.account.key."+storageAccount+".blob.core.windows.net":accountKey}
)
print('Mount!')

次に外部テーブルを作成

pyspark

delta_route = "/mnt/databricks/delta/"
delta_path_sink= delta_route +"sink/"
delta_path_source=delta_route +"source/"

spark.sql("DROP TABLE IF EXISTS delta_Table_sink")
spark.sql(
    """
    CREATE TABLE delta_Table_sink (
      battery_level long
      ,02_level long
      ,ca2 string
      ,ca3 string
      ,n string
      ,evice_id long
      ,evice_name string
      ,umidity long
      ,p string
      ,atitude double
      ,cd string
      ,ongitude double
      ,cale string
      ,temp long
      ,timestamp long
      )
    using DELTA
    LOCATION '{}'
    """.format(delta_path_sink)\
)
spark.sql("DROP TABLE IF EXISTS delta_Table_source")
spark.sql(
    """
    CREATE TABLE delta_Table_source (
      battery_level long
      ,02_level long
      ,ca2 string
      ,ca3 string
      ,n string
      ,evice_id long
      ,evice_name string
      ,umidity long
      ,p string
      ,atitude double
      ,cd string
      ,ongitude double
      ,cale string
      ,temp long
      ,timestamp long
      )
    using DELTA
    LOCATION '{}'
    """.format(delta_path_source)\
)

作成後、以下のようになります。

deltatable.png

データ登録をして準備完了

python

df = spark.read.json("dbfs:/databricks-datasets/iot/iot_devices.json")
df.write.insertInto("delta_Table_source")
# display(df)

Azure Data Factory

最後に、ADFのlinked Serviceを構成しておきます。

linked.png

Databricksについては、先ほど作成したクラスターを利用するように指定します。

adb_linked.png

Copy Delta Lake -> Storage

ソースデータセット

d2s_sourceDS.png

シンクデータセット

blob.png

結果

d2s_throwput.png

Copy Storage -> Delta Lake

ソースデータセット

blob.png

シンクデータセット

s2d.png

結果(直接コピー)

直接コピーの場合、Wildcard folder pathは指定できません。

s2d_direct.png

結果

s2d_d_result.png

結果(ステージングコピー)

ステージングコピーが現実的でしょう

s2d_stage.png

結果

s2d_s_result.png

トラブルシューティング

pythonで直接アクセスを設定してもうまくいかない

以下はnotebook上でのdata frame API、RDD API用の構成設定ですが、クラスターが停止されると保持されない設定のため、ADFからの実行だと使えません


spark.conf.set(
  "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
  "<storage-account-access-key>")


%scala 
// Using an account access key
spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
  "<storage-account-access-key>"
)

RDDでのspark conf設定

spark.hadoop.fs.azure.sas.<コンテナ名>.<ストレージアカウント名>.blob.core.windows.net sasurl例:https://<ストレージアカウント名>.blob.core.windows.net/コンテナ名/?sv=xxxx

エラーメッセージ全文

※ストレージアカウント名はxxxに置換

ErrorCode=AzureDatabricksCommandError,Hit an error when running the command in Azure Databricks. Error details: shaded.databricks.org.apache.hadoop.fs.azure.AzureException: shaded.databricks.org.apache.hadoop.fs.azure.AzureException: Container adf in account xxx.blob.core.windows.net not found, and we can't create it using anoynomous credentials, and no credentials found for them in the configuration. Caused by: shaded.databricks.org.apache.hadoop.fs.azure.AzureException: Container adf in account xxx.blob.core.windows.net not found, and we can't create it using anoynomous credentials, and no credentials found for them in the configuration..

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1