LoginSignup
2
1

概要

Delta Lalek 3.2 の新機能である型拡張(Type widening)に関する検証結果を共有します。本記事では、 OSS Delta Lake ではなく、 Databricks にフォーカスした記事となっております。

image.png

引用元:型拡張 | Databricks on AWS

型拡張(Type widening)について

型拡張(Type widening)とは

Delta Lake 3.2 にて実装された機能であり、ソースのデータ型に応じてターゲットのターゲットテーブルのデータ型をデータ書き込み時に変更する機能です。

image.png

引用元:Delta Lake 3.2 | Delta Lake

Databricks Runtime 15.2 以降で利用できる機能です。

image.png

引用元:型拡張 | Databricks on AWS

サポートされている型の変更

変換元と変換先のデータ型については下記のように記述されています。

image.png

引用元:型拡張 | Databricks on AWS

利用方法

  1. 事前準備
    1. テーブルに対してdelta.enableTypeWideningテーブルプロパティをtrueに設定
    2. Date 型を timestampNTZ 型に変更したい場合にはdelta.feature.timestampNtzテーブルプロパティをsupportedに設定
  2. データ書き込み時にスキーマ展開を許可する方法
    1. スキーマ展開を許可(spark.databricks.delta.schema.autoMerge.enabledを``とする)して INSERT 、あるいは、 MERGE INTO を実行
  3. ALTER 文により手動でデータ型を変更
    1. ALTER COLUMNコマンドを実行

注意事項

1. 数値型を decimal 型に変更する際には手動での実施が推奨

数値型(INT 型や long 型)を decimal に変更する際には桁数が一定を超えない場合にはデータが丸められてしまうため、ALTER COLUMN により手動でデータ型を変更する方が望ましいです。

数値型を decimalに変更する場合、合計精度は開始精度以上である必要があります。 スケールも大きくする場合は、合計精度を対応する量だけ増やす必要があります。

image.png

引用元:型拡張 | Databricks on AWS

2. Databricks Auto Loader で型拡張を実施するにはスキーマ推論に関する情報のディレクトリ(_schema)を削除する必要あり

cloudFiles.schemaLocationで指定したディレクトリの_schemasフォルダにスキーマ推論のデータ型が保持されてしまうため、異なるデータ型のレコードは_rescued_data列に格納されます。2024年6月20日時点では、手動で_schemasフォルダ内のファイルを削除する必要があるようです。

image.png

引用元:Auto Loaderでスキーマ推論と進化を設定する | Databricks on AWS

3. Delta Lake のプロトコルがバージョンアップされることに注意

image.png

引用元:Databricks は Delta Lake 機能の互換性をどのように管理しますか? | Databricks on AWS

検証コードと実行結果

書き込みによるデータ型拡張の実施(int 型から decimal 型への変換)

1. スキーマとテーブルを作成

%sql
CREATE SCHEMA IF NOT EXISTS hive_metastore.manabian_type_widening;
CREATE OR REPLACE TABLE hive_metastore.manabian_type_widening.table_01
(
  id int,
  byte_to_others BYTE,
  short_to_others SHORT,
  int_to_others INT,
  long_to_decimal long,
  float_to_double float,
  decimal_to_decimal_greater_precision DECIMAL(3, 2),
  date_to_timestampNTZ date
);
INSERT OVERWRITE hive_metastore.manabian_type_widening.table_01
(
  id,
  byte_to_others,
  short_to_others,
  int_to_others,
  long_to_decimal,
  float_to_double,
  decimal_to_decimal_greater_precision,
  date_to_timestampNTZ
)
SELECT
  1,
  CAST(1 AS byte),
  CAST(1 AS short),
  CAST(1 AS int),
  CAST(1 AS LONG),
  CAST(1 AS FLOAT),
  CAST(1 AS DECIMAL(3,2)),
  CAST('2020-01-01' AS DATE)
;
SELECT
  *
  FROM
    hive_metastore.manabian_type_widening.table_01

image.png

2. テーブル定義を確認

%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01

image.png

3. DECIMAL(3, 2) のカラムに DECIMAL(5, 2) のデータを挿入する場合にエラーとなることを確認

%sql
-- 指定桁数以上の値を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 2;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
  id,
  decimal_to_decimal_greater_precision
)
SELECT
  2,
  CAST(123.12 as DECIMAL(5,2))
;

SELECT
  id,
  decimal_to_decimal_greater_precision
  FROM 
    hive_metastore.manabian_type_widening.table_01
  WHERE
    id = 2

[CAST_OVERFLOW_IN_TABLE_INSERT] Fail to assign a value of "DECIMAL(5,2)" type to the "DECIMAL(3,2)" type column or variable decimal_to_decimal_greater_precision due to an overflow. Use try_cast on the input value to tolerate overflow and return NULL instead. SQLSTATE: 22003

image.png

3. テーブルで型拡張を有効にしてテーブルのデータ型が DECIMAL(5, 2) となっていることを確認

%sql
ALTER TABLE hive_metastore.manabian_type_widening.table_01
  SET TBLPROPERTIES (
    'delta.enableTypeWidening' = 'true'
  )
;
DESC DETAIL hive_metastore.manabian_type_widening.table_01;

image.png

%sql

-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge.enabled= True;

-- 指定桁数以上の値を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 2;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
  id,
  decimal_to_decimal_greater_precision
)
SELECT
  2,
  CAST(123.12 as DECIMAL(5,2))
;

SELECT
  id,
  decimal_to_decimal_greater_precision
  FROM 
    hive_metastore.manabian_type_widening.table_01
  WHERE
    id = 2;

image.png

%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01

image.png

