概要
Delta Lalek 3.2 の新機能である型拡張(Type widening)に関する検証結果を共有します。本記事では、 OSS Delta Lake ではなく、 Databricks にフォーカスした記事となっております。
型拡張(Type widening)について
型拡張(Type widening)とは
Delta Lake 3.2 にて実装された機能であり、ソースのデータ型に応じてターゲットのターゲットテーブルのデータ型をデータ書き込み時に変更する機能です。
引用元:Delta Lake 3.2 | Delta Lake
Databricks Runtime 15.2 以降で利用できる機能です。
サポートされている型の変更
変換元と変換先のデータ型については下記のように記述されています。
利用方法
- 事前準備
- テーブルに対して
delta.enableTypeWidening
テーブルプロパティをtrue
に設定 - Date 型を timestampNTZ 型に変更したい場合には
delta.feature.timestampNtz
テーブルプロパティをsupported
に設定
- テーブルに対して
- データ書き込み時にスキーマ展開を許可する方法
- スキーマ展開を許可(
spark.databricks.delta.schema.autoMerge.enabled
を``とする)して INSERT 、あるいは、 MERGE INTO を実行
- スキーマ展開を許可(
- ALTER 文により手動でデータ型を変更
- ALTER COLUMNコマンドを実行
注意事項
1. 数値型を decimal 型に変更する際には手動での実施が推奨
数値型(INT 型や long 型)を decimal に変更する際には桁数が一定を超えない場合にはデータが丸められてしまうため、ALTER COLUMN により手動でデータ型を変更する方が望ましいです。
数値型を decimalに変更する場合、合計精度は開始精度以上である必要があります。 スケールも大きくする場合は、合計精度を対応する量だけ増やす必要があります。
2. Databricks Auto Loader で型拡張を実施するにはスキーマ推論に関する情報のディレクトリ(_schema)を削除する必要あり
cloudFiles.schemaLocation
で指定したディレクトリの_schemas
フォルダにスキーマ推論のデータ型が保持されてしまうため、異なるデータ型のレコードは_rescued_data
列に格納されます。2024年6月20日時点では、手動で_schemas
フォルダ内のファイルを削除する必要があるようです。
引用元:Auto Loaderでスキーマ推論と進化を設定する | Databricks on AWS
3. Delta Lake のプロトコルがバージョンアップされることに注意
引用元:Databricks は Delta Lake 機能の互換性をどのように管理しますか? | Databricks on AWS
検証コードと実行結果
書き込みによるデータ型拡張の実施(int 型から decimal 型への変換)
1. スキーマとテーブルを作成
%sql
CREATE SCHEMA IF NOT EXISTS hive_metastore.manabian_type_widening;
CREATE OR REPLACE TABLE hive_metastore.manabian_type_widening.table_01
(
id int,
byte_to_others BYTE,
short_to_others SHORT,
int_to_others INT,
long_to_decimal long,
float_to_double float,
decimal_to_decimal_greater_precision DECIMAL(3, 2),
date_to_timestampNTZ date
);
INSERT OVERWRITE hive_metastore.manabian_type_widening.table_01
(
id,
byte_to_others,
short_to_others,
int_to_others,
long_to_decimal,
float_to_double,
decimal_to_decimal_greater_precision,
date_to_timestampNTZ
)
SELECT
1,
CAST(1 AS byte),
CAST(1 AS short),
CAST(1 AS int),
CAST(1 AS LONG),
CAST(1 AS FLOAT),
CAST(1 AS DECIMAL(3,2)),
CAST('2020-01-01' AS DATE)
;
SELECT
*
FROM
hive_metastore.manabian_type_widening.table_01
2. テーブル定義を確認
%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01
3. DECIMAL(3, 2) のカラムに DECIMAL(5, 2) のデータを挿入する場合にエラーとなることを確認
%sql
-- 指定桁数以上の値を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 2;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
id,
decimal_to_decimal_greater_precision
)
SELECT
2,
CAST(123.12 as DECIMAL(5,2))
;
SELECT
id,
decimal_to_decimal_greater_precision
FROM
hive_metastore.manabian_type_widening.table_01
WHERE
id = 2
[CAST_OVERFLOW_IN_TABLE_INSERT] Fail to assign a value of "DECIMAL(5,2)" type to the "DECIMAL(3,2)" type column or variable
decimal_to_decimal_greater_precision
due to an overflow. Usetry_cast
on the input value to tolerate overflow and return NULL instead. SQLSTATE: 22003
3. テーブルで型拡張を有効にしてテーブルのデータ型が DECIMAL(5, 2) となっていることを確認
%sql
ALTER TABLE hive_metastore.manabian_type_widening.table_01
SET TBLPROPERTIES (
'delta.enableTypeWidening' = 'true'
)
;
DESC DETAIL hive_metastore.manabian_type_widening.table_01;
%sql
-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge.enabled= True;
-- 指定桁数以上の値を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 2;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
id,
decimal_to_decimal_greater_precision
)
SELECT
2,
CAST(123.12 as DECIMAL(5,2))
;
SELECT
id,
decimal_to_decimal_greater_precision
FROM
hive_metastore.manabian_type_widening.table_01
WHERE
id = 2;
%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01
書き込みによるデータ型拡張の実施(date 型から timestampNTZ 型への変換)
1. TIMESTAMP_NTZ 型のデータを挿入した場合に TIMESTAMP_NTZ 型ではなく DATE 型として挿入されることを確認
%sql
-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge.enabled = True;
-- 日付型に timesptamp 型を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 3;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
id,
date_to_timestampNTZ
)
SELECT
3,
CAST('2020-01-01 12:12:12' AS TIMESTAMP_NTZ)
;
SELECT
id,
date_to_timestampNTZ
FROM
hive_metastore.manabian_type_widening.table_01
WHERE
id = 3
;
2. テーブルに対して TIMESTAMP_NTZ 型を有効にしてカラムが TIMESTAMP_NTZ 型に変更されることを確認
%sql
ALTER TABLE hive_metastore.manabian_type_widening.table_01
SET TBLPROPERTIES (
'delta.feature.timestampNtz' = 'supported'
)
;
DESC DETAIL hive_metastore.manabian_type_widening.table_01;
%sql
-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge.enabled = True;
-- 日付型に timesptamp 型を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 3;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
id,
date_to_timestampNTZ
)
SELECT
3,
CAST('2020-01-01 12:12:12' AS TIMESTAMP_NTZ)
;
SELECT
id,
date_to_timestampNTZ
FROM
hive_metastore.manabian_type_widening.table_01
WHERE
id = 3
;
%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01
手動によるデータ型変換
1. ALTER COLUMNコマンドを実行することで decimal(38, 10) に変更されることを確認
%sql
ALTER TABLE hive_metastore.manabian_type_widening.table_01
ALTER COLUMN long_to_decimal TYPE decimal(38, 10)
%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01
Databricks Auto Loader による型拡張の検証
1. テーブルを作成
%sql
DROP TABLE IF EXISTS hive_metastore.manabian_type_widening.table_04;
CREATE TABLE hive_metastore.manabian_type_widening.table_04
(
id int,
int_to_long INT,
float_to_double float,
decimal_to_decimal_greater_precision DECIMAL(3, 2),
date_to_timestampNTZ date
)
TBLPROPERTIES (
'delta.enableTypeWidening' = 'true',
'delta.feature.timestampNtz' = 'supported'
)
;
2. ソースディレクトリとチェックポイントの初期化
src_dir = "dbfs:/user/hive/warehouse/manabian_type_widening.db/src"
checkpoint_dir = "dbfs:/user/hive/warehouse/manabian_type_widening.db/checkpoint"
tgt_tbl_name = "hive_metastore.manabian_type_widening.table_04"
# ソースディレクトリとチェックポイントの初期化
dbutils.fs.rm(src_dir, True)
dbutils.fs.rm(checkpoint_dir, True)
3. 初回取り込みを実行
# 初回データの書き込み
src_df = spark.sql(
"""
SELECT
1 AS id,
CAST(1 AS int) AS int_to_long,
CAST(1 AS FLOAT) AS float_to_double,
CAST(1 AS DECIMAL(3,2)) AS decimal_to_decimal_greater_precision,
CAST('2020-01-01' AS DATE) AS date_to_timestampNTZ
"""
)
src_df.write.mode("append").format("parquet").save(src_dir)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("inferSchema", True)
.load(src_dir)
)
(
src_df.writeStream
.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
spark.table(tgt_tbl_name).display()
spark.sql(f"DESC {tgt_tbl_name}").display()
4. 型拡張が実施されないことを確認
# 2 回目のデータを書き込み
src_df = spark.sql(
"""
SELECT
2 AS id,
CAST(123.12 AS LONG) AS int_to_long,
CAST(123.12 AS double) AS float_to_double,
CAST(123.12 AS DECIMAL(5,2)) AS decimal_to_decimal_greater_precision,
CAST('2020-01-01' AS TIMESTAMP_NTZ) AS date_to_timestampNTZ
"""
)
src_df.write.mode("append").format("parquet").save(src_dir)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("inferSchema", True)
.load(src_dir)
)
(
src_df.writeStream
.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
spark.table(tgt_tbl_name).display()
spark.sql(f"DESC {tgt_tbl_name}").display()
5. スキーマ推論に関する情報が格納されているディレクトリを初期化後にデータ取り込むことで型拡張されることを確認
dbutils.fs.rm(f"{checkpoint_dir}/_schemas", True)
# 3 回目のデータを書き込み
src_df = spark.sql(
"""
SELECT
3 AS id,
CAST(123.12 AS LONG) AS int_to_long,
CAST(123.12 AS double) AS float_to_double,
CAST(123.12 AS DECIMAL(5,2)) AS decimal_to_decimal_greater_precision,
CAST('2020-01-01' AS TIMESTAMP_NTZ) AS date_to_timestampNTZ
"""
)
src_df.write.mode("append").format("parquet").save(src_dir)
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("inferSchema", True)
.load(src_dir)
)
(
src_df.writeStream
.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
spark.table(tgt_tbl_name).display()
spark.sql(f"DESC {tgt_tbl_name}").display()