1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricks ( Spark )にて変更データフィード(Change Data Feed、CDF) 適用前の時点に対するクエリエラーへの対応方法

Last updated at Posted at 2022-11-29

概要

Databricks ( Spark )にて変更データフィード(Change Data Feed、CDF) 用前のテーブルバージョンに対して、CDF 機能を用いてデータフレームを作成する際のエラーへの対応方法を共有します。

CDF 有効化前の時点でのデータフレームを作成しようとすると次のようなエラーが発生します。

image.png

df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table(f"{db_name}.{tbl_name}")
)
df.display()

AnalysisException: Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use DESCRIBE HISTORY to see when it was first enabled.
Otherwise, to start recording change data, use ALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed=true).

ドキュメントに下記のような記載があるため、このような動作は仕様のようです。

image.png

引用元:バッチ クエリの変更を読み取る - Azure Databricks | Microsoft Learn

本エラーが発生しないようにする方法の1つである、テーブルにて保持させた更新日の監査列に基づき、CDF が有効化されたバージョン以降のバージョンを取得する方法を紹介します。データ書き込み後に CDF を有効にした場合には、本記事で紹介する方法では同様に回避策にはなりません。

image.png

CDF を有効化した以降の時点、かつ、Delta Log の最終処理以降の時点に対するデータに基づき処理を行う場合には、spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled プロパティによりエラーを回避できます。

image.png

IllegalArgumentException: Provided Start version(1000) for reading change data is invalid. Start version cannot be greater than the latest version of the table(4).

image.png

引用元:Azure Databricks で Delta Lake 変更データ フィードを使用する - Azure Databricks | Microsoft Learn

image.png

対尾法

1. 事前準備

from pyspark.sql.functions import current_timestamp

# データを読み込み
filepath = "dbfs:/databricks-datasets/tpch/data-001/nation/"
 
schema = """
  N_NATIONKEY  integer
  ,N_NAME       string
  ,N_REGIONKEY  integer 
  ,N_COMMENT    string
"""
 
df = (
    spark.read.format("csv")
    .schema(schema)
    .option("inferSchema", "True")
    .option("sep", "|")
    .load(filepath)
)
 
# 更新日列の監査列を付与
df = df.withColumn(
    "_update_timestamp",
    current_timestamp(),
)
 
df.display()

image.png

db_namedb_name = "_qiita"
tbl_name = "nation"

# データベースを作成
sql = f"""
CREATE SCHEMA IF NOT EXISTS {db_name}
"""
spark.sql(sql)
 
# テーブル作成
sql = f"""
DROP TABLE IF EXISTS {db_name}.{tbl_name}
"""
spark.sql(sql)
 
sql = f"""
CREATE TABLE IF NOT EXISTS {db_name}.{tbl_name}
(
  N_NATIONKEY  integer,
  N_NAME       string,
  N_REGIONKEY  integer, 
  N_COMMENT    string,
  _update_timestamp timestamp
)
USING delta
"""
spark.sql(sql)

# 変更データフィードを有効化
sql = f"""
ALTER TABLE {db_name}.{tbl_name}
  SET TBLPROPERTIES (
    delta.enableChangeDataFeed = true
  )
"""
spark.sql(sql)

image.png

spark.sql(
    f"""
SELECT
  *
EXCEPT(
  userName
)
FROM (
  DESC HISTORY {db_name}.{tbl_name}
)
"""
).display()

image.png

for i in range(3):
    df.write.mode("append").saveAsTable(f"{db_name}.{tbl_name}")

spark.sql(
    f"""
SELECT
  *
EXCEPT(
  userName
)
FROM (
  DESC HISTORY {db_name}.{tbl_name}
)
"""
).display()

image.png

2. CDF 機能に基づきデータフレームを作成

df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table(f"{db_name}.{tbl_name}")
)
df.display()

image.png

3. エラーを再現

tgt_version  = 0
 
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", tgt_version)
    .table(f"{db_name}.{tbl_name}")
)
df.display()

image.png

4. エラー回避策

min_ts = (
    spark.table(f"{db_name}.{tbl_name}")
    .selectExpr("MIN(_update_timestamp) as min")
    .first()[0]
)
print(min_ts)
 
sql = f"""
select
  *
  FROM (
    DESC HISTORY {db_name}.{tbl_name}
  )
  WHERE
    timestamp >= '{min_ts}'
  ORDER BY
    version ASC
"""

tgt_version  = 0

if tgt_version >= min_delta_version:
    starting_version = tgt_version
else:
    starting_version = min_delta_version

df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", starting_version)
    .table(f"{db_name}.{tbl_name}")
)
df.display()

image.png

5. 本手順で作成したリソースを削除

spark.sql(f'DROP DATABASE {db_name} CASCADE')

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?