Java
EventSourcing
CQRS
lagom

Java+microservice+DDD+CQRS+ESなフレームワーク lagom の勉強メモ 3(Read Sideについて)

More than 3 years have passed since last update.


概要

lagom について、CQRSの C(コマンド)の部分と、イベントソーシング、および 実行環境(ConductR)について適当に解説した。

今回は、CQRSのQ(クエリ)について、実際にサンプルコードも作りつつ解説。

Qに関する機能は大別して以下の2つがある。


  • 読み取り専用のデータを作る ReadSideProcessor

  • データを読み取る CassandraSession


参考資料・サンプルソース

自作のサンプルについては、chirper を相当参考にさせていただいて作った。

内容としては、銀行の口座取引を行うものを想定して、

口座作成、入金、出金、入出金履歴の照会を行うというもの。

口座振替のような操作は、考えることが想定以上にあり、ストップしている。


ReadSideProcessor について

CQRS では、コマンド(更新系)とクエリ(参照系)で別々のアーキテクチャや永続化方式を採用してもよいことになっている。

そうすることで、性能特性の異なる更新と参照両方の性能を満たそうとする。

代わりに犠牲になるのは、データの一貫性になる。

更新系のイベントが発生した後、時間差で非同期に参照系のデータを作るようになる。

このような作りを、結果整合性(Eventual Consistency)と呼んだりする。

結果整合性は、マイクロサービス間のデータ連携でも用いられる。

前置きが長くなったが、ReadSideProcessorは イベントの発生後に非同期で行う処理を定義するクラスで、クラスを登録しておけばイベント発生後にlagomが勝手にクラスのメソッドを実行する。

定義する内容は、クエリのためのテーブル作成やオフセットの取得・イベントに対応したレコードの登録になる。

なお、現状ではlagomはクエリ側の永続化装置としてはCassandraのみをサポートしているため、

実際には CassandraReadSideProcessorを継承したクラスを作る必要がある。


CassandraReadSideProcessorのメソッド

CassandraReadSideProcessor では3つのメソッドを定義する。


  • aggregateTag

  • prepare

  • defineEventHandlers


aggregateTagメソッド

こんな実装になる。

java

@Override

public AggregateEventTag<TransactionEvent> aggregateTag() {

return TransactionEventTag.INSTANCE;

}

aggregateTag(集約タグ)とは、どのイベントをこのReadSideProcessorで処理させるかを指定する値となる。

また、集約タグはイベントクラスにも同様に設定をする必要がある。

public interface TransactionEvent extends Jsonable ,AggregateEvent<TransactionEvent>{

/** 集約タグ */
@Override
default AggregateEventTag<TransactionEvent> aggregateTag() {
return TransactionEventTag.INSTANCE;
}

@SuppressWarnings("serial")
@Immutable
@JsonDeserialize
public final class AccountCreated implements TransactionEvent {
// 口座作成で発生するイベント。
// TransactionEvent のdefaultメソッドaggregateTagを暗黙的に実装している。
}
//他のイベントDTOも同じようにTransactionEvent を実装。

イベント用のDTOは、ReadSideProcessorに処理させる場合は、aggregateTagメソッドを実装する必要があり、上の例ではdefaultメソッドを実装したインターフェースを各DTOに実装することで、定義を一箇所にまとめている。

ReadSideProcessorとイベントDTO両者にaggregateTagメソッドを実装して、同じ値を返すものがReadSideProcessorで処理される。今回のサンプルでは、口座作成、入金、出金といったイベントが対象になる。

(Springだと設定ファイルとかアノテーションで済ます内容が、lagomだとこのようなAPI実装とかになっているものが多いという印象)


prepareメソッド

prepareメソッドでは、テーブルの初期化を行い、オフセットを返す。

   @Override

public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
// prepare read side tables, statement and get event offset.
// @formatter:off
return prepareCreateTables(session).thenCompose(a ->
prepareWriteAccount(session).thenCompose(b ->
prepareWriteHistory(session).thenCompose(c ->
prepareUpdateAccount(session).thenCompose(d ->
prepareWriteOffset(session).thenCompose(e ->
selectOffset(session))))));
// @formatter:on
}


オフセットについて

オフセットとは、ここではタイムスタンプ値のUUIDを指す。

lagomではすべてのイベントにユニークなオフセットが保持されている。

prepareで返すオフセットとは、つまり、このReadSideProcessorが最後に処理したイベントのオフセットとなる。

    //オフセット取得メソッド

