RisingWave v2.2 のリリースを発表できることを嬉しく思います。本バージョンでは、ストリーム処理の体験を向上させる強力な新機能と改善が含まれています。このリリースには、再利用可能な接続、Webhook ソースコネクタ、新しい SQL 関数などが追加されました。
本バージョンの注目すべき機能について詳しく見ていきましょう。v2.2 の全アップデートの詳細については、リリースノート をご覧ください。
RisingWaveについて詳しく知りたい方は、こちらをご覧ください:What is RisingWave?
Kafka コネクタおよびスキーマレジストリの再利用可能な接続
v2.2 では、AWS PrivateLink に使用されていた旧接続方式が非推奨になりました。これまでの方式では、RisingWave クラスターとは異なる VPC 内にある Kafka サービスからメッセージを取得したり、メッセージを配信したりする場合に接続を設定する必要がありました。
新しい接続メカニズムでは、Kafka のソース、シンク、スキーマレジストリ間でブローカーアドレスやセキュリティプロトコルの詳細などのコネクタプロパティを再利用できるようになりました。この方式では、クラスターごとに一度だけ認証情報を指定すれば済むため、複数のソースのセットアップが簡素化され、機密情報へのアクセスを最小限に抑えることでセキュリティが強化されます。
接続を作成するには、CREATE CONNECTION
コマンドを使用し、必要なパラメータを指定します。
CREATE CONNECTION kafka_conn1 WITH (
type = 'kafka',
properties.bootstrap.server = 'localhost:9092'
);
必須パラメータは type
と properties.bootstrap.server
のみですが、SSL/SASL 認証、プライベートリンク接続、AWS 認証に関する追加パラメータもサポートされています。
この接続を使用して Kafka ソース、テーブル、またはシンクを作成できます。同じクラスター内の異なるトピックからデータを取り込む Kafka ソースを作成する場合、この方法を利用することで設定が大幅に簡素化されます。
CREATE SOURCE kafka1 (
id int,
name varchar,
email varchar,
age int
) WITH (
connector = 'kafka',
connection = 'kafka_conn1',
topic = 'topic1',
scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;
さらに、CREATE CONNECTION
コマンドを使用してスキーマレジストリにも接続できます。
CREATE CONNECTION schema_1 WITH (
type = 'schema_registry',
schema.registry = 'http://...',
schema.registry.username = 'superuser',
schema.registry.password = 'pass123'
);
詳細情報:
クエリ最適化のための EXPLAIN FORMAT
RisingWave の EXPLAIN
コマンドは、SELECT
ステートメントの実行プランを表示します。特に複雑なクエリでは、この機能を使用することで、クエリの最適化方法を分析できます。どの処理が最も多くのリソースを消費しているのかを簡単に特定でき、可能であれば回避策を検討できます。また、クエリが期待どおりに動作しない場合、EXPLAIN
コマンドを使えばクエリ設計の問題点を特定できます。
今回のアップデートでは、新しい FORMAT
オプションを導入しました。これにより、実行プランを JSON、TEXT、XML、YAML などの異なる形式で表示できるようになります。これにより、用途に応じた最適なフォーマットを選択できます。
例えば、以下のクエリに対して EXPLAIN
コマンドを FORMAT
オプションなしで使用すると、フォーマットされていない出力が表示されます。
CREATE TABLE t(v1 int);
EXPLAIN CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamTableScan { table: t, columns: [v1, _row_id] }
FORMAT
オプションを使用すると、クエリ実行プランを JSON 形式で表示できます。
EXPLAIN(physical, FORMAT json) CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
{
"name": "StreamMaterialize",
"fields": {
"columns": [
"v1",
"t._row_id(hidden)"
],
"pk_columns": [
"t._row_id"
],
"pk_conflict": "NoCheck",
"stream_key": [
"t._row_id"
]
},
"children": [
{
"name": "StreamTableScan",
"fields": {
"columns": [
"v1",
"_row_id"
],
"table": "t"
},
"children": []
}
]
}
詳細情報:
ALTER...SWAP WITH...
を使用したスムーズな更新
ALTER ... SWAP WITH...
コマンドを使用すると、オブジェクトの定義をシームレスに交換でき、依存オブジェクトが影響を受けることなく更新やメンテナンスを実行できます。この機能は、オブジェクトのバージョンをダウンタイムなしで切り替える場合に特に有効です。
例えば、以下のような定義を持つマテリアライズドビュー mv_old
があるとします。
CREATE MATERIALIZED VIEW mv_old AS
SELECT
product_category,
SUM(order_amount) AS total_sales
FROM
orders
GROUP BY
product_category;
この total_sales
の計算方法を変更したいが、mv_old
に依存する他のマテリアライズドビューやシンクがある場合、通常の DROP
と CREATE
ではダウンタイムが発生する可能性があります。
このような場合、新しいマテリアライズドビュー mv_new
を作成し、ALTER...SWAP WITH...
を使用して mv_old
を mv_new
に置き換えれば、ダウンタイムなしで更新が可能です。
CREATE MATERIALIZED VIEW mv_new AS
SELECT
product_category,
SUM(order_amount - discount_amount) AS total_sales
FROM
orders
GROUP BY
product_category;
ALTER MATERIALIZED VIEW mv_old SWAP WITH mv_new;
ALTER...SWAP WITH...
コマンドは、以下のデータベースオブジェクトに対してサポートされています。
- テーブル
- マテリアライズドビュー
- 通常のビュー
- ソース
- シンク
- サブスクリプション
詳細情報:
Webhook からのデータ取り込み
Webhook ソースから RisingWave へデータを取り込む機能は、Premium Edition の一部として提供されます。RisingWave Premium の詳細については、Everything You Want to Know about RisingWave Premium をご覧ください。
RisingWave は Webhook のデスティネーション(受信先)として機能し、外部の HTTP リクエストをリアルタイムで直接取り込み、処理できるようになりました。新しい Webhook コネクタを使用することで、中間の Kafka クラスターを維持する必要がなくなり、データパイプラインにおける複雑さが軽減されます。これにより、リアルタイム処理や自動化が重要なアプリケーションを簡単に構築できます。
Webhook は、以下のような様々なシナリオで活用できます。
- アラートや通知の送信
- データの同期
- CI/CD パイプラインのトリガー
RisingWave の Webhook コネクタを使用すると、以下のソースからイベントを取り込むことができます。
- GitHub
- Segment
- HubSpot
- Amazon EventBridge
- RudderStack
CREATE TABLE
コマンドを使用すれば、Webhook リクエストを受け付けるテーブルをすぐに作成できます。以下の例では、SECRET
を使用して受信リクエストを検証しています。
CREATE SECRET test_secret WITH (backend = 'meta') AS 'secret_value';
CREATE TABLE wbhtable (
data JSONB
) WITH (
connector = 'webhook'
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'{header of signature}',
{signature generation expressions}
);
詳細情報:
MySQL テーブル値関数(TVF)
このバージョンでは、新しいテーブル値関数(TVF)mysql_query
が導入されました。この関数を使用すると、CDC ソースを作成せずに MySQL テーブルからデータを必要に応じて取得できます。
CDC ソースのセットアップが不要な場合、mysql_query
テーブル値関数(TVF)は軽量な代替手段となります。この関数は、静的または更新頻度の低い MySQL テーブルをクエリするのに特に有用で、CDC ソースと比較して計算コストを削減できます。
例えば、MySQL に以下の users
テーブルがあるとします。
first_name | last_name | age | city
-----------+-----------+-----+------
Aaron | Jones | 24 | NYC
Shelly | Doe | 36 | SF
Taylor | Smith | 52 | NYC
mysql_query
関数を使用すると、テーブルからデータを取得できます。また、関数の戻り値を CREATE TABLE
コマンドでテーブルとして作成することも可能です。
SELECT * FROM mysql_query(
'localhost',
'3336',
'superuser',
'password123',
'mydb',
'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly | Doe | 36 | SF
Taylor | Smith | 52 | NYC
詳細情報:
新しい Rust 製 PostgreSQL シンク
RisingWave では、新たに Rust 製のネイティブ PostgreSQL シンクコネクタが導入され、従来の JDBC API への依存が不要になりました。この新しいコネクタにより、パフォーマンスが向上し、PostgreSQL との接続をより柔軟に制御できるようになりました。新コネクタは、upsert
と append-only
の両方のシンクタイプをサポートしています。
Rust 製シンクコネクタを JDBC シンクコネクタの代わりに使用するには、streaming.developer
設定 stream_switch_jdbc_pg_to_native
を true
に設定します。
その後、通常どおり CREATE SINK
コマンドを使用して PostgreSQL シンクを作成できます。
CREATE SINK rust_pg_sink FROM sink_content WITH (
connector = 'postgres',
host = 'localhost',
port = 4566,
user = 'superuser',
password = 'pass123',
database = 'dev',
table = 'sink_table',
type = 'append_only'
);
詳細情報:
まとめ
以上が v2.2 の主要な新機能です。ソース・シンクコネクタの更新を含むすべてのアップデートについては、リリースノート をご覧ください。
来月も新機能の追加と改善を予定していますので、ぜひご期待ください。また、RisingWave GitHub リポジトリ で最新の開発状況や今後のリリース予定をチェックしてください。