2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS AnalyticsAdvent Calendar 2024

Day 7

AWS上でIcebergテーブルを作成する方法についての検討メモ

Last updated at Posted at 2024-12-07

AWS上でIcebergテーブルを作成する方法は色々あるが、ツールによってIcebergの各機能への対応状況が異なっている。

この記事はGlue Data Catalog上にIcebergテーブルの作成やDDL操作ができる主要なソフトウェア・フレームワーク・サービスとして以下を検討したときのメモ。

  • Spark (Glue/EMR/Athena)
  • Trino (Athena/EMR)
  • PyIceberg
  • CDK

やりたいこと

結論から言うと今回はPyIcebergを採用した。要件は以下の通り。

  1. ストリーミングデータを生成するプロデューサーアプリケーションがあり、メッセージングキューを介してコンシューマーからIcebergテーブルにデータをINSERTしたい(パイプラインについてはここでは触れない)
  2. Icebergテーブルをコードで管理したい
    1. 本番・開発環境に合わせてS3のバケット名を動的に指定したい
    2. プロデューサー側のアプリケーションで生成するデータとターゲットテーブルのスキーマを合わせたい。今回はモノレポジトリなので、プロデューサーとコンシューマーでテーブル名や列名をプログラムから再利用な形で定義したい
    3. Schema evolutionをプログラムから行いたい。できればIaCで、宣言的に
  3. Hidden Partitioningを使ってTimestamp型から日付単位でパーティションを作りたい
  4. データ量はそこまで大きくないため(数GB/day)、できるだけ安く済ませたい

Spark

Icebergを触るならSparkは選択肢として外せない。テーブルの作成や更新は基本的にSpark SQLで記述する。SparkでETL処理を記述している場合はスクリプトの中からCREATE TABLECTASすることでテーブルを作成できる。サクッと試すならAthena for SparkのNotebookが楽だった。

CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
    c_customer_sk             int,
    c_customer_id             string,
    c_first_name              string,
    c_last_name               string,
    c_birth_country           string,
    c_email_address           string)
USING iceberg
PARTITIONED BY (c_birth_country)
OPTIONS ('format-version'='2')

利用方法

  • Glue, EMR, AthenaのSpark Notebookなど

良い点

  • SparkとIcebergの組み合わせはよく使われているだけあってドキュメントや記事が見つけやすい
  • IcebergのSpecに沿った各種のテーブルプロパティ、プロシージャの実装が充実している

気になった点

  • DDLでテーブル名やカラム名を動的に指定したい場合はテキストベースでSQLを組み立てる必要がある
  • ETLでSparkを使用する予定がなければ他の選択肢にも目を向けたい

Trino

TrinoもIcebergのサポートが充実しており、SQLから簡単にテーブルを操作できる。AWSのマネージドサービスとしてはAthenaが最も手軽。

CREATE TABLE example_table (
    c1 INTEGER,
    c2 DATE,
    c3 DOUBLE
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    sorted_by = ARRAY['c3'],
    location = 's3://my-bucket/a/path/'
);

利用方法

  • Athena, EMR on EC2など

良い点

  • Athenaの場合はインフラのプロビジョニングなしですぐに利用可能
  • EMRの場合はOSSと同じ機能が利用可能

気になった点

  • DDLでテーブル名やカラム名を動的に指定したい場合はテキストベースでSQLを組み立てる必要がある
  • Athenaから設定できるTable propertiesIcebergのSpec上のTable propertiesと比べると制約がある。テーブルプロパティはトランザクション分離レベルやコミット時の挙動、MoR/CoWなど実際運用を開始してからチューニングのために変更する可能性があるので、変更できる手段は持っておきたい
  • OSSのTrinoで設定できるTable propertiesの方が種類が多く、extra_propertiesという項目から好きなテーブルプロパティが設定できる(Trinoからは使用されない)ので、EMR上でTrinoを動かすのであれば制約はない

PyIceberg

Python製の軽量ライブラリ。Pythonのオブジェクトとしてスキーマの定義やテーブルの作成・管理が可能。