private CompletionStage<Optional<UUID>> selectOffset(CassandraSession session) {
return session.selectOne("SELECT offset FROM account_offset")
.thenApply(optionalRow -> optionalRow.map(r -> r.getUUID("offset")));
}

prepareが呼び出されるのは、アプリケーションの初期化時。

そのときに、オフセットを取得し、Cassandraに永続化しているイベント一覧からオフセット以降に追加されたイベント一覧を取得し、参照データ作成を行うようになる。

これは、障害から回復したときや環境以降したときに、今までに発生したイベントとそこから作られる参照系データを正しく同期させるためだ。

つまりは、Cassandraのイベント一覧(messagesテーブル)のデータさえあれば、いつでも参照用データは復元できる。

参照データの構造に変更があった際は、参照用テーブルとオフセットを削除してアプリケーションを再起動すれば、それで参照データが再作成される。

そのため、オフセットは参照データ作成時に更新しないといけない。

(または、何度参照データの作成処理が動いても、影響が起きないようなつくりにしないといけない)

注意する点として、lagomはイベントのオフセットは渡してくれるが、それをどうやって保存して、どうやって取得するかは自分で実装しないといけない。

上記コードの、account_offsetテーブルは自作のテーブルになる。


テーブル作成

オフセット保持用のテーブルや、クエリ用のテーブルのprepareメソッドの中で作成する。

CassandraSessionは、SpringのJDBCテンプレートのようなAPIで、任意のCQL(cassandraのSQL)を発行できる。

    // テーブル作成メソッド

private CompletionStage<Done> prepareCreateTables(CassandraSession session) {
// @formatter:off
return session.executeCreateTable(
"CREATE TABLE IF NOT EXISTS account ("
+ "account_id text, name text, balance bigint, "
+ "PRIMARY KEY (account_id))")
.thenCompose(a -> session.executeCreateTable(
"CREATE TABLE IF NOT EXISTS transaction_history ("
+ "account_id text, at timestamp, amount bigint, type text, "
+ "PRIMARY KEY (account_id, at))")
.thenCompose(b -> session.executeCreateTable(
"CREATE TABLE IF NOT EXISTS account_offset ("
+ "partition int, offset timeuuid, "
+ "PRIMARY KEY (partition))")));
// @formatter:on
}

また、データ作成処理を効率化するため、事前にINSERT文をPreparedStatement化しておく。

ステートメントは、ReadSideProcessorのフィールドに持っておく。

    private CompletionStage<Done> prepareWriteOffset(CassandraSession session) {

return session.prepare("INSERT INTO account_offset (partition, offset) VALUES (1,?)").thenApply(ps -> {
this.writeOffset = ps;
return Done.getInstance();
});
}


defineEventHandlers

defineEventHandlers メソッドでは、イベントの内容に応じたデータ作成処理を記述する。

前述したとおり、このメソッドはアプリケーションでイベントが発生したときと、

アプリケーションの初期化時にprepareメソッドで取得したオフセット以降に発生したイベントがある場合の両方で実行される。

   @Override

public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
// when Account created, insert account table;
builder.setEventHandler(TransactionEvent.AccountCreated.class, (ev, offset) ->{
BoundStatement st = writeAccount.bind()
.setString("account_id", ev.id)
.setString("name", ev.name);

BoundStatement stOffset = writeOffset.bind(offset);

return completedStatements(Arrays.asList(st, stOffset));
});
// 他のイベントDTOも同様に定義していく。

EventHandlersBuilderのsetEventHandlerメソッドに、イベントクラスとイベントに対応したラムダ式を記載し、ラムダ式ではPreparedStamentにデータをバインドして、Statementの一覧を返すようにする。

Statemntの実行はlagomが非同期で行う。

ここでは、オフセットの更新を忘れないようにすることと、

参照データを作成するための情報はイベントDTOしかないということの2点。

特に後者が面倒。

更新系の操作(エンティティのステート更新)では、イベントDTOと現在のステートの情報を使って新しいステートを更新することができるが、

参照系では現在のステートの情報は見れない。

そのため、参照データを作る情報が足りない場合、イベントDTOに持つ情報を増やすか、SELECT文を一度発行する必要が出てくる。面倒。


CassandraSessionについて

クエリ側処理では、CassandraSession を用いてデータ取得を行う。

サービスクラスでCassandraSessionを用いる場合、DIを使う。

ついでに、ReadSideProcessorを有効にする場合も、サービスクラスのコンストラクタで行う。


private final PersistentEntityRegistry persistentEntityRegistry;
private final CassandraSession db;

