0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks から Snowflake のマイクロパーティションに近い仕様の Parquet ファイルを出力する方法

Last updated at Posted at 2025-03-10

概要

Databricks に作成したDelta Lake 形式のデータを Snowflake 管理 Apache Iceberg テーブルの仕様(マイクロパーティションに近い仕様)に合わせる方法の検討結果を共有します。本記事では、成功した方法と失敗し方法を紹介します。

Snowflake 管理 Apache Iceberg テーブルのSTORAGE_SERIALIZATION_POLICY オプションをOPTIMIZEDに指定した場合には下記の設定がされていることを確認できました。

  1. 圧縮形式は非圧縮(uncompressed)
  2. 1 つの Parquet ファイルのサイズが 16 ~ 17 MB 程度

その仕様に近い Parquet ファイルを出力するために、ファインサイズを最適化する関数による方法を確立しました。厳密には 16 MB ではないですが、微調整すれば 16 ~ 17 MB 程度に近づけることができます。

image.png

成功した方法

マイクロパーティション仕様に最適化する関数による方法

利用する関数

spark.sql.parquet.compression.codecを指定していることで、2025年3月10日時点で Databricks Notebook Serverless にてエラーが発生するため、通常のクラスターで実行してください。

import math

def optimize_for_micro_partitions(
    table_name: str,
    filter_cond: str = None,
    desired_partition_size_mb: int = 16,
    set_uncompressed: bool = True,
    overhead_factor: float = 1.1,
) -> None:
    """
    Deltaテーブルに対して、指定されたフィルタ条件に基づくレコードのパーティションを
    希望する1パーティションあたりのサイズ(MB単位、デフォルトは16MB)に最適化します。

    Args:
        table_name (str): 対象テーブル名(例: "db.schema.table")
        filter_cond (str, optional): 書き出し対象のフィルタ条件(例: "l_quantity = 3")。Noneの場合、全件対象となる。
        desired_partition_size_mb (int): 希望する1パーティションのサイズ (MB単位)
        set_uncompressed (bool): Trueの場合、spark.sql.parquet.compression.codecを"uncompressed"に設定する
        overhead_factor (float): 書き込み時のオーバーヘッドを見込むための係数(例: 1.1)
    """
    # オプション設定: spark.conf設定を実施
    if set_uncompressed:
        spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
        print("spark.sql.parquet.compression.codec を 'uncompressed' に設定しました。")
    
    # 1. DESCRIBE DETAILでテーブル全体のサイズ(バイト)を取得する
    detail_df = spark.sql(f"DESCRIBE DETAIL {table_name}")
    table_detail = detail_df.collect()[0]
    table_size_bytes = table_detail["sizeInBytes"]
    print("テーブル全体のサイズ (bytes):", table_size_bytes)

    # 2. テーブル全体の総レコード数を取得
    full_df = spark.read.table(table_name)
    total_count = full_df.count()
    print("テーブル全体の総レコード数:", total_count)

    # 3. フィルタ条件に基づき、フィルタ後のDataFrameを作成
    if filter_cond:
        filtered_df = full_df.filter(filter_cond)
        filtered_count = filtered_df.count()
        print("フィルタ後のレコード数:", filtered_count)
    else:
        filtered_df = full_df
        filtered_count = total_count
        print("filter_cond が空の場合、全件のレコード数:", filtered_count)

    # 4. フィルタ後DataFrameの概算サイズ(bytes)を計算
    #    テーブル全体サイズに対するフィルタ件数の比率で概算サイズを求める
    estimated_filtered_size = table_size_bytes * (filtered_count / total_count)
    print("フィルタ後DataFrameの概算サイズ (bytes):", estimated_filtered_size)

    # 5. 希望する1パーティションのサイズをバイト単位に変換
    desired_partition_size = desired_partition_size_mb * 1024 * 1024

    # 6. オーバーヘッドを考慮した概算サイズから必要なパーティション数を計算
    num_partitions = math.ceil((estimated_filtered_size * overhead_factor) / desired_partition_size)
    print("再分割するパーティション数:", num_partitions)
    
    # 7. 推定1パーティションあたりのサイズ (MB)
    estimated_partition_mb = (estimated_filtered_size * overhead_factor) / num_partitions / (1024 * 1024)
    print(f"推定1パーティションあたりのサイズ: {estimated_partition_mb:.2f} MB")

    # 8. DataFrame の再パーティション化と書き込み
    filtered_df_repartitioned = filtered_df.repartition(num_partitions)
    writer = filtered_df_repartitioned.write.format("delta").mode("overwrite")
    
    if filter_cond:
        print("Selective overwrite 開始")
        writer = writer.option("replaceWhere", filter_cond)
        writer.saveAsTable(table_name)
        print("Selective overwrite 終了")
    else:
        print("Overwrite 開始")
        writer.saveAsTable(table_name)
        print("Overwrite 終了")

検証結果

検証用のデータベースを作成

%sql
CREATE CATALOG iceberg_snowflake_test;

image.png

テーブルのソースとなる一時ビューを作成します。

%sql
-- 1. CSV ファイルを読み込むテーブル(または一時ビュー)の作成
CREATE OR REPLACE TEMP VIEW _temp_customer_csv (
  c_custkey long,
  c_name string,
  c_address string,
  c_nationkey long,
  c_phone string,
  c_acctbal decimal(12, 2),
  c_mktsegment string,
  c_comment string
)
USING CSV
OPTIONS (
  path 'dbfs:/databricks-datasets/tpch/data-001/customer/customer.tbl',
  header 'false',
  delimiter '|'
);

