Edited at

Java+microservice+DDD+CQRS+ESなフレームワーク lagom の勉強メモ 1 (サービスの構成)

More than 3 years have passed since last update.

lightbend社(旧 typesafe社) から先日 lagom というフレームワークがリリースされました。

DDD, CQRS, ES などに興味があったのでちょっと調べてみることにしました。

長くてまとまりがないですが、調査したことをメモとしてまとめてみます。


lagom とは

lagom は Java によるマイクロサービス指向のフレームワークです。

以下が特徴のようです。


  • DDDの境界付けられたコンテキストによってサービスを分割し、それぞれのサービスは akkaの各プロダクトによって作られた実行環境で互いに(クラスタ構成であっても)連携する。

  • 各サービスの実装はCQRS(コマンドクエリ責務分離)とES(イベントソーシング)の非同期呼び出しが基本となっていて、永続装置として cassandra がデフォルトになっている。

  • 開発環境でのホットリロード(ソース変更の反映)、dockerコンテナによる本番環境のシミュレーションがサポートされている。

現時点(2016/3)では Java API のみが提供されていますが、Scala API も準備中らしいです。

Java を優先した理由については、「Java 界隈に モノシリックなシステムをマイクロサービスに移行させるツールが望まれているから、そして Scala を使う企業は、lagomがなくてもマイクロサービスを導入できる下地があるから」だそうです ここから適当に翻訳

Java EE に対するアンチテーゼっぽい立ち位置ですが、 Spring Boot + Spring Cloud や AXON Framework なんかが競合になるんですかね。


どうやってlagomを勉強していくか

以下の流れで行こうと思っている。


  1. サンプルアプリを実行、読込む (いまここ)

  2. ドキュメントを読む

  3. 何かつくってみる


私について


知っている・やっていること


知らないこと・やってないこと


  • akka の関する知見

  • cassandra に関する知見

  • マイクロサービス的なシステムを実際に作ったことはない


簡単な用語解説


  • CQRS - コマンドクエリ責務分析。アプリケーションをコマンド(Write Side) と クエリ(Read Side) に分けることで、変更系と参照系それぞれの特性に合わせたものを作ること。コマンドは外部からの操作命令で、コマンドごとに必要なパラメータを定義したDTO を作る。

  • ES - イベントソーシング。イベント(CRQSではコマンドの発行に対応)1つ1つを永続化すること。対義語はステートソーシングで、DBなどに現時点の最新情報のみを永続化していくような方式にあたる。 ES では操作内容がログとして残るため、状態の変更や復元が容易。

  • エンティティ - lagom のエンティティは DDD の文脈でのエンティティを指していると思われる。ID で一意に識別されるデータで、コマンドで処理する基本的な単位になっていて、集約ルートにも該当すると思われる。


lagom のインストールとサンプル実行

公式のGetting Startにインストールとサンプルを実行するまでの手順が載っている。

Play Frameworkをやっていればおなじみの、Activator や SBT を使うスタイル。

インストールして、 sbt runAll とすれば、サンプルアプリケーションが起動する。

cassandra も組み込みで提供されるので、他に用意するものはない。

サンプルを起動し、/api/Hello/:id エンドポイントにGET すると、応答が帰ってくる。

$ curl http://localhost:9000/api/hello/Alice

Hello, Alice

Getting Start に書いてあるのはこれだけなので、感動することはないのだけど、サンプルソースを見ていくともう少し機能がある。

POST リクエストで、id ごとに、応答文を変更することができる。

message をプロパティに持つ JSON 文字列をPOSTすることで、Hello の部分を変更できる。

$ curl -X POST -d  '{"message":"Hi"}' http://localhost:9000/api/hello/Alice

{ "done" : true }
$ curl http://localhost:9000/api/hello/Alice
Hi, Alice

$ curl http://localhost:9000/api/hello/Bob
Hello, Bob

もう一つ、websocket による非同期型のデータの送受ができる。

var ws = new WebSocket("ws://localhost:9000/hellostream");

// コールバックを設定
ws.onmessage=function(res){console.log(res.data)}
// メッセージ送信
ws.send("Alice");
// 非同期でコンソールに、"Hi, Alice"が出力
ws.send("Bob");
// 非同期でコンソールに、"Hello, Bob"が出力

