ストリーム処理(Stream Processing)はデータ処理の一種であり、従来のバッチ処理(Batch Processing)とは異なり、ストリーム処理はリアルタイムで継続的に流れるデータを処理します。このモデルでは、データが到着するとすぐに処理され、すべてのデータが収集された後に処理が行われるバッチ処理とは異なります。ストリーム処理は、リアルタイム分析、監視、金融リスク管理、IoT データ処理など、低遅延とリアルタイム応答を必要とするアプリケーションシナリオに一般的に使用されます。
背景
ビッグデータ分野では、一般的なストリーム処理フレームワークには Apache Spark Streaming、Apache Flink などがあります。しかし、オープンソースコミュニティの統計によると、ユーザーがストリームシステムを使用する際、実際にアプリケーション開発やビジネスロジックの実装に費やされる時間はわずか 20% であり、残りの 80% はシステム関連の問題対応、運用、調整などに費やされています。これは開発の効率を大幅に低下させます。
本記事では、具体的な使用ケースを通じて、新しい分散アーキテクチャの SQL ストリームデータベース RisingWave を紹介します。RisingWave は増分更新と一貫性を備えた マテリアライズドビュー を提供するストリーム処理データベースであり、ストリーム処理の結果を持続的なデータ構造として保存します。これにより、開発者はマテリアライズドビューをカスケードすることで複雑なストリーム処理ロジックを簡単に記述できるため、ストリーム処理アプリケーションの構築が大幅に簡素化されます。また、RisingWave はストリームを活用してデータベース間のリアルタイムレプリケーション機能を実現し、本記事ではこれを探求し、別のデータベース間レプリケーションの実装方法を提供します。
Demo ケースのアーキテクチャ
RisingWave はオープンソース版と商用版の両方を提供する製品であり、その機能は非常に幅広いため、本記事ですべてを網羅することはできません。そのため、本記事では最も代表的なシナリオを選択し、製品の特徴を具体的に説明します。興味のある読者は RisingWave の公式サイト または GitHub の公式コミュニティ を参照し、本記事の内容を基にさらに興味のある点を掘り下げることができます。
本ケースでは、AWS ネイティブの EKS に RisingWave 2.02 バージョンをデプロイ し、以下のシナリオを説明します。
- AWS ネイティブの EKS に RisingWave クラスタをデプロイ
- AWS マネージド MSK を RisingWave クラスタに接続し、対応するテーブルを作成してデータをインポート
- AWS マネージド Kinesis を RisingWave クラスタに接続し、対応するテーブルを作成してデータをインポート
- AWS マネージド RDS MySQL を使用し、ソースデータベースとターゲットデータベースを作成してデータベース間の全体および CDC レプリケーションを実現
上記のアーキテクチャ図は、AWS EKS 上に RisingWave をデプロイする際の大まかなロジックを示しており、2 つの部分に分かれています。一つ目は、RisingWave の実行を支える基盤となる AWS ネイティブのネットワーク、ストレージ、セキュリティ、およびコンピューティングリソースです。二つ目は、RisingWave の上流および下流のデータソースとして利用できる AWS のネイティブサービスです。たとえば、上流のデータソース として MSK(Amazon Kafka)、Kinesis、Amazon RDS などがあり、下流のデータシンク として DynamoDB、Amazon RDS、Amazon Cache RDS などが含まれます。
また、アーキテクチャ図では、RisingWave クラスタの上流(Sources)および下流(Sinks)が AWS ネイティブサービスをサポートしていることを示しており、詳細については本記事の最後の 附録 を参照してください。
テストデプロイおよびテスト手順
EKS クラスター構築および RisingWave クラスター構築
AWS EKS クラスターの構築は AWS の公式ドキュメントを参照してください:
具体的な EC2 インスタンスタイプとしては、ここでは m5.4xlarge
を選択します(下図参照):
今回のケースで必要となるプラグインの設定については、以下のドキュメントを参照してください:
RisingWave は公式推奨の Helm を使ったデプロイ方法を採用しています:
RisingWave は Amazon EKS と非常に良好に統合されており、Amazon EKS の準備が整っていれば、以下のコマンド一つでインストールを完了できます:
<p>[ec2-user@ip-1
[ec2-user@ip-172-31-62-218 eks]$ helm install -n risingwave --create-namespace --set wait=true -f values.yaml my-risingwave risingwavelabs/risingwave
NAME: my-risingwave
LAST DEPLOYED: Sun Nov 10 12:33:51 2024
NAMESPACE: risingwave
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Welcome to fast and modern stream processing with RisingWave!
Check the running status with the following command:
kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave
Try accessing the SQL console with the following command:
kubectl -n risingwave port-forward svc/my-risingwave 4567:svc
Keep the above command running and open a new terminal window to run the following command:
psql -h localhost -p 4567 -d dev -U root
For more advanced applications, refer to our documentation at: https://www.risingwave.dev
72-31-62-218 eks]$ helm install -n risingwave --create-namespace --set wait=true -f values.yaml my-risingwave risingwavelabs/risingwave<br />NAME: my-risingwave<br />LAST DEPLOYED: Sun Nov 10 12:33:51 2024<br />NAMESPACE: risingwave<br />STATUS: deployed<br />REVISION: 1<br />TEST SUITE: None<br />NOTES:<br />Welcome to fast and modern stream processing with RisingWave!</p><p>Check the running status with the following command:</p><p>kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave</p><p>Try accessing the SQL console with the following command:</p><p>kubectl -n risingwave port-forward svc/my-risingwave 4567:svc</p><p>Keep the above command running and open a new terminal window to run the following command:</p><p>psql -h localhost -p 4567 -d dev -U root</p><p>For more advanced applications, refer to our documentation at: https://www.risingwave.dev</p>
注意:Helm でデプロイを行う際に、RisingWave が提供する values.yaml
ファイルを使いますが、そのうち以下の2つの部分に注意が必要です:
## @section metaStore.mysql Values for MySQL backend.
mysql:
## @param metaStore.mysql.enabled Enable/disable the meta store backend.
##
enabled: true
## @param metaStore.mysql.host Host of the MySQL server.
##
host: "risingwave.cluster-XXXXXXX.us-east-1.rds.amazonaws.com"
## @param metaStore.mysql.port Port of the MySQL server. Defaults to 3306.
##
port: 3306
## @param metaStore.mysql.database Database of the MySQL server.
##
database: "risingwave"
## @param metaStore.mysql.options Options to connect.
##
options: {}
## @param metaStore.mysql.authentication Authentication information.
##
authentication:
## @param metaStore.mysql.authentication.username Username to connect the DB.
## If existingSecretName is specified, the field is ignored.
##
username: "XXXXXX"
## @param metaStore.mysql.authentication.password Password to connect the DB.
## If existingSecretName is specified, the field is ignored.
##
password: "XXXXXXXX"
## @param metaStore.mysql.authentication.existingSecretName Existing secret name.
## The Secret must contain `username` and `password` keys.
## If it is specified, username and password above are ignored.
##
existingSecretName: ""
この部分は外部の MySQL データベースを metaStore として指定するもので、ここでは EKS と同一リージョンの Amazon RDS を選択しています。その利点は、AWS マネージドデータベースの冗長性を活用して外部ストレージを提供できることで、EKS クラスターの変動に影響されず状態を安定的に保つことができます。また、EKS クラスター内のノードと RDS 間の通信が可能であることを保証する必要があります。ここでは Amazon RDS と EKS を同一の VPC に配置し、レイテンシーを抑えてパフォーマンスを確保しています。実際のプロダクション環境ではニーズに応じて調整してください。
stateStore:
## @param stateStore.dataDirectory RisingWave state store data directory.
## Must be a relative path. Max length is 800 characters.
## Default to "hummock" if set to empty.
##
dataDirectory: "hummock"
## @section stateStore.s3 S3 values.
##
s3:
## @param stateStore.s3.enabled Use S3 state store. Only one state store backend can be enabled.
##
enabled: true
## @param stateStore.s3.endpoint S3 endpoint URL for S3-compatible object storages.
##
endpoint: ""
## @param stateStore.s3.region S3 region.
##
region: "us-east-1"
## @param stateStore.s3.bucket S3 bucket.
##
bucket: "XXXXXXXXXXXX"
## @param stateStore.s3.forcePathStyle Enforce path style requests. If the value is false, the path could be one
## of path style and virtual hosted style, depending on the endpoint format. For more details of the two styles,
## please refer to the documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html
##
forcePathStyle: false
authentication:
## @param stateStore.s3.authentication.useServiceAccount Use S3 service account authentication.
## If enabled, accessKey and secretAccessKey will be ignored.
## Otherwise, accessKey and secretAccessKey are required and will be stored in a Secret.
##
useServiceAccount: false
## @param stateStore.s3.authentication.accessKey S3 access key.
##
accessKey: "AKXXXXXXXXXXXXX"
## @param stateStore.s3.authentication.secretAccessKey S3 secret access key.
##
secretAccessKey: "XXXXXXXXXXXXXXXXXXXX"
## @param stateStore.s3.authentication.existingSecretName Use existing Secret for S3 authentication.
## If set, use the existing Secret instead of creating a new one.
## Secret must contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY keys.
##
existingSecretName: ""
この部分は stateStore の設定であり、AWS S3 を状態ストレージとして使用するよう設定しています。テスト目的のため、ここでは Access Key によるアクセスを直接選択していますが、実際の運用環境では ServiceAccount を利用するのが推奨です。毎回同じ S3 バケットを再利用する場合、事前に古いデータを削除して新しい環境への影響を防ぐ必要があります。また AK-SK または ServiceAccount に対応するロールには、S3 バケットへの読み書き権限が必要です。
デプロイ完了後、以下のコマンドで RisingWave クラスターの状態を確認し、接続を行います。
[ec2-user@ip-172-31-62-218 ~]$ kubectl -n risingwave get pods -l app.kubernetes.io/instance=my-risingwave
NAME READY STATUS RESTARTS AGE
my-risingwave-compactor-d778ddb5d-wjhkf 1/1 Running 0 41h
my-risingwave-compute-0 1/1 Running 0 41h
my-risingwave-frontend-658c47cff4-nf44h 1/1 Running 0 41h
my-risingwave-meta-0 1/1 Running 0 41h
これで RisingWave クラスターが EKS 上にデプロイ完了しました。次のステップに進みます。
MSK を RisingWave データソースとして使用
AWS マネージド MSK クラスタを設定するためのサンプル設定は以下の通りです:
RisingWave で Kafka ソースを作成します:
CREATE TABLE IF NOT EXISTS jingamz (
cust_id VARCHAR,
month INT,
expenses NUMERIC
) WITH (
connector = 'kafka',
topic = 'risingwave',
properties.bootstrap.server = 'b-1.risingwave.XXXXXXX.kafka.us-east-1.amazonaws.com:9096',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-512',
properties.security.protocol = 'sasl_ssl',
properties.sasl.username = 'jingamz',
properties.sasl.password = 'XXXXXXX'
) FORMAT PLAIN ENCODE JSON;
Kafka クライアントを使って MSK にデータを送信します:
[ec2-user@ip-172-31-62-218 risingwave]$ /home/ec2-user/kafka_2.13-3.5.1/bin/kafka-console-producer.sh --broker-list \
b-1.risingwave.zsob86.c10.kafka.us-east-1.amazonaws.com:9096 \
--producer.config /home/ec2-user/risingwave/client_sasl.properties \
--topic risingwave
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
>{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
>{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
RisingWave クラスタ内で jingamz
テーブルのデータを確認します:
dev=> select * from jingamz;
cust_id | month | expenses
---------+-------+----------
1313131 | 12 | 1313.13
1313131 | 10 | 492.83
3535353 | 12 | 81.12
3535353 | 12 | 81.12
3535353 | 12 | 81.12
(5 rows)
dev=> select cust_id,sum(expenses) from jingamz group by cust_id;
cust_id | sum
---------+---------
3535353 | 243.36
1313131 | 1805.96
(2 rows)
dev=>
上記のように、ソースの Kafka クラスタから 5 件のデータがリアルタイムで RisingWave クラスタにテーブルとして保存されます。また、RisingWave のテーブルは SQL クエリによる集計をサポートしており、データストリームの統計情報をリアルタイムで集計できます。すべてのプロセスは SQL を使って行われ、psql
クライアントで簡単に操作できます。コードを使わなくても SQL の経験さえあれば、学習曲線は非常に平坦です。
AWS Kinesis をデータソースとして使用
AWS Kinesis データストリームの設定は以下の通りです:
RisingWave で Kinesis ソースを作成します:
dev=> CREATE TABLE IF NOT EXISTS jingamz_kinesis (
vendorId VARCHAR,
pickupDate VARCHAR,
dropoffDate VARCHAR,
passengerCount VARCHAR,
pickupLongitude VARCHAR,
pickupLatitude VARCHAR,
dropoffLongitude VARCHAR,
dropoffLatitude VARCHAR,
storeAndFwdFlag VARCHAR,
gcDistance VARCHAR,
tripDuration VARCHAR,
googleDistance VARCHAR,
googleDuration VARCHAR
) WITH (
connector='kinesis',
stream='input-stream',
aws.region='us-east-1',
aws.credentials.access_key_id = 'XXXXXXXXXXXX',
aws.credentials.secret_access_key = 'XXXXXXXXXXX'
) FORMAT PLAIN ENCODE JSON;
CREATE_TABLE
dev=>
クライアント側で Kinesis データの書き込みをシミュレートします:
[ec2-user@ip-172-31-62-218 kinesis]$ ls
kinesis-data.py
[ec2-user@ip-172-31-62-218 kinesis]$ python3 kinesis-data.py
Total ingested:1,ReqID:ce856857-13d9-887b-9128-3544cac21ba3,HTTPStatusCode:200
Total ingested:2,ReqID:e0c75913-b67f-4f9c-bf6a-04076f64dc44,HTTPStatusCode:200
Total ingested:3,ReqID:d3182e43-1e86-f87d-8cb5-7357c79d6ba5,HTTPStatusCode:200
RisingWave 内のデータを確認します:
dev=> select count(*) from jingamz_kinesis;
count
-------
134
(1 row)
dev=> describe jingamz_kinesis;
Name | Type | Is Hidden | Description
-------------------+-------------------+-----------+------------
vendorid | character varying | false |
pickupdate | character varying | false |
dropoffdate | character varying | false |
passengercount | character varying | false |
pickuplongitude | character varying | false |
pickuplatitude | character varying | false |
dropofflongitude | character varying | false |
dropofflatitude | character varying | false |
storeandfwdflag | character varying | false |
gcdistance | character varying | false |
tripduration | character varying | false |
googledistance | character varying | false |
googleduration | character varying | false |
_row_id | serial | true |
primary key | _row_id | |
distribution key | _row_id | |
table description | jingamz_kinesis | |
(17 rows)
dev=> select * from jingamz_kinesis limit 10;
データが RisingWave に同期され、テーブルに基づいて集計ができます。
Amazon RDS 間で CDC データ同期
Amazon RDS データベースの設定は以下の通りです:
RisingWave 内で RDS ソースを作成し、Source に基づくテーブルおよび Sink のターゲットを作成します。RDS のソーステーブルは jing
データベース内の people
テーブルで、Sink のターゲットは people_rw
です。ここではテストのため、ソーステーブルとターゲットテーブルが同じデータベース内にあります。実際の環境では、設定に応じて調整できます。
dev=> CREATE SOURCE mysql_jing WITH (
connector = 'mysql-cdc',
hostname = 'risingwave.cluster-XXXXXXXXXX.us-east-1.rds.amazonaws.com',
port = '3306',
username = 'admin',
password = 'XXXXXXX',
database.name = 'jing',
server.id = 970344889
);
CREATE_SOURCE
dev=> CREATE TABLE people_rw (
id INT,
first_name CHARACTER VARYING,
last_name CHARACTER VARYING,
email CHARACTER VARYING,
zipcode INT,
city CHARACTER VARYING,
country CHARACTER VARYING,
birthdate DATE,
added TIMESTAMPTZ,
PRIMARY KEY (id)
) FROM mysql_jing TABLE 'jing.people';
CREATE_TABLE
dev=> CREATE SINK s_mysql FROM people_rw WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://risingwave.cluster-XXXXXXXXX.us-east-1.rds.amazonaws.com:3306/jing?user=admin&password=XXXXXXXX',
table.name='people_rw',
type = 'upsert',
primary_key = 'id'
);
CREATE_SINK
クライアントで people
テーブルにデータを書き込んでいることを確認します:
[ec2-user@ip-172-31-62-218 faker]$ ls
db_feeder_with_faker.py db_feeder_with_faker_jingamz.py db_feeder_with_faker_umu.py
[ec2-user@ip-172-31-62-218 faker]$ python3 db_feeder_with_faker_jingamz.py
Error creating table 1050 (42S01): Table 'people' already exists
iteration 10
iteration 20
iteration 30
iteration 40
iteration 50
^CTraceback (most recent call last):
File "/home/ec2-user/faker/db_feeder_with_faker_jingamz.py", line 63, in <module>
time.sleep(2)
KeyboardInterrupt
[ec2-user@ip-172-31-62-218 faker]$
ソーステーブルのデータ量を確認:
mysql> use jing;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select count(*) from people;
+----------+
| count(*) |
+----------+
| 240 |
+----------+
1 row in set (0.06 sec)
mysql>
RisingWave クラスタ内の people_rw
テーブルのデータ量を確認:
dev=> select count(*) from people_rw;
count
-------
240
(1 row)
dev=>
ターゲットの Amazon RDS データベース内のデータ量を確認:
mysql> use jing;
Database changed
mysql> select count(*) from people_rw;
+----------+
| count(*) |
+----------+
| 240 |
+----------+
1 row in set (0.07 sec)
mysql>
この時点で、ソースデータベース、RisingWave クラスタ、ターゲットデータベースのデータ量が一致していることが確認できます。CDC 同期が成功しました。
データの更新を行い、RisingWave で変更内容がターゲットに反映されることを確認:
dev=> select * from people_rw where id=331;
id | first_name | last_name | email | zipcode | city | country | birthdate | added
-----+------------+-----------+-----------------------------------+---------+-----------+---------+------------+---------------------------
331 | Edward | Wood | bonniepowell@armstrong-waters.net | 4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06+00:00
(1 row)
dev=> update people_rw set email='jingamz@amazon.com' where id = 331;
UPDATE 1
dev=> select * from people_rw where id=331;
id | first_name | last_name | email | zipcode | city | country | birthdate | added
-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------------
331 | Edward | Wood | jingamz@amazon.com | 4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06+00:00
(1 row)
dev=>
ターゲットデータベースでも変更が反映されることを確認:
mysql> select * from people_rw where id=331;
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
| id | first_name | last_name | email | zipcode | city | country | birthdate | added |
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
| 331 | Edward | Wood | jingamz@amazon.com | 4522 | New Scott | Ecuador | 1960-09-24 | 2024-11-10 14:18:06 |
+-----+------------+-----------+--------------------+---------+-----------+---------+------------+---------------------+
1 row in set (0.06 sec)
mysql>
ターゲットデータベースのデータが変更されていることが確認できました。これにより、RisingWave を使用することで、データの CDC 同期の途中で内容を変更できることが分かります。これは従来の CDC ツールではできなかったことです。同様の方法で、ストリーム処理のデータ内容も変更できます。
結論
上記のテスト手順を通じて、RisingWave が AWS EKS 上に簡単にデプロイできること、AWS の RDS データベース、MSK(Kafka)、Kinesis と連携して安定して動作すること が確認できました。また、RisingWave の公式サイトによると、RisingWave は StarRocks を含むさまざまな分析データベースとも統合でき、さまざまなビジネスシーンに対応できることがわかります。
さらに、RisingWave は オープンソース版に加えて Premium Edition も提供 しており、より多機能で強力なサポートが提供されます。詳細は以下を参照してください:
https://docs.risingwave.com/docs/current/rw-premium-edition-intro/
附録
RisingWave ストリームデータベースに関する詳細な情報は公式サイトを参照してください:
https://docs.risingwave.com/get-started/intro
RisingWave の対応するソース/シンクに関しては公式サイトを参照してください:
https://docs.risingwave.com/integrations/overview
RisingWave ストリームデータベースと Flink の機能およびパフォーマンスの比較については公式サイトを参照してください: