AWS上でIcebergテーブルを作成する方法は色々あるが、ツールによってIcebergの各機能への対応状況が異なっている。
この記事はGlue Data Catalog上にIcebergテーブルの作成やDDL操作ができる主要なソフトウェア・フレームワーク・サービスとして以下を検討したときのメモ。
- Spark (Glue/EMR/Athena)
- Trino (Athena/EMR)
- PyIceberg
- CDK
やりたいこと
結論から言うと今回はPyIcebergを採用した。要件は以下の通り。
- ストリーミングデータを生成するプロデューサーアプリケーションがあり、メッセージングキューを介してコンシューマーからIcebergテーブルにデータをINSERTしたい(パイプラインについてはここでは触れない)
- Icebergテーブルをコードで管理したい
- 本番・開発環境に合わせてS3のバケット名を動的に指定したい
- プロデューサー側のアプリケーションで生成するデータとターゲットテーブルのスキーマを合わせたい。今回はモノレポジトリなので、プロデューサーとコンシューマーでテーブル名や列名をプログラムから再利用な形で定義したい
- Schema evolutionをプログラムから行いたい。できればIaCで、宣言的に
- Hidden Partitioningを使ってTimestamp型から日付単位でパーティションを作りたい
- データ量はそこまで大きくないため(数GB/day)、できるだけ安く済ませたい
Spark
Icebergを触るならSparkは選択肢として外せない。テーブルの作成や更新は基本的にSpark SQLで記述する。SparkでETL処理を記述している場合はスクリプトの中からCREATE TABLE
やCTAS
することでテーブルを作成できる。サクッと試すならAthena for SparkのNotebookが楽だった。
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
c_customer_sk int,
c_customer_id string,
c_first_name string,
c_last_name string,
c_birth_country string,
c_email_address string)
USING iceberg
PARTITIONED BY (c_birth_country)
OPTIONS ('format-version'='2')
利用方法
- Glue, EMR, AthenaのSpark Notebookなど
良い点
- SparkとIcebergの組み合わせはよく使われているだけあってドキュメントや記事が見つけやすい
- IcebergのSpecに沿った各種のテーブルプロパティ、プロシージャの実装が充実している
気になった点
- DDLでテーブル名やカラム名を動的に指定したい場合はテキストベースでSQLを組み立てる必要がある
- ETLでSparkを使用する予定がなければ他の選択肢にも目を向けたい
Trino
TrinoもIcebergのサポートが充実しており、SQLから簡単にテーブルを操作できる。AWSのマネージドサービスとしてはAthenaが最も手軽。
CREATE TABLE example_table (
c1 INTEGER,
c2 DATE,
c3 DOUBLE
)
WITH (
format = 'PARQUET',
partitioning = ARRAY['c1', 'c2'],
sorted_by = ARRAY['c3'],
location = 's3://my-bucket/a/path/'
);
利用方法
- Athena, EMR on EC2など
良い点
- Athenaの場合はインフラのプロビジョニングなしですぐに利用可能
- EMRの場合はOSSと同じ機能が利用可能
気になった点
- DDLでテーブル名やカラム名を動的に指定したい場合はテキストベースでSQLを組み立てる必要がある
- Athenaから設定できるTable propertiesはIcebergのSpec上のTable propertiesと比べると制約がある。テーブルプロパティはトランザクション分離レベルやコミット時の挙動、MoR/CoWなど実際運用を開始してからチューニングのために変更する可能性があるので、変更できる手段は持っておきたい
-
OSSのTrinoで設定できるTable propertiesの方が種類が多く、
extra_properties
という項目から好きなテーブルプロパティが設定できる(Trinoからは使用されない)ので、EMR上でTrinoを動かすのであれば制約はない
PyIceberg
Python製の軽量ライブラリ。Pythonのオブジェクトとしてスキーマの定義やテーブルの作成・管理が可能。
# 一時クレデンシャルを渡してデータカタログを操作する例
catalog = load_catalog(
"glue",
**{
"type": "glue",
"client.region": REGION,
"client.access-key-id": AWS_ACCESS_KEY_ID,
"client.secret-access-key": AWS_SECRET_ACCESS_KEY,
"client.session-token": AWS_SESSION_TOKEN,
}
)
schema = Schema(
NestedField(field_id=1, name="cloudCover", field_type=DoubleType(), required=False),
NestedField(field_id=2, name="dayOfWeek", field_type=StringType(), required=False),
NestedField(field_id=3, name="dayOrNight", field_type=StringType(), required=False),
NestedField(field_id=4, name="expirationTimeUtc", field_type=TimestampType(), required=False)
)
partition_spec = PartitionSpec(
PartitionField(field_id=4, source_id=4, transform=DayTransform(), name="DayOfexpirationTimeUtc"),
)
catalog.create_table(
identifier="my_namespace.my_table",
location="s3://my-bucket/my_table",
schema=schema,
partition_spec=partition_spec,
)
良い点
- Javaのランタイムが不要。Pythonのランタイムさえあれば良いので別の環境に持って行っても動かしやすい
- IcebergのSpecに沿った実装が充実している。NestedFieldやDecimalType, required=True, Hidden partitioning, PartitionField, SortOrder, identifier_field_idsなども指定できる
- Schema evolutionにも対応
- PyIcebergでスクリプトを書いておけば、本番と開発環境に対して同じSchema evolution操作を適用可能
- 存在するテーブルの情報をPythonオブジェクトとして取得可能
- 本番と開発環境でスキーマに差異がないかなどのチェックがしやすい
- Icebergテーブルへデータの書き込みも可能(本記事の主題ではないので深掘りはしない)
気になった点
- 機能面については特に気になる点はなかった
- 強いて言えばライブラリが発展途上なためか古い仕組みと新しい仕組みが混在している箇所があってDeprecationWarningが出たり(TableIdentifierなど)、引数に何を渡せば良いかわかりづらい箇所があったので(GlueCatalogの__init__()のpropertiesなどなど)、コードを辿ったりIssueを検索できる必要がある
CDK
CDKのglue.CfnTable
を使う方法。
this.table = new glue.CfnTable(this, "myTable", {
databaseName: this.databaseName,
catalogId: this.account,
tableInput: {
name: "mytable",
description: description,
storageDescriptor: {
columns: columns, //
location: "s3://bucketname/mytable",
},
tableType: 'EXTERNAL_TABLE'
},
openTableFormatInput: {
icebergInput: {
metadataOperation: "CREATE",
version: "2",
}
}
良い点
- CDKであればテーブル名やカラム名をプログラムから動的に設定できる
気になった点
- 現状CDKのL2 ConstructにIcebergテーブルがないため、L1 Constructを使う必要がある
- glue.CfnTableの引数のopenTableFormatInput.IcebergInputで指定
- metadataOperationは
CREATE
、つまりテーブルの作成のみ対応。更新はできない - Partitionや各種テーブルプロパティが殆ど設定できない
- GlueのCreateTable APIやCloudFormationのaws_cdk.aws_glue.CfnTableも同じ制約がある
- metadataOperationは
- glue.CfnTableの引数のopenTableFormatInput.IcebergInputで指定
- CDKでテーブル作成後に他の方法で
ALTER TABLE
で足りない設定を追加する必要があるので、最初から他の方法でCREATE TABLE
した方が楽かもしれない - 主題から若干ずれるが、Kinesis Data Firehose to IcebergでターゲットのIcebergテーブルを直接指定する場合はDelivery Streamがないとデプロイ時にエラーを吐くため、CDK上の依存先としてダミーのIcebergテーブルを作成せざるを得ないケースはあった
まとめ
AWS上でIcebergテーブルを作成する方法について紹介した。既にSparkやTrinoで処理を書いているのであればCREATE TABLE
でIcebergテーブルを作るのが一番手に馴染みがあるだろう。一方、軽量・プログラマブルなテーブル管理手段としてPyIcebergも実用に足るレベルかと思う。今回は試せなかったが、そのうちIceberg Rustも検証したい。
Icebergは急速に発展しているエコシステムであり、コミュニティや各ベンダーから次々と新しい機能が出てきている。現状機能が足りない部分も今後アップデートされていくことに期待したい。
参考