概要
Databricks Auto Loader 利用時におけるソースディレクトリに Hive スタイルのパーティション({column_name}={value}
のように構成したディレクトリ)を参照する際の仕様に関する調査を実施しました。以前から Databricks Auto Loader を利用しているのですが、パーティション仕様に関して疑問を感じ、改めて動作検証を実施しました。
Databricks Auto Loader をメダリオンアーキテクチャで利用する場合、ソースファイルを Bronze レイヤーにロードすることがあります。その際、データ分析基盤への連携日時を示す監査列(ingest_date
やingest_timestamp
)を Hive スタイルパーティション(例:ingest_timestamp=2020-01-02 12%3A34%3A56
)として保持することが有効です。ingest_timestamp
列に基づき Bronze から Silver に連携すべきレコードを特定するために必要であり、その列の値を Hive スタイルのパーティションとして保持させることで静的に値を設定できるようになります。静的に設定することで、連携済みのファイルをコールドストレージへ移行したり、連携不足のファイルをを手動で配置したりすることが可能になります。そのため、Databricks Auto Loader における Hive スタイルパーティション参照仕様を理解することが重要です。
検証の結論としては、下記のようになりました。
-
cloudFiles.partitionColumn
オプションを指定しない場合でもパーティションの値を取得できますが、パーティションの階層が異なる場合などに値を想定通りに取得できない可能性があります。そのため、cloudFiles.partitionColumn
オプションを指定することを推奨します。 - パーティションの値が String 型となるため、必要に応じてデータ型変換が必要となります。
- ソースディレクトリを指定する場合には、Hive スタイルのパーティションを構成するディレクトリの上位ディレクトリ(例:
dbfs:/user/hive/warehouse/auto_loader_test.db/src_data
)までを指定することで動作します。ソースファイル名の部分一致(拡張子やファイル名を指定)による取り込みを実施すること(例:dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/**/*.csv
)も可能です。
検証に利用したコードとその実行結果を共有します。
検証コードと実行結果
事前準備
スキーマの作成とテーブルの準備を行い、ソースファイルを確認し、テーブルとチェックポイントのディレクトリを初期化します。
%sql
CREATE SCHEMA IF NOT EXISTS auto_loader_test;
CREATE
OR REPLACE TABLE auto_loader_test.tbl_1 (
str_col string,
ingest_date date,
ingest_timestamp TIMESTAMP,
_rescued_data string,
_metadata_file_path string
);
CREATE
OR REPLACE TABLE auto_loader_test.tbl_2 (
str_col string,
ingest_date date,
ingest_timestamp TIMESTAMP,
_rescued_data string,
_metadata_file_path string
);
# ソースのファイルを確認
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
dbutils.fs.rm(src_dir, True)
csv_dir = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/csv_file_01.csv"
csv_contents = '''str_col
"abc"'''
dbutils.fs.put(csv_dir, csv_contents, True)
txt_file = f"{src_dir}/ingest_date=2020-01-02/ingest_timestamp=2020-01-02 12%3A34%3A56/txt_file_01.txt"
txt_contents = '''str_col
"efg"'''
dbutils.fs.put(txt_file, txt_contents, True)
# テーブルとチェックポイントを初期化
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
Databricks Auto Loader を利用するための前提の仕様確認
Databricks Auto Loader におけるパーティションの列のデータ型が String 型となることを確認
Databricks Auto Loader におけるパーティション列のデータ型が String 型であることを確認しました。Spark Dataframe のingest_date
とingest_timestamp
列のデータ型を確認すると Date 型や Timstamp 型でしたが、Databricks Auto Loader(format
オプションにcloudFiles
を指定した場合)ではingest_date
とingest_timestamp
列のデータ型が String 型となっていました。
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
tgt_tbl_name = "auto_loader_test.tbl_1"
print("-- `read`の Spark データフレームのスキーマ確認")
read_df = (
spark.read.format("csv")
.schema("str_col string")
.option("header", True)
.load(src_dir)
)
read_df.printSchema()
print("-- `readStream`の Spark データフレームのスキーマ確認")
readStream_df = (
spark.readStream.format("csv")
.schema("str_col string")
.option("header", True)
.load(src_dir)
)
readStream_df.printSchema()
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.load(src_dir)
)
src_df.printSchema()
--
read
の Spark データフレームのスキーマ確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: date (nullable = true)
|-- ingest_timestamp: timestamp (nullable = true)
--
readStream
の Spark データフレームのスキーマ確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: date (nullable = true)
|-- ingest_timestamp: timestamp (nullable = true)
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
cloudFiles.partitionColumns
オプション指定の要否確認
パーティションの階層が同一である場合にはcloudFiles.partitionColumns
オプションを指定しなくてもパーティションの値を取得できるを確認できました。
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
dbutils.fs.rm(src_dir, True)
csv_dir = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/test.csv"
csv_contents = '''str_col
"abc"'''
dbutils.fs.put(csv_dir, csv_contents, True)
txt_file = f"{src_dir}/ingest_date=2020-01-02/ingest_timestamp=2020-01-02 12%3A34%3A56/test.txt"
txt_contents = '''str_col
"efg"'''
dbutils.fs.put(txt_file, txt_contents, True)
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
# .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
src_df.printSchema()
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
ただし、パーティションの階層が同一でない場合には、パーティションの列を取得できていないことを確認できました。よって、cloudFiles.partitionColumns
オプションを指定することが望ましいと結論づけました。
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
dbutils.fs.rm(src_dir, True)
csv_dir = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/test.csv"
csv_contents = '''str_col
"abc"'''
dbutils.fs.put(csv_dir, csv_contents, True)
txt_file = f"{src_dir}/ingest_date=2020-01-02/ingest_timestamp=2020-01-02 12%3A34%3A56/test=123/test.txt"
txt_contents = '''str_col
"efg"'''
dbutils.fs.put(txt_file, txt_contents, True)
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
# Hive Partition のカラムを保持していないことを確認可能
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
# .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
src_df.printSchema()
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
src_df.printSchema()
root
|-- str_col: string (nullable = true)
|-- _rescued_data: string (nullable = true)
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
Databricks Auto Loader の基本的な利用方法の確認
Databricks Auto Loader を使用する際の基本的な仕様に従い、次のコードのように記述することで Auto Loader を活用できます。このコードにより、予定どおりに2つのファイルが正しく取り込まれました。取り込んだファイルが正確にどれかを確認するために、_metadata.file_path
の情報をテーブルに記録しました。
from pyspark.sql.functions import expr
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", True)
.load(src_dir)
)
add_cols = {
"_metadata_file_path": expr("_metadata.file_path"),
"ingest_date": expr("to_date(ingest_date)"),
"ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}
src_df = src_df.withColumns(add_cols)
(
src_df.writeStream
.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()
ファイル名の部分一致により動作確認
拡張子を指定する方法
拡張子を指定する方法として、/**/*.csv
と指定のように指定することで想定通りの動作となりました。拡張子がcsv
であるdbfs:/user/hive/warehouse/auto_loader_test.db/src_data/ingest_date=2020-01-01/ingest_timestamp=2020-01-01%2012%253A34%253A56/csv_file_01.csv
のみ取り込まれていました。
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/**/*.csv"
tgt_tbl_name = "auto_loader_test.tbl_1"
from pyspark.sql.functions import expr
spark.sql(f"TRUNCATE TABLE {tgt_tbl_name}")
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
add_cols = {
"_metadata_file_path": expr("_metadata.file_path"),
"ingest_date": expr("to_date(ingest_date)"),
"ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}
src_df = src_df.withColumns(add_cols)
(
src_df.writeStream.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()
ファイル名を指定する方法
ファイル名を指定する方法として、/**/txt_file*
と指定のように指定することで想定通りの動作となりました。ファイル名がtxt_file
ではじまるdbfs:/user/hive/warehouse/auto_loader_test.db/src_data/ingest_date=2020-01-02/ingest_timestamp=2020-01-02%2012%253A34%253A56/txt_file_01.txt
のみが取り込まれていました。
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/**/txt_file*"
tgt_tbl_name = "auto_loader_test.tbl_2"
from pyspark.sql.functions import expr
spark.sql(f"TRUNCATE TABLE {tgt_tbl_name}")
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
add_cols = {
"_metadata_file_path": expr("_metadata.file_path"),
"ingest_date": expr("to_date(ingest_date)"),
"ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}
src_df = src_df.withColumns(add_cols)
(
src_df.writeStream.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()
その他の仕様確認
/**/*.csv
ではなく/*/*.csv
を指定した場合も同様の動作となることを確認
/*/*.csv
と記述したい場合にはその直下のディレクトリのみが対象となる想定でしたが、/**/*.csv
の動作と同様にすべてのサブディレクトリを参照するような動作となりました。
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/*/*.csv"
tgt_tbl_name = "auto_loader_test.tbl_1"
from pyspark.sql.functions import expr
spark.sql(f"TRUNCATE TABLE {tgt_tbl_name}")
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
add_cols = {
"_metadata_file_path": expr("_metadata.file_path"),
"ingest_date": expr("to_date(ingest_date)"),
"ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}
src_df = src_df.withColumns(add_cols)
(
src_df.writeStream.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", False)
.load(src_dir)
)
src_df.printSchema()
header
のデフォルトがfalse
と記載されているがデフォルトがtrue
であるかの動作を確認
Databricks の Auto Loader のドキュメントにてheader
のデフォルト値がfalse
と記述されていますが、上記の説明文ではヘッダーがあると想定する旨の記載があり、実際の動作もheader
のデフォルト値がtrue
ど同様の動作となりました。
CSVファイルにヘッダーが含まれているかどうか。Auto Loaderは、スキーマを推論するときにファイルにヘッダーがあると想定します。
デフォルト値 false
引用元:Auto Loaderのオプション | Databricks on AWS
header
を指定しない場合とそれぞれの値の場合で、データフレームのカラム名を確認すると、デフォルトの動作はheader
がtrue
ど同様の動作となりました。
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
# .option("header", False)
.load(src_dir)
)
print("-- デフォルトの動作を確認")
src_df.printSchema()
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("header", False)
.load(src_dir)
)
print("-- `header`が`False`の場合のスキーマ確認")
src_df.printSchema()
from pyspark.sql.functions import expr
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("header", True)
.load(src_dir)
)
print("-- `header`が`True`の場合のスキーマ確認")
src_df.printSchema()
-- デフォルトの動作を確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)--
header
がFalse
の場合のスキーマ確認
root
|-- _c0: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)--
header
がTrue
の場合のスキーマ確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
事後処理
検証で作成したスキーマを削除します。
%sql
DROP SCHEMA IF EXISTS auto_loader_test CASCADE;