image.png

ファイルサイズの比較をしやすいように、テーブルの DROP 後にソースの一時ビューから CTAS によりテーブルを作成します。

%sql

DROP TABLE IF EXISTS iceberg_snowflake_share_test.default.customer_delta;

-- 2. 上記データから Delta Lake テーブルを作成(書き出し時のターゲットファイルサイズを 16 MB に設定)
CREATE OR REPLACE TABLE iceberg_snowflake_share_test.default.customer_delta
USING DELTA
TBLPROPERTIES (
  'delta.enableDeletionVectors' = false
)
AS
SELECT * FROM _temp_customer_csv;

image.png

関数の定義と実行を行います。

import math

def optimize_for_micro_partitions(
    table_name: str,
    filter_cond: str = None,
    desired_partition_size_mb: int = 16,
    set_uncompressed: bool = True,
    overhead_factor: float = 1.1,
) -> None:
    """
    Deltaテーブルに対して、指定されたフィルタ条件に基づくレコードのパーティションを
    希望する1パーティションあたりのサイズ(MB単位、デフォルトは16MB)に最適化します。

    Args:
        table_name (str): 対象テーブル名(例: "db.schema.table")
        filter_cond (str, optional): 書き出し対象のフィルタ条件(例: "l_quantity = 3")。Noneの場合、全件対象となる。
        desired_partition_size_mb (int): 希望する1パーティションのサイズ (MB単位)
        set_uncompressed (bool): Trueの場合、spark.sql.parquet.compression.codecを"uncompressed"に設定する
        overhead_factor (float): 書き込み時のオーバーヘッドを見込むための係数(例: 1.1)
    """
    # オプション設定: spark.conf設定を実施
    if set_uncompressed:
        spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
        print("spark.sql.parquet.compression.codec を 'uncompressed' に設定しました。")
    
    # 1. DESCRIBE DETAILでテーブル全体のサイズ(バイト)を取得する
    detail_df = spark.sql(f"DESCRIBE DETAIL {table_name}")
    table_detail = detail_df.collect()[0]
    table_size_bytes = table_detail["sizeInBytes"]
    print("テーブル全体のサイズ (bytes):", table_size_bytes)

    # 2. テーブル全体の総レコード数を取得
    full_df = spark.read.table(table_name)
    total_count = full_df.count()
    print("テーブル全体の総レコード数:", total_count)

    # 3. フィルタ条件に基づき、フィルタ後のDataFrameを作成
    if filter_cond:
        filtered_df = full_df.filter(filter_cond)
        filtered_count = filtered_df.count()
        print("フィルタ後のレコード数:", filtered_count)
    else:
        filtered_df = full_df
        filtered_count = total_count
        print("filter_cond が空の場合、全件のレコード数:", filtered_count)

    # 4. フィルタ後DataFrameの概算サイズ(bytes)を計算
    #    テーブル全体サイズに対するフィルタ件数の比率で概算サイズを求める
    estimated_filtered_size = table_size_bytes * (filtered_count / total_count)
    print("フィルタ後DataFrameの概算サイズ (bytes):", estimated_filtered_size)

    # 5. 希望する1パーティションのサイズをバイト単位に変換
    desired_partition_size = desired_partition_size_mb * 1024 * 1024

    # 6. オーバーヘッドを考慮した概算サイズから必要なパーティション数を計算
    num_partitions = math.ceil((estimated_filtered_size * overhead_factor) / desired_partition_size)
    print("再分割するパーティション数:", num_partitions)
    
    # 7. 推定1パーティションあたりのサイズ (MB)
    estimated_partition_mb = (estimated_filtered_size * overhead_factor) / num_partitions / (1024 * 1024)
    print(f"推定1パーティションあたりのサイズ: {estimated_partition_mb:.2f} MB")

    # 8. DataFrame の再パーティション化と書き込み
    filtered_df_repartitioned = filtered_df.repartition(num_partitions)
    writer = filtered_df_repartitioned.write.format("delta").mode("overwrite")
    
    if filter_cond:
        print("Selective overwrite 開始")
        writer = writer.option("replaceWhere", filter_cond)
        writer.saveAsTable(table_name)
        print("Selective overwrite 終了")
    else:
        print("Overwrite 開始")
        writer.saveAsTable(table_name)
        print("Overwrite 終了")
# 使用例
optimize_for_micro_partitions(
    table_name="iceberg_snowflake_share_test.default.customer_delta",
    filter_cond=None,
)

image.png

下記の出力結果のlocation列からストレージのディレクトリを確認

%sql
DESCRIBE DETAIL iceberg_snowflake_share_test.default.customer_delta;

image.png

30 MB だった parquet ファイルが 13 MB 程度になっていてことを確認します。

image.png

失敗した方法

delta.targetFileSizeプロパティを指定する方法

Delta Lake テーブルのdelta.targetFileSizeプロパティでファイルサイズを調整できないかと検討しましたが、ドキュメントにてベストエフォートと書かれてこともあり想定のファイルサイズにはなりませんでした。

指定されたサイズのファイルを生成するためのベストエフォートの試みが行われます。

出所:データ ファイル サイズを制御するように Delta Lake を構成する - Azure Databricks | Microsoft Learn

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?