LoginSignup
0
0

【Road to Cassandra Day】Cassandra リアクティブ スタイル プログラミング~高度なトピック②クエリ結果のキャッシュ

Posted at

はじめに

Cassandra Day Tokyo

今年、2023年6月1日に、Cassandra Dayが日本でも開催されます。
Cassandra Dayは、昨年、ベルリン、ロンドン、アムステルダム、ハノイ、ジャカルタ、ヒューストン、サンタクララ、シアトル、シンガポールでも開催されました。

今回の東京での開催に向けて、Apache Cassandraに関する記事を発表していきます。

image.png

Apache Cassandraについて

Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。

他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。

ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。

Cassandra リアクティブ スタイル プログラミング

本稿の内容は、以下のドキュメントを元にしています。

Cassandra リアクティブ スタイル プログラミングの紹介と基本的な使い方については、本稿執筆者による以下の記事を参照ください。

高度なトピック

クエリ結果のキャッシュ

ReactiveResultSetをサブスクライブすることは一度のみ可能です。これは意図的な設計上の決定です。そうしないと、ユーザーが同じクエリに 2 回目をサブスクライブするときに、利用者の意図とは異なり、同じクエリの、1回目のサブスクライブとは異なる実行が再びトリガーされる可能性があるからです。

テーブル列のすべての値の平均と合計の両方を計算したいとします。最も素朴なアプローチは、2 つのフローを作成し、両方にサブスクライブすることです。

2回クエリすることを避けようという意図による、以下のコードは正常に動きません。

// DON'T DO THIS
ReactiveResultSet rs = session.executeReactive("SELECT n FROM ...");
double avg = Flux.from(rs)
    .map(row -> row.getLong(0))
    .reduce(0d, (a, b) -> (a + b / 2.0))
    .block();
// will fail with IllegalStateException
long sum = Flux.from(rs)
    .map(row -> row.getLong(0))
    .reduce(0L, (a, b) -> a + b)
    .block();

上記の 2 番目の Flux は、rs がすでにサブスクライブされているため、IllegalStateException をカプセル化する onError シグナルで即座に終了します。

テーブルを 2 回クエリすることを回避しながら、この制限を回避するための最も簡単な方法は、ほとんどのリアクティブ ライブラリが提供するcacheを使用することです。

Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
    .map(row -> row.getLong(0))
    .cache();
double avg = rs
    .reduce(0d, (a, b) -> (a + b / 2.0))
    .block();
long sum = rs
    .reduce(0L, (a, b) -> a + b)
    .block();

上記のコードは問題なく動作します。

オペレータcacheReactiveResultSetを一度のみサブスクライブし、結果をキャッシュして、キャッシュされた結果をダウンストリーム サブスクライバに提供します。

ただし、これは、結果セットが小さく、メモリに収まる場合にのみ可能です。

キャッシュの戦略が採用できない場合、ほとんどのリアクティブ ライブラリは、アップストリーム サブスクリプションを多くのサブスクライバーにオンザフライでマルチキャストするオペレーターも提供します。

上記の例は、次のように別のアプローチで書き直すことができます。

Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
    .map(row -> row.getLong(0))
    .publish()       // multicast upstream to all downstream subscribers
    .autoConnect(2); // wait until two subscribers subscribe
long sum = rs
    .reduce(0L, (a, b) -> a + b)
    .block();
double avg = rs
    .reduce(0d, (a, b) -> (a + b / 2.0))
    .block();

上記の例では、publishオペレーターはすべてのonNextシグナルをすべてのサブスクライバーにマルチキャストします。autoConnect(2)オペレーターは、アップストリーム ソースにサブスクライブする (そして実際のクエリ実行をトリガーする) 前に、2 つのサブスクリプションを取得するまで待機するようにpublishに指示します。

このアプローチは、結果をメモリにキャッシュする必要がないため、大規模な結果セットに適しています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0