というわけで、lagom は REST(同期) または Webscoket(非同期) で 文字列・JSON を扱うフレームワーク。


サンプルコードのプロジェクト構成

サンプルコードは、SBTプロジェクト形式で提供される。プロジェクトには4つのサブプロジェクトがある。


  • helloworld-api

  • helloworld-impl

  • hellostream-api

  • hellostream-impl

1つのサービスについてapiとimplの2つのプロジェクトで構成される。

サンプルコードには、helloworldとhellostreamの2つのサービスがあることになる。


helloworld プロジェクトの中身


サービス定義でエンドポイントの設定を行う

APIプロジェクトでは、サービスが提供するメソッドとエンドポイントの対応をdescriptorメソッドにDSLで記述する。

Service Descriptor と呼ばれる。

// サンプルコードから抜粋

public interface HelloService extends Service {

ServiceCall<String, NotUsed, String> hello();

ServiceCall<String, GreetingMessage, Done> useGreeting();

@Override
default Descriptor descriptor() {
// @formatter:off
return named("helloservice").with(
restCall(Method.GET, "/api/hello/:id", hello()),
restCall(Method.POST, "/api/hello/:id", useGreeting())
).withAutoAcl(true);
// @formatter:on
}
}

これをみると、 GET /api/hello/:id に helloメソッド、 POST /api/hello/:id に useGreetingメソッドを対応させていることがわかる。

また、サービスメソッドは、 ServiceCall を返すメソッドになる。

型パラメータは、それぞれ、エンティティのIDの型,クライアントからの入力の型、クライアントへの出力の型を示す。

ServiceCall の実装は、Implプロジェクトで行う。


サービス実装・イベント処理の定義

Implプロジェクトでは、サービス実装やコマンド・イベント処理を定義する


サービス実装は、エンティティにコマンドを発行する

HelloServiceImpl の 更新系メソッドの useGreeingはこんな感じの実装になっている。


public class HelloServiceImpl implements HelloService {
// レジストリ
private final PersistentEntityRegistry persistentEntityRegistry;

// 依存関係は、DIで設定
@Inject
public HelloServiceImpl(PersistentEntityRegistry persistentEntityRegistry) {
this.persistentEntityRegistry = persistentEntityRegistry;
persistentEntityRegistry.register(HelloWorld.class);
}
// helloメソッドは省略
@Override
public ServiceCall<String, GreetingMessage, Done> useGreeting() {
return (id, request) -> {

// Look up the hello world entity for the given ID.
PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id);

// Tell the entity to use the greeting message specified.
return ref.ask(new UseGreetingMessage(request.message));
};
}
}

サービスでは、レジストリからエンティティをロードし、 エンティティにコマンドを発行するのが基本になる。

ServiceCall の実装は、 ID,Requestをパラメータに取り、CompletionStage を返すラムダ式として定義する。

上の例ではわかりづらいですが、 return文にある ref.askメソッドは CompletionStage を返します。

CompletionStage は Java8 で追加された非同期処理用API CompletableFuture の一部です。

CompletableFuture は合成可能な非同期計算を表すもので、 Scala の Future やJS の Promiseに近いもの。

lagom の処理は基本的に非同期処理として記述する必要がある。

また、レジストリなどライブラリに依存するものや他のサービスなどは、DI(Guice)で取得する。


エンティティで、コマンドからイベントの永続化とステート更新を行う

サンプルコードのエンティティである、HelloWorld のソースを抜粋します。

まず宣言部。

public class HelloWorld extends PersistentEntity<HelloCommand, HelloEvent, WorldState> {

PersistentEntity を継承する。 型パラメータは、コマンドの型、イベントの型、ステートの型

エンティティでは、コマンドを外部から受け付けると、コマンドに対応したイベントを作成して永続化を行い、イベントからステートを更新するというロジックを記述する。

ステートはイベントにより状態が変わっていくデータを指している。

サンプルコードでは、応答文に含まれる "Hello" や "Hi" などのメッセージを持つオブジェクト。

コマンド、イベント、ステートに関する処理は、 Behavior で行う。