# 一時クレデンシャルを渡してデータカタログを操作する例
catalog = load_catalog(
    "glue",
    **{
        "type": "glue",
        "client.region": REGION,
        "client.access-key-id": AWS_ACCESS_KEY_ID,
        "client.secret-access-key": AWS_SECRET_ACCESS_KEY,
        "client.session-token": AWS_SESSION_TOKEN,
    }
)

schema = Schema(
    NestedField(field_id=1, name="cloudCover", field_type=DoubleType(), required=False),
    NestedField(field_id=2, name="dayOfWeek", field_type=StringType(), required=False),
    NestedField(field_id=3, name="dayOrNight", field_type=StringType(), required=False),
    NestedField(field_id=4, name="expirationTimeUtc", field_type=TimestampType(), required=False)
)

partition_spec = PartitionSpec(
    PartitionField(field_id=4, source_id=4, transform=DayTransform(), name="DayOfexpirationTimeUtc"),
)

catalog.create_table(
    identifier="my_namespace.my_table",
    location="s3://my-bucket/my_table",
    schema=schema,
    partition_spec=partition_spec,
)

良い点

  • Javaのランタイムが不要。Pythonのランタイムさえあれば良いので別の環境に持って行っても動かしやすい
  • IcebergのSpecに沿った実装が充実している。NestedFieldやDecimalType, required=True, Hidden partitioning, PartitionField, SortOrder, identifier_field_idsなども指定できる
  • Schema evolutionにも対応
    • PyIcebergでスクリプトを書いておけば、本番と開発環境に対して同じSchema evolution操作を適用可能
  • 存在するテーブルの情報をPythonオブジェクトとして取得可能
    • 本番と開発環境でスキーマに差異がないかなどのチェックがしやすい
  • Icebergテーブルへデータの書き込みも可能(本記事の主題ではないので深掘りはしない)

気になった点

  • 機能面については特に気になる点はなかった
  • 強いて言えばライブラリが発展途上なためか古い仕組みと新しい仕組みが混在している箇所があってDeprecationWarningが出たり(TableIdentifierなど)、引数に何を渡せば良いかわかりづらい箇所があったので(GlueCatalogの__init__()のpropertiesなどなど)、コードを辿ったりIssueを検索できる必要がある

CDK

CDKのglue.CfnTableを使う方法。

CDK (typescript)
this.table = new glue.CfnTable(this, "myTable", {
        databaseName: this.databaseName,
        catalogId: this.account,
        tableInput: {
            name: "mytable",
            description: description,
            storageDescriptor: {
                columns: columns,  // 
                location: "s3://bucketname/mytable",
            },
            tableType: 'EXTERNAL_TABLE'
        },
        openTableFormatInput: {
            icebergInput: {
                metadataOperation: "CREATE",
                version: "2",
            }
        }

良い点

  • CDKであればテーブル名やカラム名をプログラムから動的に設定できる

気になった点

  • 現状CDKのL2 ConstructにIcebergテーブルがないため、L1 Constructを使う必要がある
  • CDKでテーブル作成後に他の方法でALTER TABLEで足りない設定を追加する必要があるので、最初から他の方法でCREATE TABLEした方が楽かもしれない
  • 主題から若干ずれるが、Kinesis Data Firehose to IcebergでターゲットのIcebergテーブルを直接指定する場合はDelivery Streamがないとデプロイ時にエラーを吐くため、CDK上の依存先としてダミーのIcebergテーブルを作成せざるを得ないケースはあった

まとめ

AWS上でIcebergテーブルを作成する方法について紹介した。既にSparkやTrinoで処理を書いているのであればCREATE TABLEでIcebergテーブルを作るのが一番手に馴染みがあるだろう。一方、軽量・プログラマブルなテーブル管理手段としてPyIcebergも実用に足るレベルかと思う。今回は試せなかったが、そのうちIceberg Rustも検証したい。

Icebergは急速に発展しているエコシステムであり、コミュニティや各ベンダーから次々と新しい機能が出てきている。現状機能が足りない部分も今後アップデートされていくことに期待したい。

参考

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?