はじめに
Cassandra Day Tokyo
今年、2023年6月1日に、Cassandra Dayが日本でも開催されます。
Cassandra Dayは、昨年、ベルリン、ロンドン、アムステルダム、ハノイ、ジャカルタ、ヒューストン、サンタクララ、シアトル、シンガポールでも開催されました。
今回の東京での開催に向けて、Apache Cassandraに関する記事を発表していきます。
Apache Cassandraについて
Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。
他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。
ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。
Cassandraリアクティブスタイルプログラミング~紹介
CassandraのJavaドライバーは、リアクティブ クエリの組み込みサポートを提供します。
この記事は、2023年5月時点で最新のJavaドライバー4.15をベースにしています。
ReactiveSession
はCqlSession
インターフェイスを拡張し 、リアクティブ ストリームで表現されたリクエストを実行するための特殊なメソッドを追加します。
Reactive Streams API
Java Driver 4.15のリアクティブ機能は、Reactive Streams APIをベースにしています。
(3.xでは、GoogleのJavaライブラリーGuavaをベースにしていました)
ドライバーはそのライブラリに依存していますが、アプリケーションがリアクティブ クエリをまったく使用しない場合は、それを除外して実行時の依存関係の数を最小限に抑えることができます。
歴史的な理由から、リアクティブ関連のドライバー タイプは、先頭にdse
(DataStax Enterpriseの略称)が付いたパッケージに存在します。
とはいえ、リアクティブクエリは、オープンソースのApache Cassandraでも機能します。
リアクティブ実行モデルは、ノンブロッキング方式で実装されています。 詳細については、ノンブロッキング プログラミングのマニュアルを参照してください。
概要
ReactiveSession
は、2 つのパブリック メソッドを公開します。
-
ReactiveResultSet executeReactive(String query)
; -
ReactiveResultSet executeReactive(Statement<?> statement)
;
どちらのメソッドも、通常のResultSet
のリアクティブ ストリーム バージョンである ReactiveResultSet
を返します。
リアクティブスタイルプログラミングの文脈では、ReactiveResultSet
はクエリ結果を扱う「パブリッシャー」の役割となります。
ReactiveResultSet
に「サブスクライブ」する際、次の 2 つのことに留意する必要があります。
-
ReactiveResultSet
ドライバーによって返されるすべての実装は、デフォルトでは、コールド、ユニキャスト、単一サブスクリプションのみのパブリッシャーです。つまり、複数のサブスクライバーの利用はサポートされていません。複数のダウンストリーム サブスクライバーでそれらを使用する必要がある場合は、そのようなパブリッシャーによって生成された結果をキャッシュすることを検討します。(ドキュメントでは、キャッシングの例が紹介されています)。 -
ReactiveResultSet
は、内部的に、IO スレッドを用います。サブスクライバーの実装者は、 Reactive Streams 仕様ルール 2.2を順守し、負荷の高い計算を実行したり、onNext
呼び出し内で呼び出しをブロックしたりしないようにすることが推奨されます。これらを実行すると、パフォーマンスに影響を与える可能性があります。サブスクライバーでは、データを処理ロジックに非同期的にディスパッチする必要があります。
基本的な使い方
リアクティブスタイルのREAD
次の例は、テーブルから読み取り、返されたすべての行をコンソールに出力します。
try (CqlSession session = ...) {
Flux.from(session.executeReactive("SELECT ..."))
.doOnNext(System.out::println)
.blockLast();
} catch (DriverException e) {
e.printStackTrace();
}
リアクティブスタイルのWRITE
次の例は、クエリをコンソールに出力した後、テーブルに行を挿入します。
try (CqlSession session = ...) {
Flux.just("INSERT ...", "INSERT ...", "INSERT ...", ...)
.doOnNext(System.out::println)
.flatMap(session::executeReactive)
.blockLast();
} catch (DriverException e) {
e.printStackTrace();
}
ステートメントがリアクティブに実行される場合、実際のリクエストはReactiveResultSet
が サブスクライブされている場合にのみトリガーされることに注意してください。
つまり、executeReactive
メソッドが戻るとき、 まだ何も実行されていません。これが、上記の書き込みの例でflatMap
を使用している理由です。このメソッドは、executeReactive
メソッド呼び出しによって返された それぞれのサブスクライブを処理します。
たとえば、次のコードでは、ReactiveResultSetsession.executeReactiveReactiveResultSet
がサブスクライブされていないため、クエリは実行されません。
// DON'T DO THIS
Flux.just("INSERT INTO ...")
// The returned ReactiveResultSet is not subscribed to
.doOnNext(session::executeReactive)
.blockLast();
クエリ メタデータへのアクセス
ReactiveResultSet
は、リクエストの実行とクエリのメタデータに関する有用な情報を公開します。
Publisher<? extends ColumnDefinitions> getColumnDefinitions();
Publisher<? extends ExecutionInfo> getExecutionInfos();
Publisher<Boolean> wasApplied();
上記のパブリッシャーのコンテンツを調べるには、単にサブスクライブします。これらのパブリッシャーは、クエリ自体が完了する前に実行できないことに注意してください。クエリが失敗すると、これらのパブリッシャーは同じエラーで失敗します。
次の例では、クエリを実行し、使用可能なすべてのメタデータをコンソールに出力します。
ReactiveResultSet rs = session.executeReactive("SELECT ...");
// execute the query first
Flux.from(rs).blockLast();
// then retrieve query metadata
System.out.println("Column definitions: ");
Mono.from(rs.getColumnDefinitions()).doOnNext(System.out::println).block();
System.out.println("Execution infos: ");
Flux.from(rs.getExecutionInfos()).doOnNext(System.out::println).blockLast();
System.out.println("Was applied: ");
Mono.from(rs.wasApplied()).doOnNext(System.out::println).block();
行レベルでクエリ メタデータを検査することも可能です。リアクティブクエリの実行によって返される各行は、Row
クラスのリアクティブ実装であるReactiveRow
です。
ReactiveRow
では、 ReactiveResultSet
と同じ種類のクエリ メタデータと実行情報が公開されます が、個々の行ごとに次のようになります。
ColumnDefinitions getColumnDefinitions();
ExecutionInfo getExecutionInfo();
boolean wasApplied();
次の例では、クエリを実行し、返された行ごとに、その行を処理したコーディネーターを出力します。次に、クエリを満たすために接続されたすべてのコーディネーターを取得し、それらをコンソールに出力します。
Iterable<Node> coordinators = Flux.from(session.executeReactive("SELECT ..."))
.doOnNext(
row ->
System.out.printf(
"Row %s was obtained from coordinator %s%n",
row,
row.getExecutionInfo().getCoordinator()))
.map(ReactiveRow::getExecutionInfo)
// dedup by coordinator (note: this is dangerous on a large result set)
.groupBy(ExecutionInfo::getCoordinator)
.map(GroupedFlux::key)
.toIterable();
System.out.println("Contacted coordinators: " + coordinators);
最後に
本稿では、CassandraデータベースのJavaドライバーがリアクティブスタイルプログラミングに対応していることを紹介し、その基本的な使い方を説明しました。
また別の機会にさらに進んだ内容についても紹介したいと思っています。