@Inject
public BankServiceImpl(PersistentEntityRegistry persistentEntityRegistry,CassandraReadSide readSide,
CassandraSession db) {
this.persistentEntityRegistry = persistentEntityRegistry;
persistentEntityRegistry.register(AccountEntity.class);
// CassandraSession をDI
this.db = db;
// ReadSideProcessorの登録もここで行う。
readSide.register(TransactionEventProcessor.class);
}


クエリの実装

クエリはどうやって実装するのかというと、恐ろしいことに CQLを直接書くことになる。

CQRSのQは単純な実装でよいという考えを素直にあらわしたものだと思う。

このような処理をサービスに書きたくないのであれば、DAOなどの仕組みを導入すればよいだろう。

  @Override

public ServiceCall<String, NotUsed, PSequence<MoneyTransaction>> getHistory() {
return (id, req) -> {
CompletionStage<PSequence<MoneyTransaction>> result
= db.selectAll("SELECT * FROM transaction_history WHERE account_id = ? order by at DESC", id)
.thenApply(rows -> {
List<MoneyTransaction> list = rows.stream().map(r -> new MoneyTransaction(r.getString("type"), r.getLong("amount"),
toLocalDateTime(r.getTimestamp("at")))).collect(Collectors.toList());
return TreePVector.from(list);
});

return result;
};
}

CassandraSessionのselect系メソッドは、呼出し後にラムダ式でResultSetから戻り値のデータ型へのマッピングを定義する。

あと、CassandraSessionの注意点として、クエリの結果はCompletableFutureのような非同期計算となることがあげられる。

データ変換などはmapなどの関数合成で行う。

この辺は、Slickなんかを使っている人ならわかるんじゃないかと思う。


ReadSideProcessorとクエリを試す

上記で実装した内容を試してみます。

口座作成コマンドを実行する。

curl -X POST -d '{"id":"001", "name":"test"}' http://localhost:9000/api/account

イベントの永続化はこの時点で行われる。

何秒かたってReadSideProcessorが実行され、Accountテーブルにデータが挿入される。

cqlsh:bank_impl> select * from account;

account_id | balance | name
------------+---------+------
001 | 0 | test

Accountテーブル挿入後、このテーブルからデータを読み込むクエリを実行できる。

$ curl  http://localhost:9000/api/account/001

{"id":"001","name":"test","balance":0,"transactionsOfDay":[]}

ついで、入金、出金

#入金-1000

$ curl -X PUT -d '{"amount":1000}' http://localhost:9000/api/account/001/deposit
#出金-500
curl -X PUT -d '{"amount":500}' http://localhost:9000/api/account/001/withdrawal

すると、イベント永続化の後、何秒か経ってから、Accountテーブルの残高が更新され、取引履歴が挿入される。

cqlsh:bank_impl> select * from account;

account_id | balance | name
------------+---------+------
001 | 500 | test

cqlsh:bank_impl> select * from transaction_history;

account_id | at | amount | type
------------+--------------------------+--------+------------
001 | 2016-04-18 04:15:48+0000 | 1000 | DEPOSIT
001 | 2016-04-18 04:16:38+0000 | 500 | WITHDRAWAL

履歴参照用のクエリも実行可能になる。

$ curl  http://localhost:9000/api/account/001/history

[{"type":"WITHDRAWAL","amount":500,"at":[2016,4,18,13,16,38,494000000]},
{"type":"DEPOSIT","amount":1000,"at":[2016,4,18,13,15,48,411000000]}]

というわけで、CQRS のコマンドとクエリ両方を連携させることができた。

クエリ側の反映がリアルタイムでないというのが、従来のアーキテクチャにない特徴だと思う。


まとめ

ReadSide のAPIを使うことで、CQRS + EventSourcing 方式のアプリケーションをどうやって作るかという具体的なイメージがやっと沸いたと感じた。

実プロジェクトに適用できるかは別として、興味をそそられる仕組みだと思う。

障害とか移行時のデータ復元もちゃんと考えてあるし。

作っていて思ったのは、このサンプルはサービスは今のところ1つにしているのだけど、

多分、口座作成などのコマンドと、入金・出金などのコマンドはそれぞれ別々のサービスにしたほうがいいだろうなと感じている。

いろいろと機能を増やしていくにあたって、口座そのものを管理する機能は共通的な機能として別にあったほうがいいのと、lagomだとサービス分けてもそんなに修正負荷は高くなささそうなのが理由となる。

口座振替のような複数口座をまたがる処理はどうしたらよいのかというのは、いろいろと考えることがでてきたので次回にまとめ対と思う。