0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RisingWave v2.2の新機能:Kafka接続の再利用、WebHook連携、SQL最適化の進化

Posted at

RisingWave v2.2 のリリースを発表できることを嬉しく思います。本バージョンでは、ストリーム処理の体験を向上させる強力な新機能と改善が含まれています。このリリースには、再利用可能な接続、Webhook ソースコネクタ、新しい SQL 関数などが追加されました。

本バージョンの注目すべき機能について詳しく見ていきましょう。v2.2 の全アップデートの詳細については、リリースノート をご覧ください。

RisingWaveについて詳しく知りたい方は、こちらをご覧ください:What is RisingWave?

Kafka コネクタおよびスキーマレジストリの再利用可能な接続

v2.2 では、AWS PrivateLink に使用されていた旧接続方式が非推奨になりました。これまでの方式では、RisingWave クラスターとは異なる VPC 内にある Kafka サービスからメッセージを取得したり、メッセージを配信したりする場合に接続を設定する必要がありました。

新しい接続メカニズムでは、Kafka のソース、シンク、スキーマレジストリ間でブローカーアドレスやセキュリティプロトコルの詳細などのコネクタプロパティを再利用できるようになりました。この方式では、クラスターごとに一度だけ認証情報を指定すれば済むため、複数のソースのセットアップが簡素化され、機密情報へのアクセスを最小限に抑えることでセキュリティが強化されます。

接続を作成するには、CREATE CONNECTION コマンドを使用し、必要なパラメータを指定します。

CREATE CONNECTION kafka_conn1 WITH (
	type = 'kafka',
	properties.bootstrap.server = 'localhost:9092'
);

必須パラメータは typeproperties.bootstrap.server のみですが、SSL/SASL 認証、プライベートリンク接続、AWS 認証に関する追加パラメータもサポートされています。

この接続を使用して Kafka ソース、テーブル、またはシンクを作成できます。同じクラスター内の異なるトピックからデータを取り込む Kafka ソースを作成する場合、この方法を利用することで設定が大幅に簡素化されます。

