2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

バッチETLにさよなら:RisingWave×Snowflakeで実現するリアルタイム分析パイプライン

Last updated at Posted at 2025-06-18

Fresh Data, Faster: Stream Directly from RisingWave to Snowflake.png

Snowflakeウェアハウスにフレッシュで分析可能なデータを届けるのは、時として時間との戦いのように感じられます。信頼性の高い従来型のバッチETLプロセスは、処理に遅延が生じる可能性があり、その結果、インサイトは常に現実より数時間(あるいはそれ以上)遅れてしまいます。もしこのレイテンシを大幅に縮小できたら、しかもすでに変換・整形済みのデータをSnowflakeに直接届けられたとしたら、どうでしょう?

それこそが、RisingWaveの新しいSnowflakeシンクコネクタで実現されることです。

初めての方のために説明すると、RisingWaveは統合型リアルタイムデータプラットフォームであり、イベントストリームデータを到着と同時に取り込み、処理し、変換するよう設計されています。RisingWaveはデータの「行き先」だけでなく、「出どころ」にも非常に柔軟です。Apache KafkaやApache Pulsarといった人気のストリーミングプラットフォームからのデータ取り込みはもちろん、PostgreSQLやMySQLといったトランザクションデータベースからのCDC(変更データキャプチャ)ストリームにも対応。あるいは、アプリケーションやマイクロサービス間のインタラクションから生成されるイベントストリームの取り込みも可能です。RisingWaveは、こうした多様なデータに対して、結合、集約、ウィンドウ処理といったSQLベースの複雑な演算をリアルタイムで行える強力なエンジンなのです。

そして今、Snowflakeシンクコネクタによって、RisingWaveからSnowflakeテーブルへ直接、継続的なデータブリッジを構築することが可能になりました。
つまり、Snowflakeに入るデータと、それをどのように扱うかに、どんな変化が起きるのでしょうか?

  • より早くデータを受け取り、迅速に行動を起こす:
    最も即効性のある効果は、データの鮮度です。バッチジョブで一定間隔ごとにデータを収集・変換・ロードするのを待つ代わりに、RisingWaveは処理済みのデータを継続的にストリーミングします。これにより、Snowflakeに届くデータは常に最新で、ダッシュボードやレポートもリアルタイムの状況を反映します。

  • データ前処理の簡素化:
    多くの重たい変換処理は、データがSnowflakeに到達するにRisingWave内で実行されます。RisingWaveは、複雑なデータ変換、エンリッチメント、集約処理をストリーム上で実施するのが得意です。複数のストリームの結合、ランニングトータルの計算、複雑なロジックによるイベントフィルタリングなどがすべてRisingWave内で処理され、Snowflakeに届くデータはすでに整形され、即時利用可能な状態です。これにより、Snowflake内での変換ロジックが大幅に簡素化され、計算コストの削減にもつながる可能性があります。

内部ではどのように機能するのか?

その仕組みはとてもスムーズです。RisingWaveは指定されたデータソースからデータを取り込み、定義されたリアルタイム変換処理を実行し、その処理済みデータをJSON形式でユーザー管理のS3バケットへ書き出します。そこから、SnowflakeのSnowpipeサービスが自動的かつ継続的にそのデータをターゲットのSnowflakeテーブルへロードします。S3に対応したSnowpipeの既存の自動化機能とシームレスに連携するよう設計されています。

Snowpipeを使うなら、なぜRisingWaveも必要なのか?

この疑問はもっともです。最終的にデータはS3に書き出され、SnowpipeがSnowflakeへのロードを担うのであれば、なぜ最初から生データをS3に送ってSnowpipeだけで済ませないのか?

たしかに、SnowpipeはS3などのステージからSnowflakeテーブルへのファイルロードを効率的かつ自動的に行う優れた仕組みです。生データや準構造化データをデータウェアハウスに取り込むには最適な手段です。

