LoginSignup
1
0

Databricks Auto Loader における Hive スタイルのパーティション参照仕様について

Last updated at Posted at 2024-05-19

概要

Databricks Auto Loader 利用時におけるソースディレクトリに Hive スタイルのパーティション({column_name}={value}のように構成したディレクトリ)を参照する際の仕様に関する調査を実施しました。以前から Databricks Auto Loader を利用しているのですが、パーティション仕様に関して疑問を感じ、改めて動作検証を実施しました。

Databricks Auto Loader をメダリオンアーキテクチャで利用する場合、ソースファイルを Bronze レイヤーにロードすることがあります。その際、データ分析基盤への連携日時を示す監査列(ingest_dateingest_timestamp)を Hive スタイルパーティション(例:ingest_timestamp=2020-01-02 12%3A34%3A56)として保持することが有効です。ingest_timestamp列に基づき Bronze から Silver に連携すべきレコードを特定するために必要であり、その列の値を Hive スタイルのパーティションとして保持させることで静的に値を設定できるようになります。静的に設定することで、連携済みのファイルをコールドストレージへ移行したり、連携不足のファイルをを手動で配置したりすることが可能になります。そのため、Databricks Auto Loader における Hive スタイルパーティション参照仕様を理解することが重要です。

検証の結論としては、下記のようになりました。

  1. cloudFiles.partitionColumnオプションを指定しない場合でもパーティションの値を取得できますが、パーティションの階層が異なる場合などに値を想定通りに取得できない可能性があります。そのため、cloudFiles.partitionColumnオプションを指定することを推奨します。
  2. パーティションの値が String 型となるため、必要に応じてデータ型変換が必要となります。
  3. ソースディレクトリを指定する場合には、Hive スタイルのパーティションを構成するディレクトリの上位ディレクトリ(例:dbfs:/user/hive/warehouse/auto_loader_test.db/src_data)までを指定することで動作します。ソースファイル名の部分一致(拡張子やファイル名を指定)による取り込みを実施すること(例:dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/**/*.csv)も可能です。

検証に利用したコードとその実行結果を共有します。

検証コードと実行結果

事前準備

スキーマの作成とテーブルの準備を行い、ソースファイルを確認し、テーブルとチェックポイントのディレクトリを初期化します。

%sql
CREATE SCHEMA IF NOT EXISTS auto_loader_test;
CREATE
OR REPLACE TABLE auto_loader_test.tbl_1 (
  str_col string,
  ingest_date date,
  ingest_timestamp TIMESTAMP,
  _rescued_data string,
  _metadata_file_path string
);
CREATE
OR REPLACE TABLE auto_loader_test.tbl_2 (
  str_col string,
  ingest_date date,
  ingest_timestamp TIMESTAMP,
  _rescued_data string,
  _metadata_file_path string
);
# ソースのファイルを確認
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
dbutils.fs.rm(src_dir, True)

csv_dir = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/csv_file_01.csv"
csv_contents = '''str_col
"abc"'''
dbutils.fs.put(csv_dir, csv_contents, True)

txt_file = f"{src_dir}/ingest_date=2020-01-02/ingest_timestamp=2020-01-02 12%3A34%3A56/txt_file_01.txt"
txt_contents = '''str_col
"efg"'''
dbutils.fs.put(txt_file, txt_contents, True)
# テーブルとチェックポイントを初期化
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

image.png

Databricks Auto Loader を利用するための前提の仕様確認

Databricks Auto Loader におけるパーティションの列のデータ型が String 型となることを確認

Databricks Auto Loader におけるパーティション列のデータ型が String 型であることを確認しました。Spark Dataframe のingest_dateingest_timestamp列のデータ型を確認すると Date 型や Timstamp 型でしたが、Databricks Auto Loader(formatオプションにcloudFilesを指定した場合)ではingest_dateingest_timestamp列のデータ型が String 型となっていました。

