0
0

Databricks Auto Loader にて parquet をソースにする場合における timestamp の hive スタイルのパーティションの値を取得できない事象への対応方法

Last updated at Posted at 2024-06-03

概要

Databricks Auto Loader にて parquet をソースに(cloudFiles.formatオプションにparquet指定)する場合における timestamp の hive スタイルのパーティションの値を取得できない事象への対応方法を共有します。2024/06/04 時点で仕様であるかの確認を実施できておりませんが、暫定対応方法を共有します。

Spark の処理にてpartitionByを指定して書き込むと、Timestamp 型のカラムがingest_timestamp=2020-01-02 12%3A34%3A56という形式で書きこまれます。

image.png

Spark 書き込み時の形式のディレクトリから、 Databricks Auto Loader にて取り込むとNULLとなってしまいます。

image.png

暫定対応方法として、_metadata.file_pathにて取得したディレクトリから Hive スタイルのパーティションの値を取得する方法を確立しました。_metadata.file_pathにて取得したディレクトリは既存のディレクトリ(:が URL エンコーディングされているディレクトリ)を URL エンコーディングするため、2度の URL デコーディングが必要となります。

%sql
WITH src AS (
  -- 基本的なケース
  SELECT
    'dbfs:/user/hive/warehouse/qiita_01.db/src/ingest_timestamp=2020-01-02%2012%253A34%253A56/part-00000-tid-314819177862442950-4140058a-dd1e-41da-8335-19a436805704-173-1.c000.snappy.parquet' AS datasource
  UNION ALL
  -- ingest_timestamp がないケース
  SELECT
    'src/part-00000-tid-8632074880238721543-6e1d2f1d-241d-4139-953a-f1c384afb058-10-1.c000.snappy.parquet'
  UNION ALL
  -- ingest_timestamp が 2 つ以上含まれるケース
  SELECT
    'ingest_timestamp=abc/src/ingest_timestamp=2020-01-02%2012%253A34%253A56/part-00000-tid-8632074880238721543-6e1d2f1d-241d-4139-953a-f1c384afb058-10-1.c000.snappy.parquet'
)
SELECT
  url_decode(
    url_decode(
      element_at(
        FILTER(
          TRANSFORM(
            split(datasource, '/'),
            x -> IF(
              x LIKE 'ingest_timestamp=%',
              replace(x, 'ingest_timestamp=', ''),
              NULL
            )
          ),
          x -> x IS NOT NULL
        ),
        -1
      )
    )
  ) AS ingest_timestamp
FROM
  src

image.png

上記では次のようなステップを実施しています。

  1. split(datasource, '/'):データソース URL を / で分割し、その結果を配列として返します。
  2. TRANSFORM(...):配列の各要素に対して指定した関数を適用し、その結果からなる新しい配列を作成します。この例では、IF 関数を使用して各要素が ingest_timestamp= で始まるかどうかをチェックし、その場合は ingest_timestamp= を削除しています。そうでない場合は NULL を返します。
  3. FILTER(...):配列の各要素に対して指定した条件を適用し、条件を満たす要素からなる新しい配列を作成します。この例では、NULL でない要素をフィルタリングしています。
  4. element_at(..., -1):配列の最後の要素を取得します。ingest_timestamp が 2 つ以上存在する場合、最後の ingest_timestamp が取得されます。
  5. url_decode(url_decode(...)):URL デコードを 2 回適用します。これは、ingest_timestamp が URL エンコードされている可能性があり、さらにその結果が再度 URL エンコードされているためです。この操作により、元の ingest_timestamp が取得されます。

事象の再現方法

事前準備

テーブルとスキーマを作成

base_dir = "dbfs:/user/hive/warehouse/qiita_01.db"
tgt_table_name = "hive_metastore.qiita_01.table_01
%sql
CREATE SCHEMA IF NOT EXISTS hive_metastore.qiita_01;
CREATE OR REPLACE TABLE hive_metastore.qiita_01.table_01 (
  n_nationkey  integer,
  n_name       string,
  n_regionkey  integer,
  n_comment    string,
  ingest_timestamp TIMESTAMP,
  datasource STRING,
  _rescued_data STRING
);
DESC SCHEMA hive_metastore.ms_sr_01;

