0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

3つのコマンドでストリーミング対応 Iceberg テーブルを作成する

Last updated at Posted at 2025-07-25

3つのコマンドでストリーミング対応 Iceberg テーブルを作成する

Apache Iceberg はデータ・レイクハウス向けのオープンテーブル形式で、信頼性やタイムトラベルといった機能が特徴です。しかし、多くのチームでは Iceberg カタログの初期セットアップが障壁になることがあります。

従来は、別途コンポーネント(JDBC カタログ用の PostgreSQL データベース、AWS Glue カタログ、Nessie などの REST サービス)をプロビジョニング・管理する必要があり、このセットアップが運用の手間となり、データパイプライン構築が遅れる要因になっていました。

この課題を解決するため、RisingWave は現在 Hosted Iceberg Catalog を提供しています。この機能により、セットアップの複雑さを単一の構成パラメータに削減できます。

本ガイドでは、3つのステップでストリーミング対応の Iceberg テーブルを作成する方法を説明します。

Prerequisites

開始にあたり、以下の2点が必要です:

  1. 動作中の RisingWave インスタンス(RisingWave CloudDocker Compose セットアップ で簡単に開始可能)
  2. オブジェクトストア(S3, GCS, Azure など)へのアクセスおよび必要な認証情報

Step 1: Hosted Catalog を使った Connection の作成

まず、RisingWave に Iceberg データの保存先を指示し、「カタログは自分で管理してほしい」と伝える必要があります。それには Iceberg Connection を作成します。

ポイントはここ:hosted_catalog = true。この一行により、RisingWave は内部のメタストアを完全な Iceberg カタログとして扱います。外部データベースの URI や認証情報の指定は不要です。

CREATE CONNECTION my_iceberg_connection
WITH (
    type = 'iceberg',
    warehouse.path = 's3://your-bucket/iceberg-warehouse',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.endpoint = 'your-s3-endpoint',
    s3.region = '<your-region>', -- 例えば 'us-east-1'
    s3.path.style.access = 'true',
    hosted_catalog = true  -- ここが魔法の一行です!
);

この1つのコマンドで、外部に出ることなく JDBC 互換の Iceberg カタログが RisingWave 内でプロビジョニングされます。

S3 をオブジェクトストレージとして例示していますが、GCS、Azure、あるいは S3 互換のストレージも利用可能です。詳細は RisingWave ドキュメントの Object storage configuration を参照してください。

Step 2: Iceberg テーブルを作成

Connection を用意したら、Iceberg 形式でデータを書き込むテーブルを作成しましょう。

まずセッションに先ほどの Connection を設定します。その後、標準の CREATE TABLE コマンドに ENGINE = iceberg を追加することで、Iceberg 形式による保存が指定できます。

-- セッションに Connection を設定
SET iceberg_engine_connection = 'public.my_iceberg_connection';
-- テーブルを作成
CREATE TABLE machine_sensors (
    sensor_id INT PRIMARY KEY,
    temperature DOUBLE,
    reading_ts TIMESTAMP
) 
WITH (commit_checkpoint_interval = 1)
ENGINE = iceberg;

これで Iceberg 対応テーブルがストリーミングデータの受け皿として準備されました。


Step 3: データをストリームし、クエリする

通常の RisingWave テーブルと同様に、INSERT 文や Kafka トピックからの CREATE SINK を使ってデータを投入できます。

INSERT INTO machine_sensors 
VALUES 
    (101, 25.5, NOW()),
    (102, 70.2, NOW());

その後、以下のようにクエリするとデータを確認できます:

SELECT * FROM machine_sensors;

--  sensor_id | temperature |         reading_ts
-- -----------+-------------+----------------------------
--        101 |        25.5 | 2024-05-21 10:30:00.123...
--        102 |        70.2 | 2024-05-21 10:30:01.456...

これにより、データは S3 バケット上に標準 Iceberg フォーマットで永続化されます。


なぜこれが重要か

わずか数分、3つのシンプルなコマンドで、ストリーミングパイプラインがオープンなレイクハウステーブルに書き込まれる環境が整いました。

やらずに済んだこと:

  • PostgreSQL や MySQL の別コンポーネントをプロビジョニングする必要がなかった
  • それらの DB 用のネットワーク設定/ユーザー資格情報の設定が不要だった
  • AWS Glue カタログ用の IAM 権限の管理が不要だった
  • Nessie のような REST カタログサービスを展開/運用する必要もなかった

hosted_catalog = true フラグによって、カタログのセットアップを一気に簡素化し、ストリーミングレイクハウスへの参入障壁を大きく下げています。開発者はインフラ管理ではなく、データアプリケーションの構築に集中できます。

また、この内部カタログは標準 JDBC カタログの実装なので、Spark や Trino, Flink といった他ツールによるアクセスも可能。ベンダーロックインすることなく、相互運用性が保証されます

Ready to try it for yourself? Check out our official Iceberg documentation or join our community on Slack to ask questions and share what you build.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?