src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
tgt_tbl_name = "auto_loader_test.tbl_1"
print("-- `read`の Spark データフレームのスキーマ確認")
read_df = (
    spark.read.format("csv")
    .schema("str_col string")
    .option("header", True)
    .load(src_dir)
)
read_df.printSchema()

print("-- `readStream`の Spark データフレームのスキーマ確認")
readStream_df = (
    spark.readStream.format("csv")
    .schema("str_col string")
    .option("header", True)
    .load(src_dir)
)
readStream_df.printSchema()
from pyspark.sql.functions import expr


checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .load(src_dir)
)

src_df.printSchema()

-- readの Spark データフレームのスキーマ確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: date (nullable = true)
|-- ingest_timestamp: timestamp (nullable = true)

-- readStreamの Spark データフレームのスキーマ確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: date (nullable = true)
|-- ingest_timestamp: timestamp (nullable = true)

root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)

image.png

cloudFiles.partitionColumnsオプション指定の要否確認

パーティションの階層が同一である場合にはcloudFiles.partitionColumnsオプションを指定しなくてもパーティションの値を取得できるを確認できました。

src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
dbutils.fs.rm(src_dir, True)

csv_dir = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/test.csv"
csv_contents = '''str_col
"abc"'''
dbutils.fs.put(csv_dir, csv_contents, True)

txt_file = f"{src_dir}/ingest_date=2020-01-02/ingest_timestamp=2020-01-02 12%3A34%3A56/test.txt"
txt_contents = '''str_col
"efg"'''
dbutils.fs.put(txt_file, txt_contents, True)
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
from pyspark.sql.functions import expr

checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    # .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

src_df.printSchema()

root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)

image.png

ただし、パーティションの階層が同一でない場合には、パーティションの列を取得できていないことを確認できました。よって、cloudFiles.partitionColumnsオプションを指定することが望ましいと結論づけました。

src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
dbutils.fs.rm(src_dir, True)

csv_dir = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/test.csv"
csv_contents = '''str_col
"abc"'''
dbutils.fs.put(csv_dir, csv_contents, True)

txt_file = f"{src_dir}/ingest_date=2020-01-02/ingest_timestamp=2020-01-02 12%3A34%3A56/test=123/test.txt"
txt_contents = '''str_col
"efg"'''
dbutils.fs.put(txt_file, txt_contents, True)
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data"
# Hive Partition のカラムを保持していないことを確認可能
from pyspark.sql.functions import expr

checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    # .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

src_df.printSchema()
from pyspark.sql.functions import expr

checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

src_df.printSchema()

root
|-- str_col: string (nullable = true)
|-- _rescued_data: string (nullable = true)

root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)

image.png

Databricks Auto Loader の基本的な利用方法の確認

Databricks Auto Loader を使用する際の基本的な仕様に従い、次のコードのように記述することで Auto Loader を活用できます。このコードにより、予定どおりに2つのファイルが正しく取り込まれました。取り込んだファイルが正確にどれかを確認するために、_metadata.file_pathの情報をテーブルに記録しました。

from pyspark.sql.functions import expr


src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", True)
    .load(src_dir)
)

