この記事では、RisingWave と PuppyGraph を使用してリアルタイムのグラフ分析パイプラインを構築する方法を探ります。これにより、私たちはこれらの課題を克服し、効率的でリアルタイムなグラフデータの処理と分析を実現できます。まず、RisingWave と PuppyGraph について詳しく見ていきましょう。
RisingWave: リアルタイムデータストリーミングとCDCのバックボーン
RisingWave は、PostgreSQL と完全互換性を持つモダンな SQL ベースのデータプラットフォームで、現代のデータ駆動型アプリケーションの動的なニーズに応えるよう設計されています。多用途性を念頭に置いて設計されたこのプラットフォームは、イベント駆動型アーキテクチャ、リアルタイムETLパイプライン、継続的な分析サービス、AI 用のフィーチャーストアをサポートし、データの価値を活用しようとする組織にとって欠かせないツールです。RisingWave は、ライブイベントストリームの処理、CDC(変更データキャプチャ)を通じたデータベースの変更の取得、または時系列データの分析など、さまざまなソースから新鮮で一貫性のあるインサイトを提供することに優れています。
その高性能アーキテクチャは、サブ秒の遅延で結果を生成するように設計されており、迅速な意思決定が求められるシナリオにおいて重要な能力です。ストリーム処理とバッチ処理の両方の強みをシームレスに統合することで、RisingWave はユーザーがリアルタイムデータと履歴データを単一の統合フレームワーク内で簡単に取り込み、結合し、分析できるようにします。この統合により、複雑なデータワークフローが簡素化され、分析の俊敏性が向上します。
さらに、RisingWave のクラウドネイティブな設計により、増加するワークロードに対応するためにスケールアップが容易であり、信頼性を確保します。マネージドクラウド環境でもセルフホストインフラストラクチャでも、プラットフォームの弾力的なスケーラビリティと信頼性の高いパフォーマンスにより、最も要求の厳しいデータアプリケーションにも対応できます。本質的に、RisingWave は組織がリアルタイムのインサイトを解放し、イノベーションを推進するための柔軟で効率的なソリューションを提供し、現代のデータ処理の進化する風景に適応します。
PuppyGraph: リレーショナルデータをリアルタイムでグラフとしてクエリ
PuppyGraph は、業界で初めてのリアルタイム、ゼロETLのグラフクエリエンジンで、データチームが既存のリレーショナルデータストアを統一されたグラフモデルとして10分以内にクエリできるようにし、従来のグラフデータベースのコスト、遅延、メンテナンスの課題を回避します。モダンなデータ駆動型アプリケーション向けに設計された PuppyGraph は、データの移行や複製の複雑さを排除し、データが新鮮で最新であることを保証します。このゼロETLアプローチにより、組織はデータ複製による追加コストや遅延を発生させることなく、既存のデータストアを活用できます。
迅速なパフォーマンスとスケーラビリティを実現するように設計された PuppyGraph は、テラバイト規模のデータセットやペタバイト規模のデータを管理し、数十億の相互接続された関係を扱うことができます。人気のあるグラフクエリ言語であるGremlin と openCypher をサポートしており、PuppyGraph は複雑なデータ関係を簡単に探索し、従来見逃されがちな隠れたパターンを発見する手助けをします。このグラフとリレーショナルのパラダイムのシームレスな統合により、複雑なワークフローが簡素化され、詐欺検出 や サイバーセキュリティ からテレメトリーログ分析に至るまで、さまざまなユースケースにおける分析の俊敏性が向上します。
PuppyGraph は、ビジネスが相互接続されたデータにアクセスして分析する方法を変革し、データの整合性と運用のシンプルさを保ちながら、貴重なインサイトを即座に見つけることを可能にします。
RisingWave + PuppyGraph: リレータイムグラフ分析のためのリレーショナルデータ
RisingWave と PuppyGraph のシームレスな統合により、高速なデータ処理と深いリレーショナルインサイトを結びつける強力なリアルタイムグラフ分析パイプラインが構築されます。RisingWave は、ライブデータストリーム、変更データキャプチャイベント、および履歴レコードを一つの統一されたフレームワーク内で継続的にキャプチャし処理します。これにより、組織は新鮮で一貫性のあるデータにサブ秒の遅延で即時にアクセスでき、動的な環境での迅速な意思決定が可能になります。
この強力なデータ基盤を元に、PuppyGraph は ETL なしでその出力を包括的なグラフモデルに変換します。複数のデータソースを単一のグラフとして直接クエリすることで、データチームは従来の SQL 分析では見逃される可能性のある複雑な関係や隠れたパターンを発見することができます。
デモ
次に、この仕組みがどのように機能するのかを、ステップバイステップで示すデモで確認してみましょう。詳細な手順と資料は GitHub リポジトリ にあります。デモデータは LDBC Financial Benchmark から派生しています。このベンチマークは、グラフデータベースのワークロードとデータセットを定義し、詐欺防止やリスク管理といった現実世界の金融シナリオをシミュレートしています。
前提条件
サービスの起動
以下のコマンドを実行して、パイプラインを含むサービス(Kafka、RisingWave、PuppyGraph)を起動します。
docker compose up -d
Python 仮想環境の作成
Python 仮想環境を作成し、confluent_kafka
パッケージをインストールします。
python3 -m venv myvenv
source myvenv/bin/activate
pip install confluent-kafka
Kafka トピックの作成
Python スクリプト topics.py
を実行して、トピックを作成します。
python topics.py -c
RisingWave と Kafka の接続
PostgreSQL クライアントを使用して、rw_kafka.sql
の SQL コマンドを実行します。
psql -h localhost -p 4566 -d dev -U root -f rw_kafka.sql
これにより、各ストリームのソースと対応するマテリアライズドビューが作成されます。たとえば、kafka-Account
トピックは以下の SQL コマンドを使用します:
CREATE SOURCE IF NOT EXISTS account_stream (
"label" varchar,
"accountId" bigint,
"createTime" timestamptz,
"isBlocked" boolean,
"accountType" varchar,
"nickname" varchar,
"phonenum" varchar,
"email" varchar,
"freqLoginType" varchar,
"lastLoginTime" timestamptz,
"accountLevel" varchar
)
WITH (
connector='kafka',
topic='kafka-Account',
properties.bootstrap.server='kafka:9092',
scan.startup.mode='earliest'
)
FORMAT PLAIN ENCODE JSON;
そして
CREATE MATERIALIZED VIEW IF NOT EXISTS account_mv AS
SELECT * FROM account_stream;
スナップショットデータのインポート
Python スクリプト topics.py
を実行して、スナップショットデータをインポートします。
python topics.py -s
RisingWave でのクエリ(オプション)
PostgreSQL クライアントを介して RisingWave でスナップショットデータを確認できます。
psql -h localhost -p 4566 -d dev -U root
たとえば、テーブル、ビュー、シーケンスをリスト表示:
\d
グラフのモデリング
PuppyGraph の Web UI にログインします:http://localhost:8081。以下の認証情報を使用します:
-
ユーザー名:puppygraph
-
パスワード:puppygraph123
スキーマをアップロード:Web UI で「Upload Graph Schema JSON」の下の schema.json
ファイルを選択し、「Upload」をクリックします。
PuppyGraph でのクエリ
スナップショットデータに対して Gremlin や Cypher のクエリを試すことができます。
-
左側のクエリパネルに移動します。Gremlin クエリタブでは、Gremlin を使ってグラフをクエリするインタラクティブな環境が提供されます。
-
各クエリの後、グラフパネルをクリアして次のクエリを実行し、クリーンな可視化を保ちます。ページ右上の「Clear」をクリックしてください。
-
Cypher クエリの場合は、Graph Notebook と Cypher Console を使用できます。
たとえば:
- Gremlin:アカウントの数を取得
g.V().hasLabel('Account').count()
- Cypher:アカウントの数を取得
MATCH (x:Account) RETURN count(x)
増分データのインポート
Python スクリプト topics.py
を実行して、増分データをインポートします。
python topics.py -i
PuppyGraph でのリアルタイムクエリ
PuppyGraph で引き続きクエリを実行し、新しいデータが追加されると、クエリ結果がどのように変化するかを確認できます。
クリーンアップと解体
コンテナとネットワークを停止して削除するには、次のコマンドを実行します:
docker compose down -v
結論
要約すると、RisingWave と PuppyGraph を使用して、リアルタイムのグラフ分析パイプラインを構築しました。RisingWave はサブ秒の遅延でリアルタイムデータをキャプチャして処理し、PuppyGraph は ETL なしで関係データを統一されたグラフビューに変換します。デモでは、Kafka からのデータ取り込みからグラフモデリング、クエリ実行までの全プロセスを示し、インタラクティブなグラフ可視化 UI を通じて即座に実行可能な洞察を提供しました。
この共同構成に興味がある場合は、無料で始められます—RisingWave Cloud を試して、PuppyGraph Developer Edition を今すぐダウンロードしてみてください!