しかし、RisingWaveの導入によって得られる決定的な違いは、データがS3に到達するに何が起きるかという点にあります:

  1. リアルタイムかつ複雑な変換処理:
    RisingWaveはデータをストリーム中に処理・変換します。これにより、たとえばKafkaからのユーザークリックストリームとCDCストリームからのユーザー属性データをSQLで結合する、複雑な集約処理を行う、詳細なフィルタリングロジックを適用する、データをリッチ化する──といった高度な操作を、すべて出力前に実現可能です。一方、Snowpipe単体では単なるファイルロードしかできず、変換はSnowflake内で行う必要があり、その分レイテンシが増し、別途計算リソースも必要になります。

  2. ステートフル処理とマテリアライズドビュー:
    RisingWaveは、到着した新しいデータに応じてインクリメンタルに更新されるマテリアライズドビューを維持することができます。これにより、セッション分割やリアルタイムのリーダーボードといった複雑な分析ビューを事前に計算可能です。Snowflakeに取り込まれるのは生データではなく、こうした整形済みかつ即時クエリ可能なビューになります。

  3. ソースとの直接統合と初期処理:
    RisingWaveはKafka、Pulsar、CDCストリームなどの多様なソースと直接接続可能で、取り込みプロトコルの処理やデシリアライズ、初期フィルタリングやスキーマ検証といった処理も複雑な変換の前に実行可能です。Snowpipeをこれらのソースと直接連携させようとすると、別のシステムやカスタムコードで一旦S3ファイルに落とす必要があります。

  4. Snowflake内のデータ整備・レイテンシ削減:
    RisingWaveからSnowflakeへはすでに変換・集約され、分析準備が整った状態でデータが届くため、Snowflake内での追加処理は大幅に簡素化または不要になります。これにより、インサイトを得るまでの時間が短縮され、Snowflake上のコンピュートコストも削減できる可能性があります。

はじめに:導入の概要

導入は、いくつかの重要なステップから成ります。まずはS3バケットの詳細情報を用意し、SnowpipeがそのS3ロケーションを監視するように設定する必要があります(Snowflake公式のAmazon S3向けSnowpipe自動化ガイドが参考になります)。

次に、RisingWave内でシンクを作成します。たとえば、すでにストリーミングデータを処理しているマテリアライズドビュー ss_mv がある場合、以下のように設定します:

CREATE SINK snowflake_sink
FROM ss_mv -- マテリアライズドビューまたはソース
WITH (
    connector = 'snowflake',
    type = 'append-only', -- アップサート対応も可能!後述参照
    s3.bucket_name = 'your-s3-bucket-name',
    s3.credentials.access = 'your-aws-access-key',
    s3.credentials.secret = 'your-aws-secret-key',
    s3.region_name = 'your-s3-bucket-region',
    s3.path = 'path/to/data/', -- オプション:バケット内の特定パス
    force_append_only = 'true' -- 追記のみが必要な場合
);

更新や削除(アップサート)はどうするのか?

RisingWaveで AS CHANGELOG を使ってマテリアライズドビューを定義すると、RisingWaveは新規行だけでなく、更新や削除も追跡するようになります。このとき、__op(操作タイプ:insert, delete, update_before, update_after)や __row_id(変更の順序付け用)といった特殊な列が付与されます。

CREATE SINK snowflake_sink as WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp,
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub WITH (
connector = 'snowflake',
type = 'append-only',
s3.bucket_name = 'EXAMPLE_S3_BUCKET',
s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
s3.region_name = 'EXAMPLE_REGION',
s3.path = 'EXAMPLE_S3_PATH',
);

そしてSnowflake側では、以下のようにDynamic Tableを定義することで、常に最新状態を保つビューを自動的に構築できます:

-- Snowflake内で実行
CREATE OR REPLACE DYNAMIC TABLE current_user_behaviors
TARGET_LAG = '1 minute' -- 鮮度の指定(例:1分以内)
WAREHOUSE = your_snowflake_warehouse
AS
SELECT *
FROM (
    SELECT *,
    ROW_NUMBER() OVER (PARTITION BY primary_key_column ORDER BY __row_id DESC) as rn
    FROM your_staging_table_fed_by_snowpipe
)
WHERE rn = 1 AND (__op = 1 OR __op = 3); -- __op=1(Insert)、__op=3(Update_after)

提供状況

Snowflakeシンクコネクタは、RisingWaveのPremium Edition機能のひとつです。RisingWave Cloudでは追加費用なしで利用可能です。セルフホスト環境でも、4コア以下のデプロイメントであれば無料でPremium Editionのすべての機能を使用できます。4コアを超える場合は、ライセンスキーが必要となります。

データからインサイトまでのパイプラインを加速させませんか?

もはや古くなったデータを待つ必要はありません。RisingWaveと新しいSnowflakeシンクコネクタを組み合わせれば、常に最新で継続的に更新され、あらかじめ整形されたデータによって、Snowflakeでの分析がパワーアップします。

私たちは、あなたがSnowflake上でリアルタイムデータをどのように活用し、新たな可能性を切り開いていくかを楽しみにしています!

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?