image.png

2. parquet 形式でpartitionByを指定した上でディレクトリに書き込み

from pyspark.sql.functions import expr

file_dir = base_dir + "/src" 

filepath = "dbfs:/databricks-datasets/tpch/data-001/nation/nation.tbl"
schema = """
n_nationkey  integer
,n_name       string
,n_regionkey  integer 
,n_comment    string
"""
nation_df = (spark
       .read
       .format("csv")
       .schema(schema)
       .option("inferSchema", "True")
       .option("sep", "|")
       .load(filepath)
    )

add_col = {
    "ingest_timestamp": expr("CAST('2020-01-02 12:34:56' AS timestamp)"),
}
nation_df = nation_df.withColumns(add_col)
nation_df.write.format("parquet").partitionBy("ingest_timestamp").mode("overwrite").save(file_dir)

# `ingest_timestamp=2020-01-02 12%3A34%3A56/`というディレクトリが作成されることを確認
display(dbutils.fs.ls(file_dir))

image.png

3. Spark Dataframe として読み込み Hive スタイルのパーティションの値を取得できることを確認

df = spark.read.format("parquet").load(file_dir)
df.display()

image.png

事象の再現

1. Databricks Auto Loader に書き込みを実施

checkpoint_dir = base_dir + "/checkpint" 

# チェックポイントを初期化
dbutils.fs.rm(checkpoint_dir, True)

auto_loader_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaLocation": checkpoint_dir,
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.partitionColumns": "ingest_timestamp",
    "cloudFiles.inferColumnTypes": "false",
    "ignoreMissingFiles": True,
}

src_df = (
    spark.readStream.format("cloudFiles").options(**auto_loader_options).load(file_dir)
)

with_cond = {
    "datasource": expr("_metadata.file_path"),
}
src_df = src_df.withColumns(with_cond)

write_options = {
    "checkpointLocation": checkpoint_dir,
}
src_df.writeStream.trigger(availableNow=True).options(**write_options).toTable(tgt_table_name)

image.png

2. 書きこみ先のテーブルを確認し、 Hive スタイルのパーティションの値が書き込まれていないことを確認

df = spark.table(tgt_table_name)
df.display()

image.png

事象への対応方法

事前準備

1. テーブルのデータを削除

spark.sql(f"TRUNCATE TABLE {tgt_table_name}")

image.png

ディレクトリから Hive スタイルのパーティションの値を取得する方法による対応

1. Databricks Auto Loader に書き込みを実施

datasource_col_name = "datasource"
ingest_timestamp_col_name = "ingest_timestamp"

# チェックポイントを初期化
dbutils.fs.rm(checkpoint_dir, True)

auto_loader_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaLocation": checkpoint_dir,
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.partitionColumns": "ingest_timestamp",
    "cloudFiles.inferColumnTypes": "false",
    "ignoreMissingFiles": True,
}

src_df = (
    spark.readStream.format("cloudFiles").options(**auto_loader_options).load(file_dir)
)

audit_col_conf = {
    datasource_col_name: expr("_metadata.file_path"),
}
src_df = src_df.withColumns(audit_col_conf)

ingest_ts_col_conf = {
    ingest_timestamp_col_name: expr(
        f"""
        try_to_timestamp(
            url_decode(
                url_decode(
                    element_at(
                        FILTER(
                            TRANSFORM(
                                split(datasource, '/'),
                                x -> IF(
                                x LIKE 'ingest_timestamp=%',
                                replace(x, 'ingest_timestamp=', ''),
                                NULL
                                )
                            ),
                            x -> x IS NOT NULL
                        ),
                        -1
                    )
                )
            )
        )
        """
    ),
}
src_df = src_df.withColumns(ingest_ts_col_conf)

write_options = {
    "checkpointLocation": checkpoint_dir,
}
src_df.writeStream.trigger(availableNow=True).options(**write_options).toTable(tgt_table_name)

image.png

2. Hive スタイルのパーティションの値が書き込まれていることを確認

df = spark.table(tgt_table_name)

df.display()

image.png

事後処理

1. 作成したスキーマを削除

%sql
DROP SCHEMA IF EXISTS hive_metastore.qiita_01 CASCADE;

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0