概要
Azure DataLake Gen2に格納したrawデータを分析しやすいようにETL処理します。
ETL処理をするために、Azure Databricksを利用し、ETL処理したデータをAzure DataLake Gen2に格納します。
Azure Databricks notebooksの実装
Azure Databricks notebooksの実装の前にデータを蓄積する為のAzure DataLake Gen2やAzure Databricksのクラスターを作成する必要がありますが、作成手順は割愛させて頂きます。
Azure DataLake Storageコンテナーのマウント
cmd_1
# コンテナーのマウント
# DataLake Storage情報(データ抽出用)
mount_name= "【任意のマウント先ディレクトリ名】"
storage_account_name = "【ストレージアカウント名】"
container_name = "【任意のマウント先ディレクトリ名】"
storage_account_access_key = "【ストレージアカウントアクセスキー】"
# マウント先DBFSディレクトリ
order_items_json = "dbfs:/mnt/【任意のマウント先ディレクトリ名】/【データ抽出対象ファイル】"
# DataLake Storageのマウント
mount_point = "/mnt/" + mount_name
source = "wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net"
conf_key = "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net"
mounted = dbutils.fs.mount(
source=source,
mount_point = mount_point,
extra_configs = {conf_key: storage_account_access_key}
)
DataFrame化
cmd_2
df = spark.read\
.format("json")\
.options(header="true", inferSchema="true")\
.option('charset', 'utf-8')\
.load(order_items_json)
df.display(df)
必要なカラムの選択
cmd_3
"【任意の名前】" = df.select("【必要なカラム】")
display("【任意の名前】")
日付(string型)のdatetime型へ変更
cmd_4
import pandas as pd
import datetime
pdf = df.toPandas()
pdf["【対象のカラム】"] = pd.to_datetime(pdf["【対象のカラム】"].astype(str), format='%Y-%m-%d')
df = spark.createDataFrame(pdf)
display(df)
変換されたデータの格納 : Json
cmd_5
# データの保存
df.write.mode('overwrite').json("/mnt/【任意のマウント先ディレクトリ名】/【任意のファイル名】")
まとめ
Azure DataLake Gen2に格納されたrawデータをETL処理し、変換データを格納するところまでが完了しました。
データ分析の内容も掲載してみようかなと思います。と言っても、データの公開が出来ないのでSQL文の内容だけになるので、どうしようかと・・・