add_cols = {
    "_metadata_file_path": expr("_metadata.file_path"),
    "ingest_date": expr("to_date(ingest_date)"),
    "ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}

src_df = src_df.withColumns(add_cols)

(
    src_df.writeStream
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()

image.png

ファイル名の部分一致により動作確認

拡張子を指定する方法

拡張子を指定する方法として、/**/*.csvと指定のように指定することで想定通りの動作となりました。拡張子がcsvであるdbfs:/user/hive/warehouse/auto_loader_test.db/src_data/ingest_date=2020-01-01/ingest_timestamp=2020-01-01%2012%253A34%253A56/csv_file_01.csvのみ取り込まれていました。

src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/**/*.csv"
tgt_tbl_name = "auto_loader_test.tbl_1"
from pyspark.sql.functions import expr

spark.sql(f"TRUNCATE TABLE {tgt_tbl_name}")
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

add_cols = {
    "_metadata_file_path": expr("_metadata.file_path"),
    "ingest_date": expr("to_date(ingest_date)"),
    "ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}

src_df = src_df.withColumns(add_cols)

(
    src_df.writeStream.option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()

image.png

ファイル名を指定する方法

ファイル名を指定する方法として、/**/txt_file*と指定のように指定することで想定通りの動作となりました。ファイル名がtxt_fileではじまるdbfs:/user/hive/warehouse/auto_loader_test.db/src_data/ingest_date=2020-01-02/ingest_timestamp=2020-01-02%2012%253A34%253A56/txt_file_01.txtのみが取り込まれていました。

src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/**/txt_file*"
tgt_tbl_name = "auto_loader_test.tbl_2"
from pyspark.sql.functions import expr

spark.sql(f"TRUNCATE TABLE {tgt_tbl_name}")
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

add_cols = {
    "_metadata_file_path": expr("_metadata.file_path"),
    "ingest_date": expr("to_date(ingest_date)"),
    "ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}

src_df = src_df.withColumns(add_cols)

(
    src_df.writeStream.option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()

image.png

その他の仕様確認

/**/*.csvではなく/*/*.csvを指定した場合も同様の動作となることを確認

/*/*.csvと記述したい場合にはその直下のディレクトリのみが対象となる想定でしたが、/**/*.csvの動作と同様にすべてのサブディレクトリを参照するような動作となりました。

src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/*/*.csv"
tgt_tbl_name = "auto_loader_test.tbl_1"
from pyspark.sql.functions import expr

spark.sql(f"TRUNCATE TABLE {tgt_tbl_name}")
checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

add_cols = {
    "_metadata_file_path": expr("_metadata.file_path"),
    "ingest_date": expr("to_date(ingest_date)"),
    "ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}

src_df = src_df.withColumns(add_cols)

(
    src_df.writeStream.option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)
df = spark.table(tgt_tbl_name)
df.display()
from pyspark.sql.functions import expr

checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", False)
    .load(src_dir)
)

src_df.printSchema()

headerのデフォルトがfalseと記載されているがデフォルトがtrueであるかの動作を確認

Databricks の Auto Loader のドキュメントにてheaderのデフォルト値がfalseと記述されていますが、上記の説明文ではヘッダーがあると想定する旨の記載があり、実際の動作もheaderのデフォルト値がtrueど同様の動作となりました。

CSVファイルにヘッダーが含まれているかどうか。Auto Loaderは、スキーマを推論するときにファイルにヘッダーがあると想定します。
デフォルト値 false

image.png

引用元:Auto Loaderのオプション | Databricks on AWS

headerを指定しない場合とそれぞれの値の場合で、データフレームのカラム名を確認すると、デフォルトの動作はheadertrueど同様の動作となりました。

from pyspark.sql.functions import expr


checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    # .option("header", False)
    .load(src_dir)
)

print("-- デフォルトの動作を確認")
src_df.printSchema()

from pyspark.sql.functions import expr


checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("header", False)
    .load(src_dir)
)

print("-- `header`が`False`の場合のスキーマ確認")
src_df.printSchema()

from pyspark.sql.functions import expr


checkpoint_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/_checkpoint"
dbutils.fs.rm(checkpoint_dir, True)

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("header", True)
    .load(src_dir)
)

print("-- `header`が`True`の場合のスキーマ確認")
src_df.printSchema()

-- デフォルトの動作を確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)

-- headerFalseの場合のスキーマ確認
root
|-- _c0: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)

-- headerTrueの場合のスキーマ確認
root
|-- str_col: string (nullable = true)
|-- ingest_date: string (nullable = true)
|-- ingest_timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)

image.png

事後処理

検証で作成したスキーマを削除します。

%sql
DROP SCHEMA IF EXISTS auto_loader_test CASCADE;

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0