書き込みによるデータ型拡張の実施(date 型から timestampNTZ 型への変換)

1. TIMESTAMP_NTZ 型のデータを挿入した場合に TIMESTAMP_NTZ 型ではなく DATE 型として挿入されることを確認

%sql
-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge.enabled = True;

-- 日付型に timesptamp 型を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 3;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
  id,
  date_to_timestampNTZ
)
SELECT
  3,
  CAST('2020-01-01 12:12:12' AS TIMESTAMP_NTZ)
;

SELECT
  id,
  date_to_timestampNTZ
  FROM 
    hive_metastore.manabian_type_widening.table_01
  WHERE
    id = 3
;

image.png

2. テーブルに対して TIMESTAMP_NTZ 型を有効にしてカラムが TIMESTAMP_NTZ 型に変更されることを確認

%sql
ALTER TABLE hive_metastore.manabian_type_widening.table_01
  SET TBLPROPERTIES (
    'delta.feature.timestampNtz' = 'supported'
  )
;
DESC DETAIL hive_metastore.manabian_type_widening.table_01;

image.png

%sql
-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge.enabled = True;

-- 日付型に timesptamp 型を挿入
DELETE FROM hive_metastore.manabian_type_widening.table_01 WHERE id = 3;
INSERT INTO hive_metastore.manabian_type_widening.table_01
(
  id,
  date_to_timestampNTZ
)
SELECT
  3,
  CAST('2020-01-01 12:12:12' AS TIMESTAMP_NTZ)
;

SELECT
  id,
  date_to_timestampNTZ
  FROM 
    hive_metastore.manabian_type_widening.table_01
  WHERE
    id = 3
;

image.png

%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01

image.png

手動によるデータ型変換

1. ALTER COLUMNコマンドを実行することで decimal(38, 10) に変更されることを確認

%sql
ALTER TABLE hive_metastore.manabian_type_widening.table_01 
  ALTER COLUMN long_to_decimal TYPE decimal(38, 10)

image.png

%sql
DESC TABLE hive_metastore.manabian_type_widening.table_01

image.png

Databricks Auto Loader による型拡張の検証

1. テーブルを作成

%sql
DROP TABLE IF EXISTS hive_metastore.manabian_type_widening.table_04;
CREATE TABLE hive_metastore.manabian_type_widening.table_04
(
  id int,
  int_to_long INT,
  float_to_double float,
  decimal_to_decimal_greater_precision DECIMAL(3, 2),
  date_to_timestampNTZ date
)
TBLPROPERTIES (
  'delta.enableTypeWidening' = 'true',
  'delta.feature.timestampNtz' = 'supported'
)
;

image.png

2. ソースディレクトリとチェックポイントの初期化

src_dir = "dbfs:/user/hive/warehouse/manabian_type_widening.db/src"
checkpoint_dir = "dbfs:/user/hive/warehouse/manabian_type_widening.db/checkpoint"
tgt_tbl_name = "hive_metastore.manabian_type_widening.table_04"

# ソースディレクトリとチェックポイントの初期化
dbutils.fs.rm(src_dir, True)
dbutils.fs.rm(checkpoint_dir, True)

image.png

3. 初回取り込みを実行

# 初回データの書き込み
src_df = spark.sql(
    """
    SELECT
        1 AS id,
        CAST(1 AS int) AS int_to_long,
        CAST(1 AS FLOAT) AS float_to_double,
        CAST(1 AS DECIMAL(3,2)) AS decimal_to_decimal_greater_precision,
        CAST('2020-01-01' AS DATE) AS date_to_timestampNTZ
    """
)

src_df.write.mode("append").format("parquet").save(src_dir)

image.png

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("inferSchema", True)
    .load(src_dir)
)
(
    src_df.writeStream
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)

image.png

spark.table(tgt_tbl_name).display()
spark.sql(f"DESC {tgt_tbl_name}").display()

image.png

4. 型拡張が実施されないことを確認

# 2 回目のデータを書き込み
src_df = spark.sql(
    """
    SELECT
        2 AS id,
        CAST(123.12 AS LONG) AS int_to_long,
        CAST(123.12 AS double) AS float_to_double,
        CAST(123.12 AS DECIMAL(5,2)) AS decimal_to_decimal_greater_precision,
        CAST('2020-01-01' AS TIMESTAMP_NTZ) AS date_to_timestampNTZ
    """
)

src_df.write.mode("append").format("parquet").save(src_dir)

image.png

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("inferSchema", True)
    .load(src_dir)
)
(
    src_df.writeStream
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)

image.png

spark.table(tgt_tbl_name).display()
spark.sql(f"DESC {tgt_tbl_name}").display()

image.png

5. スキーマ推論に関する情報が格納されているディレクトリを初期化後にデータ取り込むことで型拡張されることを確認

dbutils.fs.rm(f"{checkpoint_dir}/_schemas", True)

image.png

# 3 回目のデータを書き込み
src_df = spark.sql(
    """
    SELECT
        3 AS id,
        CAST(123.12 AS LONG) AS int_to_long,
        CAST(123.12 AS double) AS float_to_double,
        CAST(123.12 AS DECIMAL(5,2)) AS decimal_to_decimal_greater_precision,
        CAST('2020-01-01' AS TIMESTAMP_NTZ) AS date_to_timestampNTZ
    """
)

src_df.write.mode("append").format("parquet").save(src_dir)

image.png

src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("inferSchema", True)
    .load(src_dir)
)
(
    src_df.writeStream
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)

image.png

spark.table(tgt_tbl_name).display()
spark.sql(f"DESC {tgt_tbl_name}").display()

image.png

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1