【 Azure Databricks ETL編 その3 】
概要
Azure Databricks を使ってみよう! ということで、以下の3ステップで簡単に說明します。
- 【環境準備編】:Azure CLI から Azure Databricks 環境を作成します
- 【抽出変換編】:Databricks から ADLS Gen2 のデータを抽出し、変換します
- 【格納編】:Databricks で ETL されたデータを ADLS Gen2 に格納します
今回は 【格納編】の說明となります。 Databricks の Notebook を使用していきます。
- Azure Databricks
- NoteBook 実装
- Blob コンテナ(データ格納用)のマウント
- データ格納
- NoteBook 実装
ローカル環境
- macOS Big Sur 11.3
- python 3.8.3
- Azure CLI 2.28.0
前提条件
- Azure環境がすでに用意されていること(テナント/サブスクリプション)。
- ローカル環境に「azure cli」がインストールされていること。
- 本記事シリーズの【環境準備編】 と 【抽出変換編】 が完了していること
Azure Databricks の Notebook を使います
【抽出変換編】 で作成した Notebook :「ETL_UsageCost_01」 の続きとして実装します
なお、Blob コンテナのマウントについては、この記事 の内容をほぼそのまま活用させていただきました
Blobコンテナーのマウント
cmd_10
# コンテナーのマウント
# 一度マウントすると、Clusterを停止、変更してもマウント状態が維持されます
# マウントされた状態で再度操作を実行するとエラーが発生するため、マウント状態をチェックする
# Blob Storage情報(データ抽出用)
storage = {
"account": "testaccount",
"container": "etl-cost-data",
"key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
# マウント先DBFSディレクトリ(データ抽出用)
mount_point_etl = "/mnt/etl-blob-data"
try:
# マウント状態のチェック
mount_dir = mount_point_etl
if mount_dir[-1] == "/":
mount_dir = mount_dir[:-1]
if len(list(filter(lambda x: x.mountPoint == mount_dir, dbutils.fs.mounts()))) > 0:
print("Already mounted.")
mounted = True
else:
mounted = False
# Blob Storageのマウント
if not mounted:
source = "wasbs://{container}@{account}.blob.core.windows.net".format(**storage)
conf_key = "fs.azure.account.key.{account}.blob.core.windows.net".format(**storage)
mounted = dbutils.fs.mount(
source=source,
mount_point = mount_point_etl,
extra_configs = {conf_key: storage["key"]}
)
except Exception as e:
raise e
"mounted: {}".format(mounted)
Blob Storageをマウントしたディレクトリの確認
cmd_11
# ディレクトリの確認
display(dbutils.fs.mounts())
変換されたデータの格納 : Json
cmd_12
# データの保存:階層化指定なし
df.write.mode('overwrite').json("/mnt/etl-blob-data/NoPartition_01.json")
# データの保存:Subscription で階層化
df.write.mode('overwrite').partitionBy('Subscription').json("/mnt/etl-blob-data/ParSubscription_02.json")
# データの保存:Subscription - ResourceGroup で階層化
df.write.mode('overwrite').partitionBy('Subscription', 'ResourceGroup').json("/mnt/etl-blob-data/ParResourceGroup_03.json")
格納されたファイルの確認 : Json
cmd_13
# 格納ファイルの確認(ファイルが存在しないとエラーが発生)
# mount_point = "/mnt/{マウント先ディレクトリ}"
display(dbutils.fs.ls(mount_point_etl))
cmd_14
# 「階層化指定なし」で格納されたファイルの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/NoPartition_01.json/"))
cmd_15
# 「Subscription で階層化」で格納されたファイルの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/ParSubscription_02.json"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParSubscription_02.json/Subscription=NSG-01"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParSubscription_02.json/Subscription=iapp-01"))
cmd_16
# 「Subscription - ResourceGroup で階層化」で格納されたディレクトリの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/ParResourceGroup_03.json"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParResourceGroup_03.json/Subscription=NSG-01/"))
display(dbutils.fs.ls("/mnt/etl-blob-data/ParResourceGroup_03.json/Subscription=iapp-01/"))
格納したファイルの再読み込み(確認) : Json
cmd_17
# 「Subscription で階層化」で格納された全件データ読み込み
rdf = spark.read.json('/mnt/etl-blob-data/ParSubscription_02.json')
display(rdf)
# 「Subscription で階層化」で格納された「Subscription=iapp-01」のデータ読み込み
rdf = spark.read.json('/mnt/etl-blob-data/ParSubscription_02.json/Subscription=NSG-01')
display(rdf)
変換されたデータの格納 : Parquet
cmd_18
# データの保存:階層化指定なし
df.write.mode('overwrite').parquet("/mnt/etl-blob-data/Parquet/NoPartition_11")
# データの保存:Subscription で階層化
df.write.mode('overwrite').partitionBy('Subscription').parquet("/mnt/etl-blob-data/Parquet/ParSubscription_12")
格納されたファイルの確認 : Parquet
cmd_19
# 「Subscription で階層化」で格納されたファイルの確認
display(dbutils.fs.ls("/mnt/etl-blob-data/Parquet/ParSubscription_12"))
display(dbutils.fs.ls("/mnt/etl-blob-data/Parquet/ParSubscription_12/Subscription=NSG-01"))
display(dbutils.fs.ls("/mnt/etl-blob-data/Parquet/ParSubscription_12/Subscription=iapp-01"))
格納したファイルの再読み込み(確認) : Parquet
cmd_20
# 「Subscription で階層化」で格納された全件データ読み込み
rdf = spark.read.parquet('/mnt/etl-blob-data/Parquet/NoPartition_11')
display(rdf)
# 「Subscription で階層化」で格納された「Subscription=iapp-01」のデータ読み込み
rdf = spark.read.parquet('/mnt/etl-blob-data/Parquet/ParSubscription_12/Subscription=NSG-01')
display(rdf)
ローカル環境からコンテナー内の BLOB を一覧表示する
$ az storage blob list --account-name $STORAGE_ACCOUNT --container-name $ETL_CONTAINER --output table
まとめ
Azure Databricks から Blob のデータを抽出し、そのデータを変換後、Blob に指定したパーティショニングで格納できることを確認できました
参考記事
以下の記事を参考にさせていただきました。感謝申し上げます
Azure Databricks: 3-1. DBFSにBlob Storageをマウント