目次
記事について
- 当記事はリアクティブ宣言・CQRSの概要を説明後、Scala/Akka(typed)での実装を例に解説していきます。
- 対象の読者としてScalaプログラマ・Akkaで全体像が掴めない人を想定しております。(対象外の方にも理解出来るよう精一杯努力してはいます。)
- 注意点としてこの記事ではDDDに関する説明は行いません。DDDとCQRSパターンの関係については申し訳ありませんが、別の方の記事をご参照ください。
- 調査した内容を扱いますが解説に不備がある場合は当記事にコメント、もしくは@AerosmithBzまでご連絡ください。
- リアクティブ宣言のおさらいについてはリアクティブ宣言の要約となりますので、一読された事のある方は飛ばしていただいて構いません。
Akkaのライセンス変更について
AkkaはVersion 2.7.xからライセンスがBSL1.1に変更されます。
商用利用する際はご自身でお調べになってからご使用ください。
簡単な自己紹介
初めましての方は初めまして。
あさだ(HN)と申しまして、社内・SES等でWEBシステムの開発をしているエンジニア1年生です。
現在案件先ではPythonやPHPを主に使用していてScalaに触れ合う機会が無く、日々悲しみを歌にしております。
Scala歴としては約1年半ほどで、Akkaにつきましては一年ほど継続して開発(学習)しております。
GitHubについてはこちら。
リアクティブ宣言のおさらい
CQRSを学ぶにあたって根底の概念・思想であるリアクティブ宣言を把握しておいてほうが良いと思いますので、今回はリアクティブ宣言を噛み砕いて簡単に解説させていただきます。
ユーザーが期待するシステムとは何か
リアクティブ宣言本文には前書きとして以下のように記載されている。
ほんの数年前、巨大アプリケーションは数十のサーバから構成され、数秒の応答時間と数時間のオフラインメンテナンスを許容し、データは数ギガバイトだった。今日のアプリケーションは、モバイル機器から数千のマルチコアプロセッサによって動作するクラウドベースのクラスタまで、あらゆる機器上に配備される。ユーザはミリ秒の応答時間と 100% の稼働率を期待する。データはペタバイト単位で測定される。昨日のソフトウェアアーキテクチャは、今日の要求を全く満たしていない。
リアクティブ宣言 https://www.reactivemanifesto.org/ja
要約すると昔と比べ、システムは肥大化し、ユーザーの舌は肥え、データ量はペタバイト単位で扱われるようになった。
昨今のシステムは変化についていけておらず、ユーザーの期待する振る舞いを行えていない。
さてどのように解決していこうか。というお話です。
サーバー数 | 期待される応答時間 | データ量 | |
---|---|---|---|
昔 | 少 | 遅 | 数ギガ |
今 | 多 | 早 | ペタバイト単位 |
リアクティブシステムを構成する4要素
求めるものは、即応性と、耐障害性と、弾力性と、メッセージ駆動とを備えたシステムだ。我々はこれをリアクティブシステム (Reactive Systems) と呼ぶ。
リアクティブ宣言 https://www.reactivemanifesto.org/ja*
本文では上述の問題を解決するシステムとしてリアクティブシステムというアーキテクチャを宣言しており、構成する要素として即応性・耐障害性・弾力性・メッセージ駆動を掲げております。
それぞれ、
- 即応性
システムは可能な限りすみやかに応答する。 - 耐障害性
システムは障害に直面しても即応性を保ち続ける。 - 弾力性
システムはワークロードが変動しても即応性を保ち続ける。 - メッセージ駆動
リアクティブシステムは非同期なメッセージパッシングに依ってコンポーネント間の境界を確立する。
と解説付がされており、要約すると、壊れず拡張しやすくレスポンスの早いシステムを非同期メッセージングを使用して構築し問題を解決しようというのがリアクティブシステムの概要となります。
引用元:https://www.reactivemanifesto.org/ja
手段として、なぜメッセージ駆動か
本文には以下のように記載されております。
メッセージ駆動 (Message Driven): リアクティブシステムは非同期なメッセージパッシングに依ってコンポーネント間の境界を確立する。これによって、疎結合性、隔離性、位置透過性を保証すると共に、エラーをメッセージとして委譲する手段を確保する。明示的なメッセージパッシングは負荷の管理と弾力性を可能とする。また、システム内にメッセージキューを作成して監視し、必要ならバックプレッシャーを適用することでフロー制御が可能になる。通信の手段として位置透過なメッセージングを使うことで、通信がクラスタを跨ぐ場合も単一のホスト内の場合も、同じ構成とセマンティクスで障害を管理できる。ノンブロッキング通信により、受信側はアクティブ時のみリソースを消費できるのでシステムのオーバヘッドを抑制できる。
リアクティブ宣言 https://www.reactivemanifesto.org/ja※位置透過性とはランタイムインスタンスとその参照の分離を指す用語。
つまり、メッセージ駆動による非同期なメッセージパッシングを用いる事で大きく以下の4つを満たすのが狙いになります。
- 疎結合性を保証する。
- 明示的なメッセージングを行う為、各コンポーネントの管理が容易になる。
- 位置透過性により弾力性あるリソース管理が可能になる。
- 位置透過なメッセージングを使用する事で構成に共通項目が生まれコストを削減する事ができる。
位置透過性含め、この辺りは説明だけではイメージしづらいかと思いますので実装し、全体像を把握する事を強くお勧めします。
実装と解説
上の文章量が想像の5倍くらいになってしまったので駆け足で。
- CQRSについて
- Akkaについて
- サンプルプロジェクトの概要
- ソースと解説
CQRSについて
CQRS(Command Query Responsibility Segregation: コマンドクエリ責務分離)とは言葉通り
Command(書込)とQuery(読込)を分離し、スケーラビリティを確保する考えで、書込みを行う際、イベントという形でDBにデータを格納する イベントソーシングパターン(EventSourcing) とよく一緒に実装されます。
今記事での具体的なフローとしては、
- Command側がユーザーの書込みをイベントという形で書込用NoSQL(下図Cassandra)に書込む。
- Projectionがイベントを元に読込用RDBMS(下図Aurora)にデータを反映させる。
- Read APIがデータを取得する。
- ソースと解説
Akkaについて
ここではサンプルプロジェクトの実装、ひいてはAkkaを使用する上で理解しておくべきClusterSchardingについて解説します。
-
Cluster Sharding
ClusterSharding自体はクラスタリングにおける一つの考え方で、AkkaではAkka Clusterにて提供されている機能。ノード毎に各Actorに対応するShardを作成しShardにてActorを管理しメッセージはShardを介して送受信が行われる。
各Shard間でノード毎のActor数を指定の戦略で調整する事が出来る為、リアクティブ宣言で定義されている位置透過性を再現する事が可能であり、クラスターを構成するノードにてエラーが発生した際などにはActorのリバランス(再配置)が行われる。
また、今回解説するサンプルプロジェクトではレプリケーションを扱っていないが、ノードの接続が切れた時の対策としてAkka Persistenceが提供するレプリケーションの実装を推奨します。
サンプルプロジェクトの概要
今回はCQRSの全体像を把握する事を観点とする為、アプリケーション側は簡単なカウンターアプリの実装とさせていただきます。
また、ここではWriteAPI、Projectionの解説を行い、ReadAPIについてはDBにクエリを投げる事が出来れば特に指定は無いので今回解説は省きます。
WriteAPI 図
WriteAPI フローチャート
Projection 図
ソースと解説
ソースの全文はGitHubをご参照ください。
Domain
case class Counter(number: Int = 0) {
// カウントアップした値を持つカウンターを返却する。
def countUp(number: Int): Counter =
this.copy(number = this.number + number)
}
- ドメインは後々Actorにステートとして状態を保存し更新出来るようにしたいのでコピーを返すようにします。
Aggregate Protocol
// Serialize用に定義
trait CborSerializable
object CounterAggregateProtocol {
sealed trait CounterCommand extends CborSerializable
// ActorRef[_]::ask用のリプライ先のActorRefをコマンドに格納する。
final case class CountUp(id: String, n: Int)(val replyTo: ActorRef[CountUpReply]) extends CounterCommand
sealed trait CountUpReply extends CborSerializable
final case class CountUpSucceededReply(counter: Counter) extends CountUpReply
final case class CountUpFailedReply(e: Exception) extends CountUpReply
}
- Aggregateで送受信するメッセージを定義する。
- CborSerializableについては公式Documentを参照ください。
Aggregate
object CounterAggregate {
// ①
private def commandHandler(ctx: ActorContext[CounterAggregateProtocol.CounterCommand]):
(Counter, CounterAggregateProtocol.CounterCommand) => Effect[CounterEvent, Counter] =
(_, command) =>
command match {
case cmd@CounterAggregateProtocol.CountUp(id, n) =>
Effect.persist(CounterEvent.CountUpped(id, n)).thenReply(cmd.replyTo) { state =>
CounterAggregateProtocol.CountUpSucceededReply(state)
}
}
// ②
private def eventHandler: (Counter, CounterEvent) => Counter =
(state, event) =>
event match {
case CounterEvent.CountUpped(_, n) => state.countUp(n)
}
def apply(persistenceId: PersistenceId): Behavior[CounterAggregateProtocol.CounterCommand] =
Behaviors.setup { ctx =>
EventSourcedBehavior(
// ③
persistenceId = persistenceId,
// ④
emptyState = Counter(),
// コマンドハンドラとイベントハンドラを定義する。
commandHandler = this.commandHandler(ctx),
eventHandler = this.eventHandler)
// ⑤
.withTagger(_ => Set(CounterTags.Single))
// ⑥
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 10, keepNSnapshots = 3))
// ⑦
.onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
}
- ①コマンド受信時の処理を定義する。Effect::persistでイベントを永続化する。
- ②イベント受信時の処理を定義する。
- ③persitenceId...永続化する際のプライマリキー。参照する際はこの値を元にアクターを取得する。
- ④アクター開始のステートの値。空の状態を定義する。
- ⑤タグの定義。本番では単一のタグのみでの実装はおやめください。タグについてはこちら。
- ⑥スナップショットを作成する頻度を定義。
- ⑦SupervisorStrategy...障害時の振る舞いを定義。
Projection Handler
※ProjectionについてはWirteApi内で処理を行なっても構いませんが、今回は別プロジェクトに切り出しております。
final class CounterProjectionHandler(system: ActorSystem[_])
extends Handler[EventEnvelope[CounterEvent]] {
private implicit val ec: ExecutionContext = system.executionContext
override def process(envelope: EventEnvelope[CounterEvent]): Future[Done] = {
envelope.event match {
case CounterEvent.CountUpped(id, number) =>
val row = CounterRow(id, number)
// 読込用DBにイベントを反映させている。
CounterRepository.insert(row).map(_ => Done)
}
}
}
- イベント受信通知時の処理を定義する。
- 主に読込用DBにデータを保存したりKafkaなどにデータを渡す。
Projection
※ProjectionについてはWirteApi内で処理を行なっても構いませんが、今回は別プロジェクトに切り出しております。
object CounterProjection extends App {
def apply(): Behavior[String] = Behaviors.setup { context =>
implicit val system: ActorSystem[_] = context.system
implicit val ec: ExecutionContextExecutor = system.executionContext
val sourceProvider =
// ①
EventSourcedProvider
.eventsByTag[CounterEvent](
system,
readJournalPluginId = CassandraReadJournal.Identifier,
tag = CounterTags.Single)
val projection =
// ②
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("counters", CounterTags.Single),
sourceProvider,
handler = () => new CounterProjectionHandler(system))
.withSaveOffset(afterEnvelopes = 1, afterDuration = 500.millis)
// ③
ClusterSingleton(system).init(
SingletonActor(
ProjectionBehavior(projection),
projection.projectionId.id)
.withStopMessage(ProjectionBehavior.Stop))
Behaviors.empty
}
ActorSystem(CounterProjection(), "akkaCqrsCounter_projection")
StdIn.readLine()
}
- ①タグとReadJounalPluginを元にSourceを取得する。
- ②Projectionを設定する。
- ③今回はClusterSingletonを採用。
まとめ
最後は少し駆け足になってしまいましたが、記事については随時更新していこうかと思います。(ここまでで6時間くらいかかってるから許して。)
ここまで学習してきた所感として今回記事を作成した背景にもなってくるのですが、やはりAkkaのこの辺りは記事が少なくエンジニア歴の浅い自分としては学習がわりかし困難でした。
助言を下さった皆様には本当に感謝致します。
是非この記事が昔の自分を導くものになればと思います。
以上、ありがとうございました。