概要
Databricks Auto Loader にて parquet をソースに(cloudFiles.format
オプションにparquet
指定)する場合における timestamp の hive スタイルのパーティションの値を取得できない事象への対応方法を共有します。2024/06/04 時点で仕様であるかの確認を実施できておりませんが、暫定対応方法を共有します。
Spark の処理にてpartitionBy
を指定して書き込むと、Timestamp 型のカラムがingest_timestamp=2020-01-02 12%3A34%3A56
という形式で書きこまれます。
Spark 書き込み時の形式のディレクトリから、 Databricks Auto Loader にて取り込むとNULL
となってしまいます。
暫定対応方法として、_metadata.file_path
にて取得したディレクトリから Hive スタイルのパーティションの値を取得する方法を確立しました。_metadata.file_path
にて取得したディレクトリは既存のディレクトリ(:
が URL エンコーディングされているディレクトリ)を URL エンコーディングするため、2度の URL デコーディングが必要となります。
%sql
WITH src AS (
-- 基本的なケース
SELECT
'dbfs:/user/hive/warehouse/qiita_01.db/src/ingest_timestamp=2020-01-02%2012%253A34%253A56/part-00000-tid-314819177862442950-4140058a-dd1e-41da-8335-19a436805704-173-1.c000.snappy.parquet' AS datasource
UNION ALL
-- ingest_timestamp がないケース
SELECT
'src/part-00000-tid-8632074880238721543-6e1d2f1d-241d-4139-953a-f1c384afb058-10-1.c000.snappy.parquet'
UNION ALL
-- ingest_timestamp が 2 つ以上含まれるケース
SELECT
'ingest_timestamp=abc/src/ingest_timestamp=2020-01-02%2012%253A34%253A56/part-00000-tid-8632074880238721543-6e1d2f1d-241d-4139-953a-f1c384afb058-10-1.c000.snappy.parquet'
)
SELECT
url_decode(
url_decode(
element_at(
FILTER(
TRANSFORM(
split(datasource, '/'),
x -> IF(
x LIKE 'ingest_timestamp=%',
replace(x, 'ingest_timestamp=', ''),
NULL
)
),
x -> x IS NOT NULL
),
-1
)
)
) AS ingest_timestamp
FROM
src
上記では次のようなステップを実施しています。
-
split(datasource, '/')
:データソース URL を/
で分割し、その結果を配列として返します。 -
TRANSFORM(...)
:配列の各要素に対して指定した関数を適用し、その結果からなる新しい配列を作成します。この例では、IF
関数を使用して各要素がingest_timestamp=
で始まるかどうかをチェックし、その場合はingest_timestamp=
を削除しています。そうでない場合はNULL
を返します。 -
FILTER(...)
:配列の各要素に対して指定した条件を適用し、条件を満たす要素からなる新しい配列を作成します。この例では、NULL
でない要素をフィルタリングしています。 -
element_at(..., -1)
:配列の最後の要素を取得します。ingest_timestamp
が 2 つ以上存在する場合、最後のingest_timestamp
が取得されます。 -
url_decode(url_decode(...))
:URL デコードを 2 回適用します。これは、ingest_timestamp
が URL エンコードされている可能性があり、さらにその結果が再度 URL エンコードされているためです。この操作により、元のingest_timestamp
が取得されます。
事象の再現方法
事前準備
テーブルとスキーマを作成
base_dir = "dbfs:/user/hive/warehouse/qiita_01.db"
tgt_table_name = "hive_metastore.qiita_01.table_01
%sql
CREATE SCHEMA IF NOT EXISTS hive_metastore.qiita_01;
CREATE OR REPLACE TABLE hive_metastore.qiita_01.table_01 (
n_nationkey integer,
n_name string,
n_regionkey integer,
n_comment string,
ingest_timestamp TIMESTAMP,
datasource STRING,
_rescued_data STRING
);
DESC SCHEMA hive_metastore.ms_sr_01;
2. parquet 形式でpartitionBy
を指定した上でディレクトリに書き込み
from pyspark.sql.functions import expr
file_dir = base_dir + "/src"
filepath = "dbfs:/databricks-datasets/tpch/data-001/nation/nation.tbl"
schema = """
n_nationkey integer
,n_name string
,n_regionkey integer
,n_comment string
"""
nation_df = (spark
.read
.format("csv")
.schema(schema)
.option("inferSchema", "True")
.option("sep", "|")
.load(filepath)
)
add_col = {
"ingest_timestamp": expr("CAST('2020-01-02 12:34:56' AS timestamp)"),
}
nation_df = nation_df.withColumns(add_col)
nation_df.write.format("parquet").partitionBy("ingest_timestamp").mode("overwrite").save(file_dir)
# `ingest_timestamp=2020-01-02 12%3A34%3A56/`というディレクトリが作成されることを確認
display(dbutils.fs.ls(file_dir))
3. Spark Dataframe として読み込み Hive スタイルのパーティションの値を取得できることを確認
df = spark.read.format("parquet").load(file_dir)
df.display()
事象の再現
1. Databricks Auto Loader に書き込みを実施
checkpoint_dir = base_dir + "/checkpint"
# チェックポイントを初期化
dbutils.fs.rm(checkpoint_dir, True)
auto_loader_options = {
"cloudFiles.format": "parquet",
"cloudFiles.schemaLocation": checkpoint_dir,
"cloudFiles.schemaEvolutionMode": "addNewColumns",
"cloudFiles.partitionColumns": "ingest_timestamp",
"cloudFiles.inferColumnTypes": "false",
"ignoreMissingFiles": True,
}
src_df = (
spark.readStream.format("cloudFiles").options(**auto_loader_options).load(file_dir)
)
with_cond = {
"datasource": expr("_metadata.file_path"),
}
src_df = src_df.withColumns(with_cond)
write_options = {
"checkpointLocation": checkpoint_dir,
}
src_df.writeStream.trigger(availableNow=True).options(**write_options).toTable(tgt_table_name)
2. 書きこみ先のテーブルを確認し、 Hive スタイルのパーティションの値が書き込まれていないことを確認
df = spark.table(tgt_table_name)
df.display()
事象への対応方法
事前準備
1. テーブルのデータを削除
spark.sql(f"TRUNCATE TABLE {tgt_table_name}")
ディレクトリから Hive スタイルのパーティションの値を取得する方法による対応
1. Databricks Auto Loader に書き込みを実施
datasource_col_name = "datasource"
ingest_timestamp_col_name = "ingest_timestamp"
# チェックポイントを初期化
dbutils.fs.rm(checkpoint_dir, True)
auto_loader_options = {
"cloudFiles.format": "parquet",
"cloudFiles.schemaLocation": checkpoint_dir,
"cloudFiles.schemaEvolutionMode": "addNewColumns",
"cloudFiles.partitionColumns": "ingest_timestamp",
"cloudFiles.inferColumnTypes": "false",
"ignoreMissingFiles": True,
}
src_df = (
spark.readStream.format("cloudFiles").options(**auto_loader_options).load(file_dir)
)
audit_col_conf = {
datasource_col_name: expr("_metadata.file_path"),
}
src_df = src_df.withColumns(audit_col_conf)
ingest_ts_col_conf = {
ingest_timestamp_col_name: expr(
f"""
try_to_timestamp(
url_decode(
url_decode(
element_at(
FILTER(
TRANSFORM(
split(datasource, '/'),
x -> IF(
x LIKE 'ingest_timestamp=%',
replace(x, 'ingest_timestamp=', ''),
NULL
)
),
x -> x IS NOT NULL
),
-1
)
)
)
)
"""
),
}
src_df = src_df.withColumns(ingest_ts_col_conf)
write_options = {
"checkpointLocation": checkpoint_dir,
}
src_df.writeStream.trigger(availableNow=True).options(**write_options).toTable(tgt_table_name)
2. Hive スタイルのパーティションの値が書き込まれていることを確認
df = spark.table(tgt_table_name)
df.display()
事後処理
1. 作成したスキーマを削除
%sql
DROP SCHEMA IF EXISTS hive_metastore.qiita_01 CASCADE;