0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricks Auto Loader 実行時における NullTypes を利用できない事象への対応方法

Posted at

概要

Databricks Auto Loaderを使用して、NullTypes (void) を含むカラムを持つデータフレームを書き込む際には、特定のエラーが発生することがあります。このエラーは、NullTypesのカラムに対して、明示的にデータ型を指定しなければならないことを示しています。エラーが発生する状況とその解決策について説明します。

com.databricks.sql.transaction.tahoe.DeltaAnalysisException: Delta doesn't accept NullTypes in the schema for streaming writes.

image.png

DataFrameWriter を使用してデータフレームを書き込む際には、NullTypes (void) のカラムも書き込むことができます。しかし、Databricks Auto Loaderでは、このようなカラムの書き込みができないため、注意が必要です。

image.png

エラーの発生方法と対応方法

事前準備

Databricks Unity Catalog 上に検証で利用するオブジェクトを作成します。

%sql
CREATE CATALOG IF NOT EXISTS auto_loader_null_test;
CREATE SCHEMA IF NOT EXISTS auto_loader_null_test.schema;
CREATE VOLUME IF NOT EXISTS auto_loader_null_test.schema.volume_01;
CREATE OR REPLACE TABLE auto_loader_null_test.schema.table_01
(
  col_1 string,
  col_2 string,
  col_3 string,
  _rescued_data string,
  addtional_col string
)
;

image.png

ソースのファイルを配置します。

import os
src_dir = "/Volumes/auto_loader_null_test/schema/volume_01/src_files"
src_path = f"{src_dir}/sample.csv"

src_data = """
col_1,col_2,col_3
a,b,c
あ,い,う
""".strip()

dbutils.fs.mkdirs(src_dir)
dbutils.fs.put(src_path, src_data,True)

image.png

エラーの発生方法

cloudFiles フォーマットの readstream のデータフレームを作成します。

checkpoint_path = "/Volumes/auto_loader_null_test/schema/volume_01/checkpoint"

readstream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(src_dir)
)

NullTypes (void) のカラムを追加します。

from pyspark.sql.functions import expr

readstream_df_with_column = readstream_df.withColumn('addtional_col', expr('NULL'))

書き込みを実行するとエラーとなります。

(
    readstream_df_with_column.writeStream
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", True)
    .trigger(availableNow=True)
    .toTable("auto_loader_null_test.schema.table_01")
)
com.databricks.sql.transaction.tahoe.DeltaAnalysisException: Delta doesn't accept NullTypes in the schema for streaming writes.

image.png

エラーへの対応方法

カラムを追加する際に、CAST によりデータ型を明示するように変更します。

from pyspark.sql.functions import expr

readstream_df_with_column = readstream_df.withColumn('addtional_col', expr('CAST(NULL AS string)'))

正常に書き込みが完了することを確認します。

(
    readstream_df_with_column.writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable("auto_loader_null_test.schema.table_01")
)

image.png

DataFrameWriter における動作確認

正常通りに書き込める事を確認します。

schema = """
col_1 string,
col_2 string,
col_3 string,
_rescued_data string
"""

df = (
    spark.read.format("csv")
    .schema(schema)
    .option("columnNameOfCorruptRecord", "_rescued_data")
    .option("header", True)
    .load(src_dir)
)

df_with_column = df.withColumn("addtional_col", expr("NULL"))

(
    df_with_column.write.mode("append")
    .option("mergeSchema", True)
    .saveAsTable("auto_loader_null_test.schema.table_01")
)

image.png

本手順で作成したリソースの削除

%sql
DROP CATALOG auto_loader_null_test CASCADE;

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?