概要
Azure Databricks のデータ書き込みにおいて、シングルファイルで可能かどうかという依頼を頂いたので、サクッと検証した結果報告となります。 Spark と Pandas で少し結果が異なるようです。
ローカル環境
- macOS Monterey 12.1
- python 3.8.12
前提条件
- Azure Databricks の環境が準備できていること - 参考
- Azure CostManagement 等のデータがBLOBに蓄積されていること(JSONであれば問わず) - 参考
- Azure Databricks から 上記 BLOB のコンテナをマウントできていること
- 読み込みコンテナのマウントポイント:/mnt/blob-data
- 書き込みコンテナのマウントポイント:/mnt/blob-data2
データの準備
Blob Storageをマウントした ディレクトリ と ファイル の確認
cmd_1
# マウント先DBFSディレクトリ(データ読み込み用)
mount_point = "/mnt/blob-data"
# マウント先DBFSディレクトリ(データ書き込み用)
mount_point2 = "/mnt/blob-data2"
# ディレクトリの確認
display(dbutils.fs.mounts())
# ファイルの確認(ファイルが存在しないとエラーが発生)
# mount_point = "/mnt/{マウント先ディレクトリ}"
display(dbutils.fs.ls(mount_point))
display(dbutils.fs.ls(mount_point2))
データの読み込み(+マージ)
cmd_2
# Blob から 全ファイルの読込 --> PySpark Dataframes 型式
# (読み込むファイルの指定をワイルドカードにすると、複数ファイルを同時に扱えます)
storename = mount_point + '/*.json'
sdf = spark.read\
.option("multiline", "true").option("header", "false").option("inferSchema", "false")\
.json(storename)
display(sdf)
読み込んだデータの変換
cmd_3
import pandas as pd
import datetime
# PySpark Dataframes から Pandas への変換
pdf = sdf.toPandas()
# 日付(date)をint64型からdatetime64型に変換
print(pdf['Date'].dtype) # 変換前のデータ型
pdf['Date'] = pd.to_datetime(pdf['Date'].astype(str), format='%Y-%m-%d')
print(pdf['Date'].dtype) # 変換後のデータ型
# Pandas から PySpark Dataframes への変換
df = spark.createDataFrame(pdf)
display(df)
データの書き込み
Spark to Write Data to Multi JSON files (データの容量により複数ファイルに分割される)
cmd_4
# ファイル名の作成
storename = mount_point2 + '/MargeUsageCost_' + datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + '.json'
print(storename)
# データ保存:複数ファイルの分割されます
df.write.mode('overwrite').json(storename)
# データ確認
display(dbutils.fs.ls(mount_point2))
display(dbutils.fs.ls(storename))
Spark to Write Data to a Single JSON file (Use the coalesce Feature)
cmd_5
# ファイル名の作成
storename = mount_point2 + '/MargeUsageCost_' + datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + '.json'
print(storename)
# データ保存:シングルファイルですが、ディレクトリが作成されます
df.coalesce(1).write.mode('overwrite').json(storename)
# データ確認
display(dbutils.fs.ls(mount_point2))
display(dbutils.fs.ls(storename))
Pandas to Write Data to a Single JSON file (Use collect and Pandas)
cmd_6
# ファイル名の作成
storename = '/dbfs' + mount_point2 + '/MargeUsageCost_' + datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + '.json'
print(storename)
# PySpark Dataframes から Pandas への変換
dfp = df.toPandas()
# データ保存:マウントポイントにシングルファイルで書き込みされます
# orientを指定しない場合、orient='columns'を指定したときと同じ内容でjson出力されます
# dfp.to_json(storename, orient='columns', date_format='iso') # 列で書き込む場合
dfp.to_json(storename, orient='records', date_format='iso') # 行で書き込む場合
# データ確認
display(dbutils.fs.ls(mount_point2))
まとめ
Azure Databricks の書き込みにおいて、通常複数ファイルに分割されますが、シングルファイルでできる方法を確認できました。
参考記事
以下の記事を参考にさせていただきました。感謝申し上げます
Write to a Single CSV File(Python)
pandas.DataFrame.to_json