概要
Databricks ( Spark )にて変更データフィード(Change Data Feed、CDF) 用前のテーブルバージョンに対して、CDF 機能を用いてデータフレームを作成する際のエラーへの対応方法を共有します。
CDF 有効化前の時点でのデータフレームを作成しようとすると次のようなエラーが発生します。
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table(f"{db_name}.{tbl_name}")
)
df.display()
AnalysisException: Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
useDESCRIBE HISTORY
to see when it was first enabled.
Otherwise, to start recording change data, useALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed=true)
.
ドキュメントに下記のような記載があるため、このような動作は仕様のようです。
引用元:バッチ クエリの変更を読み取る - Azure Databricks | Microsoft Learn
本エラーが発生しないようにする方法の1つである、テーブルにて保持させた更新日の監査列に基づき、CDF が有効化されたバージョン以降のバージョンを取得する方法を紹介します。データ書き込み後に CDF を有効にした場合には、本記事で紹介する方法では同様に回避策にはなりません。
CDF を有効化した以降の時点、かつ、Delta Log の最終処理以降の時点に対するデータに基づき処理を行う場合には、spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled
プロパティによりエラーを回避できます。
IllegalArgumentException: Provided Start version(1000) for reading change data is invalid. Start version cannot be greater than the latest version of the table(4).
引用元:Azure Databricks で Delta Lake 変更データ フィードを使用する - Azure Databricks | Microsoft Learn
対尾法
1. 事前準備
from pyspark.sql.functions import current_timestamp
# データを読み込み
filepath = "dbfs:/databricks-datasets/tpch/data-001/nation/"
schema = """
N_NATIONKEY integer
,N_NAME string
,N_REGIONKEY integer
,N_COMMENT string
"""
df = (
spark.read.format("csv")
.schema(schema)
.option("inferSchema", "True")
.option("sep", "|")
.load(filepath)
)
# 更新日列の監査列を付与
df = df.withColumn(
"_update_timestamp",
current_timestamp(),
)
df.display()
db_namedb_name = "_qiita"
tbl_name = "nation"
# データベースを作成
sql = f"""
CREATE SCHEMA IF NOT EXISTS {db_name}
"""
spark.sql(sql)
# テーブル作成
sql = f"""
DROP TABLE IF EXISTS {db_name}.{tbl_name}
"""
spark.sql(sql)
sql = f"""
CREATE TABLE IF NOT EXISTS {db_name}.{tbl_name}
(
N_NATIONKEY integer,
N_NAME string,
N_REGIONKEY integer,
N_COMMENT string,
_update_timestamp timestamp
)
USING delta
"""
spark.sql(sql)
# 変更データフィードを有効化
sql = f"""
ALTER TABLE {db_name}.{tbl_name}
SET TBLPROPERTIES (
delta.enableChangeDataFeed = true
)
"""
spark.sql(sql)
spark.sql(
f"""
SELECT
*
EXCEPT(
userName
)
FROM (
DESC HISTORY {db_name}.{tbl_name}
)
"""
).display()
for i in range(3):
df.write.mode("append").saveAsTable(f"{db_name}.{tbl_name}")
spark.sql(
f"""
SELECT
*
EXCEPT(
userName
)
FROM (
DESC HISTORY {db_name}.{tbl_name}
)
"""
).display()
2. CDF 機能に基づきデータフレームを作成
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 1)
.table(f"{db_name}.{tbl_name}")
)
df.display()
3. エラーを再現
tgt_version = 0
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", tgt_version)
.table(f"{db_name}.{tbl_name}")
)
df.display()
4. エラー回避策
min_ts = (
spark.table(f"{db_name}.{tbl_name}")
.selectExpr("MIN(_update_timestamp) as min")
.first()[0]
)
print(min_ts)
sql = f"""
select
*
FROM (
DESC HISTORY {db_name}.{tbl_name}
)
WHERE
timestamp >= '{min_ts}'
ORDER BY
version ASC
"""
tgt_version = 0
if tgt_version >= min_delta_version:
starting_version = tgt_version
else:
starting_version = min_delta_version
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", starting_version)
.table(f"{db_name}.{tbl_name}")
)
df.display()
5. 本手順で作成したリソースを削除
spark.sql(f'DROP DATABASE {db_name} CASCADE')