0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apche Iceberg テーブルの複製が可能となる rewrite_table_path プロシージャの紹介

Last updated at Posted at 2025-03-11

概要

Apache Iceberg で Spark を利用するときに、rewrite_table_path プロシージャを試した結果を共有します。これは、DR 対策の一環として別のディレクトリに Apache Iceberg のデータを複製したい場合などに便利です。rewrite_table_path プロシージャは Apache Iceberg 1.8.0 でリリースされました。

image.png

出所:Procedures - Apache Iceberg™

image.png

出所:1.8.0 release - Apache Iceberg™

Apache Iceberg ではメタデータファイルに絶対パスが記載されているため、ディレクトリごとデータをコピーしただけではテーブルとして認識されません。バックアップを別ストレージに移動するといったケースでは不便な仕様でした。なお、メタデータファイルの内容を確認する方法は下記の記事で紹介しています。

image.png

出所:Getting started with PyIceberg と Iceberg で作成されるファイル郡の確認を Google Colab でやってみた #Python - Qiita

rewrite_table_path プロシージャを使うと、メタデータディレクトリ配下にパスを書き換えたメタデータファイルを生成し、あわせて移動元と移動先のファイルリストを作成します。その後、このファイルリストに基づいてコピーや移動を行うことで、Apache Iceberg テーブルの複製を可能にします。

image.png

image.png

検証コードと結果

検証内容

Google Colab 上で複製元の test_table テーブルを作成し、rewrite_table_path プロシージャを実行します。プロシージャが出力するファイルリストに従ってファイルを移動し、テーブルを複製できることを確認します。

本記事で紹介する方法は、あくまで動作確認が目的です。

Spark のバージョンを確認

!pyspark --version

image.png

SparkSession の定義

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("IcebergExample")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/content/iceberg")
    .getOrCreate()
)
spark

image.png

複製元のテーブルを作成

spark.sql("DROP TABLE IF EXISTS original.test_table")
spark.sql(f"""
    CREATE TABLE original.test_table (
        id BIGINT,
        data STRING
    )
    USING iceberg
""")
spark.sql("INSERT INTO original.test_table VALUES (1, 'a'), (2, 'b')")
spark.sql("SELECT * FROM original.test_table").show()
spark.sql("DESC EXTENDED original.test_table").show(truncate=False)

image.png

rewrite_table_path プロシージャの実行

# rewrite_table_path プロシージャの実行
return_df = spark.sql(f"""
CALL spark_catalog.system.rewrite_table_path(
    table => 'original.test_table', 
    source_prefix => "/content/iceberg/original/test_table",
    target_prefix => "/content/iceberg/cloning/rewrite_table"
)
""")  
return_df.show(truncate=False)

image.png

metadata ディレクトリ配下に、copy-table-staging で始まるディレクトリと、複製元テーブルのメタデータファイルと同名のファイルが作成されています。

image.png

また、file-list ディレクトリ配下には CSV ファイルが生成されているのを確認できます。この CSV ファイル(ヘッダーなし)には、移動元ディレクトリと移動先ディレクトリのパスがペアで記載されています。このファイルをもとに実際のデータを移動させることで、Iceberg テーブルを複製できます。

image.png

/content/iceberg/original/test_table/data/00000-0-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet /content/iceberg/cloning/rewrite_table/data/00000-0-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/v1.metadata.json /content/iceberg/cloning/rewrite_table/metadata/v1.metadata.json
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/v2.metadata.json /content/iceberg/cloning/rewrite_table/metadata/v2.metadata.json
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/snap-8317511617499303009-1-cb7dc205-bc3d-4744-8dbd-dda7685fac32.avro /content/iceberg/cloning/rewrite_table/metadata/snap-8317511617499303009-1-cb7dc205-bc3d-4744-8dbd-dda7685fac32.avro
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/cb7dc205-bc3d-4744-8dbd-dda7685fac32-m0.avro /content/iceberg/cloning/rewrite_table/metadata/cb7dc205-bc3d-4744-8dbd-dda7685fac32-m0.avro
/content/iceberg/original/test_table/data/00001-1-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet /content/iceberg/cloning/rewrite_table/data/00001-1-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet

ファイルリストのデータを取得

file_list = return_df.collect()[0].asDict()
file_location_df = spark.read.csv(file_list["file_list_location"], header=False)
print("-- original")
file_location_df.show(truncate=False)

print("-- file_location_df_renamed")
file_location_df_renamed = file_location_df.withColumnsRenamed({"_c0":"src", "_c1":"dist"})
file_location_df_renamed.show(truncate=False)

image.png

ファイルリストのデータに基づきファイルを移動

import os
import shutil

move_list = [row.asDict() for row in file_location_df_renamed.collect()]
for item in move_list:
    src = item['src']
    dist = item['dist']
    # 宛先ディレクトリがなければ作成
    os.makedirs(os.path.dirname(dist), exist_ok=True)
    try:
        shutil.copy2(src, dist)
        print(f"Copied {src} -> {dist}")
    except Exception as e:
        print(f"Error copying {src} -> {dist}: {e}")

image.png

複製先のテーブルを確認

spark.sql("SELECT * FROM cloning.rewrite_table").show()
spark.sql("DESC EXTENDED cloning.rewrite_table").show(truncate=False)

image.png

メタデータファイル内の location を確認すると、移動先ディレクトリが正しく設定されていることがわかります。

!cat /content/iceberg/cloning/rewrite_table/metadata/v2.metadata.json

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?