0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

#3 Azure Databricks を使ってみよう 【格納編】

Last updated at Posted at 2021-10-25

【 Azure Databricks ETL編 その3 】

概要

Azure Databricks を使ってみよう! ということで、以下の3ステップで簡単に說明します。

  1. 【環境準備編】:Azure CLI から Azure Databricks 環境を作成します
  2. 【抽出変換編】:Databricks から ADLS Gen2 のデータを抽出し、変換します
  3. 【格納編】:Databricks で ETL されたデータを ADLS Gen2 に格納します

image.png

今回は 【格納編】の說明となります。 Databricks の Notebook を使用していきます。

  • Azure Databricks
    • NoteBook 実装
      • Blob コンテナ(データ格納用)のマウント
      • データ格納

ローカル環境

  • macOS Big Sur 11.3
  • python 3.8.3
  • Azure CLI 2.28.0

前提条件

  1. Azure環境がすでに用意されていること(テナント/サブスクリプション)。
  2. ローカル環境に「azure cli」がインストールされていること。
  3. 本記事シリーズの【環境準備編】【抽出変換編】 が完了していること

Azure Databricks の Notebook を使います

【抽出変換編】 で作成した Notebook :「ETL_UsageCost_01」 の続きとして実装します
なお、Blob コンテナのマウントについては、この記事 の内容をほぼそのまま活用させていただきました

Blobコンテナーのマウント

cmd_10
# コンテナーのマウント
# 一度マウントすると、Clusterを停止、変更してもマウント状態が維持されます
# マウントされた状態で再度操作を実行するとエラーが発生するため、マウント状態をチェックする

# Blob Storage情報(データ抽出用)
storage = {
  "account": "testaccount",
  "container": "etl-cost-data",
  "key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}

# マウント先DBFSディレクトリ(データ抽出用)
mount_point_etl = "/mnt/etl-blob-data"

try:
  # マウント状態のチェック
  mount_dir = mount_point_etl
  if mount_dir[-1] == "/":
    mount_dir = mount_dir[:-1]
  if len(list(filter(lambda x: x.mountPoint == mount_dir, dbutils.fs.mounts()))) > 0:
    print("Already mounted.")
    mounted = True
  else:
    mounted = False

  # Blob Storageのマウント
  if not mounted:
    source = "wasbs://{container}@{account}.blob.core.windows.net".format(**storage)
    conf_key = "fs.azure.account.key.{account}.blob.core.windows.net".format(**storage)

    mounted = dbutils.fs.mount(
      source=source,
      mount_point = mount_point_etl,
      extra_configs = {conf_key: storage["key"]}
    ) 

except Exception as e:
  raise e

"mounted: {}".format(mounted)

image.png

Blob Storageをマウントしたディレクトリの確認

cmd_11
# ディレクトリの確認
display(dbutils.fs.mounts())

image.png

変換されたデータの格納 : Json

cmd_12
# データの保存:階層化指定なし
df.write.mode('overwrite').json("/mnt/etl-blob-data/NoPartition_01.json")

# データの保存:Subscription で階層化
df.write.mode('overwrite').partitionBy('Subscription').json("/mnt/etl-blob-data/ParSubscription_02.json")

# データの保存:Subscription - ResourceGroup で階層化
df.write.mode('overwrite').partitionBy('Subscription', 'ResourceGroup').json("/mnt/etl-blob-data/ParResourceGroup_03.json")

格納されたファイルの確認 : Json

cmd_13
# 格納ファイルの確認(ファイルが存在しないとエラーが発生)
# mount_point = "/mnt/{マウント先ディレクトリ}"
display(dbutils.fs.ls(mount_point_etl))

image.png

cmd_14
# 「階層化指定なし」で格納されたファイルの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/NoPartition_01.json/"))

image.png

cmd_15
# 「Subscription で階層化」で格納されたファイルの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/ParSubscription_02.json"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParSubscription_02.json/Subscription=NSG-01"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParSubscription_02.json/Subscription=iapp-01"))

image.png

cmd_16
# 「Subscription - ResourceGroup で階層化」で格納されたディレクトリの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/ParResourceGroup_03.json"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParResourceGroup_03.json/Subscription=NSG-01/"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParResourceGroup_03.json/Subscription=iapp-01/"))

image.png

格納したファイルの再読み込み(確認) : Json

cmd_17
# 「Subscription で階層化」で格納された全件データ読み込み
rdf = spark.read.json('/mnt/etl-blob-data/ParSubscription_02.json')
display(rdf)

# 「Subscription で階層化」で格納された「Subscription=iapp-01」のデータ読み込み
rdf = spark.read.json('/mnt/etl-blob-data/ParSubscription_02.json/Subscription=NSG-01')
display(rdf)

image.png

変換されたデータの格納 : Parquet

cmd_18
# データの保存:階層化指定なし
df.write.mode('overwrite').parquet("/mnt/etl-blob-data/Parquet/NoPartition_11")

# データの保存:Subscription で階層化
df.write.mode('overwrite').partitionBy('Subscription').parquet("/mnt/etl-blob-data/Parquet/ParSubscription_12")

格納されたファイルの確認 : Parquet

cmd_19
# 「Subscription で階層化」で格納されたファイルの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/Parquet/ParSubscription_12"))
display(dbutils.fs.ls("/mnt/etl-blob-data/Parquet/ParSubscription_12/Subscription=NSG-01"))
display(dbutils.fs.ls("/mnt/etl-blob-data/Parquet/ParSubscription_12/Subscription=iapp-01"))

image.png

格納したファイルの再読み込み(確認) : Parquet

cmd_20
# 「Subscription で階層化」で格納された全件データ読み込み
rdf = spark.read.parquet('/mnt/etl-blob-data/Parquet/NoPartition_11')
display(rdf)

# 「Subscription で階層化」で格納された「Subscription=iapp-01」のデータ読み込み
rdf = spark.read.parquet('/mnt/etl-blob-data/Parquet/ParSubscription_12/Subscription=NSG-01')
display(rdf)

image.png

ローカル環境からコンテナー内の BLOB を一覧表示する

$ az storage blob list --account-name $STORAGE_ACCOUNT --container-name $ETL_CONTAINER --output table

まとめ

Azure Databricks から Blob のデータを抽出し、そのデータを変換後、Blob に指定したパーティショニングで格納できることを確認できました

参考記事

以下の記事を参考にさせていただきました。感謝申し上げます
Azure Databricks: 3-1. DBFSにBlob Storageをマウント

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?