はじめに
Cassandra Day Tokyo
今年、2023年6月1日に、Cassandra Dayが日本でも開催されます。
Cassandra Dayは、昨年、ベルリン、ロンドン、アムステルダム、ハノイ、ジャカルタ、ヒューストン、サンタクララ、シアトル、シンガポールでも開催されました。
今回の東京での開催に向けて、Apache Cassandraに関する記事を発表していきます。
Apache Cassandraについて
Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。
他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。
ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。
Cassandra リアクティブ スタイル プログラミング
本稿の内容は、以下のドキュメントを元にしています。
Cassandra リアクティブ スタイル プログラミングの紹介と基本的な使い方については、本稿執筆者による以下の記事を参照ください。
高度なトピック
クエリ結果のキャッシュ
ReactiveResultSet
をサブスクライブすることは一度のみ可能です。これは意図的な設計上の決定です。そうしないと、ユーザーが同じクエリに 2 回目をサブスクライブするときに、利用者の意図とは異なり、同じクエリの、1回目のサブスクライブとは異なる実行が再びトリガーされる可能性があるからです。
テーブル列のすべての値の平均と合計の両方を計算したいとします。最も素朴なアプローチは、2 つのフローを作成し、両方にサブスクライブすることです。
2回クエリすることを避けようという意図による、以下のコードは正常に動きません。
// DON'T DO THIS
ReactiveResultSet rs = session.executeReactive("SELECT n FROM ...");
double avg = Flux.from(rs)
.map(row -> row.getLong(0))
.reduce(0d, (a, b) -> (a + b / 2.0))
.block();
// will fail with IllegalStateException
long sum = Flux.from(rs)
.map(row -> row.getLong(0))
.reduce(0L, (a, b) -> a + b)
.block();
上記の 2 番目の Flux
は、rs
がすでにサブスクライブされているため、IllegalStateException
をカプセル化する onError
シグナルで即座に終了します。
テーブルを 2 回クエリすることを回避しながら、この制限を回避するための最も簡単な方法は、ほとんどのリアクティブ ライブラリが提供するcache
を使用することです。
Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
.map(row -> row.getLong(0))
.cache();
double avg = rs
.reduce(0d, (a, b) -> (a + b / 2.0))
.block();
long sum = rs
.reduce(0L, (a, b) -> a + b)
.block();
上記のコードは問題なく動作します。
オペレータcache
はReactiveResultSet
を一度のみサブスクライブし、結果をキャッシュして、キャッシュされた結果をダウンストリーム サブスクライバに提供します。
ただし、これは、結果セットが小さく、メモリに収まる場合にのみ可能です。
キャッシュの戦略が採用できない場合、ほとんどのリアクティブ ライブラリは、アップストリーム サブスクリプションを多くのサブスクライバーにオンザフライでマルチキャストするオペレーターも提供します。
上記の例は、次のように別のアプローチで書き直すことができます。
Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
.map(row -> row.getLong(0))
.publish() // multicast upstream to all downstream subscribers
.autoConnect(2); // wait until two subscribers subscribe
long sum = rs
.reduce(0L, (a, b) -> a + b)
.block();
double avg = rs
.reduce(0d, (a, b) -> (a + b / 2.0))
.block();
上記の例では、publish
オペレーターはすべてのonNext
シグナルをすべてのサブスクライバーにマルチキャストします。autoConnect(2)
オペレーターは、アップストリーム ソースにサブスクライブする (そして実際のクエリ実行をトリガーする) 前に、2 つのサブスクリプションを取得するまで待機するようにpublish
に指示します。
このアプローチは、結果をメモリにキャッシュする必要がないため、大規模な結果セットに適しています。