CREATE SOURCE kafka1 (
	id int,
	name varchar,
	email varchar,
	age int
) WITH (
	connector = 'kafka',
	connection = 'kafka_conn1',
	topic = 'topic1',
	scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;

さらに、CREATE CONNECTION コマンドを使用してスキーマレジストリにも接続できます。

CREATE CONNECTION schema_1 WITH (
  type = 'schema_registry',
  schema.registry = 'http://...',
  schema.registry.username = 'superuser',
  schema.registry.password = 'pass123'
);

詳細情報:

クエリ最適化のための EXPLAIN FORMAT

RisingWave の EXPLAIN コマンドは、SELECT ステートメントの実行プランを表示します。特に複雑なクエリでは、この機能を使用することで、クエリの最適化方法を分析できます。どの処理が最も多くのリソースを消費しているのかを簡単に特定でき、可能であれば回避策を検討できます。また、クエリが期待どおりに動作しない場合、EXPLAIN コマンドを使えばクエリ設計の問題点を特定できます。

今回のアップデートでは、新しい FORMAT オプションを導入しました。これにより、実行プランを JSON、TEXT、XML、YAML などの異なる形式で表示できるようになります。これにより、用途に応じた最適なフォーマットを選択できます。

例えば、以下のクエリに対して EXPLAIN コマンドを FORMAT オプションなしで使用すると、フォーマットされていない出力が表示されます。

CREATE TABLE t(v1 int);

EXPLAIN CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamTableScan { table: t, columns: [v1, _row_id] }

FORMAT オプションを使用すると、クエリ実行プランを JSON 形式で表示できます。

EXPLAIN(physical, FORMAT json) CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
  {
   "name": "StreamMaterialize",
   "fields": {
     "columns": [
       "v1",
       "t._row_id(hidden)"
     ],
     "pk_columns": [
       "t._row_id"
     ],
     "pk_conflict": "NoCheck",
     "stream_key": [
       "t._row_id"
     ]
   },
   "children": [
     {
       "name": "StreamTableScan",
       "fields": {
         "columns": [
           "v1",
           "_row_id"
         ],
         "table": "t"
       },
       "children": []
     }
   ]
 }

詳細情報:

ALTER...SWAP WITH... を使用したスムーズな更新

ALTER ... SWAP WITH... コマンドを使用すると、オブジェクトの定義をシームレスに交換でき、依存オブジェクトが影響を受けることなく更新やメンテナンスを実行できます。この機能は、オブジェクトのバージョンをダウンタイムなしで切り替える場合に特に有効です。

例えば、以下のような定義を持つマテリアライズドビュー mv_old があるとします。

CREATE MATERIALIZED VIEW mv_old AS
SELECT 
    product_category, 
    SUM(order_amount) AS total_sales
FROM 
    orders
GROUP BY 
    product_category;

この total_sales の計算方法を変更したいが、mv_old に依存する他のマテリアライズドビューやシンクがある場合、通常の DROPCREATE ではダウンタイムが発生する可能性があります。

このような場合、新しいマテリアライズドビュー mv_new を作成し、ALTER...SWAP WITH... を使用して mv_oldmv_new に置き換えれば、ダウンタイムなしで更新が可能です。

CREATE MATERIALIZED VIEW mv_new AS
SELECT 
    product_category, 
    SUM(order_amount - discount_amount) AS total_sales
FROM 
    orders
GROUP BY 
    product_category;

ALTER MATERIALIZED VIEW mv_old SWAP WITH mv_new;

ALTER...SWAP WITH... コマンドは、以下のデータベースオブジェクトに対してサポートされています。

  • テーブル
  • マテリアライズドビュー
  • 通常のビュー
  • ソース
  • シンク
  • サブスクリプション

詳細情報:

Webhook からのデータ取り込み

Webhook ソースから RisingWave へデータを取り込む機能は、Premium Edition の一部として提供されます。RisingWave Premium の詳細については、Everything You Want to Know about RisingWave Premium をご覧ください。

RisingWave は Webhook のデスティネーション(受信先)として機能し、外部の HTTP リクエストをリアルタイムで直接取り込み、処理できるようになりました。新しい Webhook コネクタを使用することで、中間の Kafka クラスターを維持する必要がなくなり、データパイプラインにおける複雑さが軽減されます。これにより、リアルタイム処理や自動化が重要なアプリケーションを簡単に構築できます。

Webhook は、以下のような様々なシナリオで活用できます。

  • アラートや通知の送信
  • データの同期
  • CI/CD パイプラインのトリガー

RisingWave の Webhook コネクタを使用すると、以下のソースからイベントを取り込むことができます。

  • GitHub
  • Segment
  • HubSpot
  • Amazon EventBridge
  • RudderStack

CREATE TABLE コマンドを使用すれば、Webhook リクエストを受け付けるテーブルをすぐに作成できます。以下の例では、SECRET を使用して受信リクエストを検証しています。

CREATE SECRET test_secret WITH (backend = 'meta') AS 'secret_value';

CREATE TABLE wbhtable (
  data JSONB
) WITH (
  connector = 'webhook'
) VALIDATE SECRET test_secret AS secure_compare(
  headers->>'{header of signature}',
  {signature generation expressions}
);

詳細情報:

MySQL テーブル値関数(TVF)

このバージョンでは、新しいテーブル値関数(TVF)mysql_query が導入されました。この関数を使用すると、CDC ソースを作成せずに MySQL テーブルからデータを必要に応じて取得できます。

CDC ソースのセットアップが不要な場合、mysql_query テーブル値関数(TVF)は軽量な代替手段となります。この関数は、静的または更新頻度の低い MySQL テーブルをクエリするのに特に有用で、CDC ソースと比較して計算コストを削減できます。

例えば、MySQL に以下の users テーブルがあるとします。

first_name | last_name | age | city
-----------+-----------+-----+------
Aaron      | Jones     | 24  | NYC
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC       

mysql_query 関数を使用すると、テーブルからデータを取得できます。また、関数の戻り値を CREATE TABLE コマンドでテーブルとして作成することも可能です。

SELECT * FROM mysql_query(
	'localhost',
	'3336',
	'superuser',
	'password123',
	'mydb',
	'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC   

詳細情報:

新しい Rust 製 PostgreSQL シンク

RisingWave では、新たに Rust 製のネイティブ PostgreSQL シンクコネクタが導入され、従来の JDBC API への依存が不要になりました。この新しいコネクタにより、パフォーマンスが向上し、PostgreSQL との接続をより柔軟に制御できるようになりました。新コネクタは、upsertappend-only の両方のシンクタイプをサポートしています。

Rust 製シンクコネクタを JDBC シンクコネクタの代わりに使用するには、streaming.developer 設定 stream_switch_jdbc_pg_to_nativetrue に設定します。

その後、通常どおり CREATE SINK コマンドを使用して PostgreSQL シンクを作成できます。

CREATE SINK rust_pg_sink FROM sink_content WITH (
    connector = 'postgres',
    host = 'localhost',
    port = 4566,
    user = 'superuser',
    password = 'pass123',
    database = 'dev',
    table = 'sink_table',
    type = 'append_only'
);

詳細情報:

まとめ

以上が v2.2 の主要な新機能です。ソース・シンクコネクタの更新を含むすべてのアップデートについては、リリースノート をご覧ください。

来月も新機能の追加と改善を予定していますので、ぜひご期待ください。また、RisingWave GitHub リポジトリ で最新の開発状況や今後のリリース予定をチェックしてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?