  @Override

public Behavior initialBehavior(Optional<WorldState> snapshotState) {
// ステートの初期状態
BehaviorBuilder b = newBehaviorBuilder(
snapshotState.orElse(new WorldState("Hello", LocalDateTime.now().toString())));
  // コマンドの処理。イベントの生成と永続化を行い、コマンドの戻り値を設定。
b.setCommandHandler(UseGreetingMessage.class, (cmd, ctx) ->
// In response to this command, we want to first persist it as a
// GreetingMessageChanged event
ctx.thenPersist(new GreetingMessageChanged(cmd.message),
// Then once the event is successfully persisted, we respond with done.
evt -> ctx.reply(Done.getInstance())));

/*
* イベントの処理。イベントオブジェクトと現在のステートから新しいステートを作る。
*/

b.setEventHandler(GreetingMessageChanged.class,
evt ->{
WorldState current = state(); // Debug
System.out.println("event came.!curret="+current);
return new WorldState(evt.message, LocalDateTime.now().toString());
}
);

setCommandHandler メソッドで、コマンドごとに行う処理をラムダ式で記述する。

コマンドの処理では、イベントの作成と永続化を行い、戻り値を返している。

setEventHandler メソッドで、イベントに対する処理をラムダ式で記述する。

イベントオブジェクトのパラメータや、stateメソッドで取得できる現在のステートから、

新しいステートを戻り値にする。

lagomでは基本的にイミュータブルでオブジェクトを扱うため、ステートの更新も新しいステートの作成で行う。

Behavior のポイントの1つは、コマンド処理とイベント処理の記述場所が分かれていることだと思う。イベント処理でステート更新を行うことで、コマンドを受けつけたとき以外にも、永続化されたイベントの一覧をイベント処理に順次適用することで、ステートの最新状況が復元できることがわかりやすくなっていると思う。

もう1つのポイントは、ステートの意味。ステートは、エンティティの最新の状態を持つ、アプリケーションとして意味のある内容を持つデータ(DDD的に集約でいいのかな?)になる。

従来型のアプリケーションであれば、最新の状態はDBなどから引っ張ってくるものになるが、lagomではメモリ上のステートが最新となる。

メモリのデータを信用してもよいの? と感じるが、 ドキュメントを見る限り、akka-persistenceやakka-cluster により、耐障害性を持ち複数クラスタであっても一意性が保証されるらしい。

というわけで、サービス、エンティティ、Behavior、cassandra の関係をまとめた図は以下の通り。

lagom-2-service.png


イベント永続化の中身

イベントを永続化すると何が起きているのかを、cassandraにアクセスして調べてみた。

(cassandra 3.0.2 (cql spec 3.3.1) のcqlで調査できた。)

cassandraのキースペース(Oracleでいうスキーマ)はサービスごとに作成される。

$ c:\Python27\python cqlsh.py localhost 4000

Connected to Test Cluster at localhost:4000.
[cqlsh 5.0.1 | Cassandra 3.0.2 | CQL spec 3.3.1 | Native protocol v4]

cqlsh> use helloworld_impl;
cqlsh:helloworld_impl> describe tables;

messages config snapshots metadata

messages,config,snapshots,metadata の4つのテーブルがあり、 messages が永続化されたイベントを蓄積するテーブルになっていた。

ついでに、snapshots はイベント復元の効率を改善するため、定期的(デフォルトでは100件のイベント)に、複数のイベントを集約した内容を保持するテーブルになっている。

messages テーブルの主要な属性のみを出力してみた。

cqlsh:helloworld_impl> select persistence_id,sequence_nr , event, ser_manifest from messages;

persistence_id | sequence_nr | event | ser_manifest
------------------+-------------+--------------------------------------------+----------------------------------------------------------
HelloWorldAlice | 1 | 0x7b226d657373616765223a224869227d | sample.helloworld.impl.HelloEvent$GreetingMessageChanged
HelloWorldAlice | 2 | 0x7b226d657373616765223a2248656c6c6f227d | sample.helloworld.impl.HelloEvent$GreetingMessageChanged

テーブルには、IDごとにイベントの発生ごとに、イベントメッセージ(0x7b22,,,は16進数化されたJSON文字列で、{"message":"Hi"} のような内容が記録されている)と、イベントクラス(GreetingMessageChanged) が記録されている。

そんなわけで、この内容からイベントオブジェクトを再生し、ステートを復元できている。

というわけで、 helloworld プロジェクトの内容については以上。

CQRSの コマンドと、イベントソーシングの単純なサンプルとなっている。


hellostream プロジェクトの中身

hellostream プロジェクトは、websocketのサンプルで示した /hellostream エンドポイントに対応したサービスになっている。


websocket を提供するには Source を使う

API プロジェクトの Service Descriptor は以下の通り

public interface HelloStream extends Service {

ServiceCall<NotUsed, Source<String, NotUsed>, Source<String, NotUsed>> stream();

@Override
default Descriptor descriptor() {
return named("hellostream").with(namedCall("hellostream", stream())
).withAutoAcl(true); // これを追加しないとエンドポイントが有効にならない。
}
}

websocket を扱う場合、 ServiceCallの型パラメータに、 Source を設定する。

Source は akka の API でストリーム形式(連続的に発生するデータ)を扱うものっぽい。


他のサービスをインジェクションして使う

hellostream-impl をみてみる。

hellostream は、helloworld のようにコマンドやイベントを扱っているわけではなく、 helloworld プロジェクトのサービスをインジェクションして委譲しているようになっている。

public class HelloStreamImpl implements HelloStream {

private final HelloService helloService;

@Inject
public HelloStreamImpl(HelloService helloService) {
this.helloService = helloService;
}

@Override
public ServiceCall<NotUsed, Source<String, NotUsed>, Source<String, NotUsed>> stream() {
return (id, hellos) -> completedFuture(
hellos.mapAsync(8, name -> {
return helloService.hello().invoke(name, NotUsed.getInstance());
}
));
}
}

他のサービスの連携をインジェクションだけで済ませられるのが、lagom(というか akkaの位置透過性だろうか)のすごいところだと思う。

というのも、2つのサービスは独立しているので、それぞれ個別のポートで稼動しているためだ。

通常なら、REST APIなどで接続する必要があるが、そういった処理が内部で隠蔽されている。

本番環境では、クラスタ構成を採用でき、2つのサービスが別々のクラスタにあっても問題なく動く。


その他の機能

サンプルコードには無いが、lagomには以下のような機能があるため、そのうち使ってみる。


  • PubSub イベントの発生を他のサービスに連携する

  • ReadSide API - クエリ側のAPIや、クエリ側に最適化したテーブルを作成するAPI

  • Circuit Breaker - サービス連携の異常時に代理応答を返してサービス全体の異常を防ぐ仕組み


個人的な感想とまとめ

akkaに関する知見がないので、lagom の機能なのか akka の機能なのかが区別がつかず、色々と困ることがあった。

CQRSとESの挙動を追っているだけで、色々と発見があり調べていて面白かった。

DBに最新の状態だけを保持する方法だと、履歴を持ったりするときに色々と困った経験があったので、ES が必要とされる場面は結構多いと感じます(ES オンリーだけでいけるとは思いませんが)。

なお、DDDの経験がないので個人的に疑問に思っているのが、複数エンティティをまたぐ操作ってどうするんだろうかと。

サービスのメソッドはエンティティのIDを受け取って動くので、どうするのがいいんだろうかと考えている。

実用的なところを考えると、プログラムの記述に、Java8のdefaultメソッド・ラムダ式・CompletableFuture を多用し、Java API とは言いながら Scala っぽいスタイルを要求されるので、Javaだけの経験者にとっては、プログラムを書くだけでも結構難しいんじゃないかとは思う。

他に、現時点では cassandra のみがサポートされている状況というのも、現行のモノシリックなアプリケーションから移行するケースなどでは厳しさを感じる。一応拡張が可能なようにはなっているので、今後の改善が期待できる部分ではありますが。

あとは、結果整合性を許容できるかですね。

長くなったので、サービス全体の取りまとめとか、本番環境へのデプロイなんかは次に。