いろいろ勉強したいので勉強のためのプラットフォームを作ってみてる。
特に一番実践したいのが以下の記事のような EventSourcing で構築されたシステム。
event をメッセージブローカーに入れて、各マイクロサービスは最新情報を必要としない限りは、各サービスが必要なデータをメッセージブローカーからマテリアライズして(ここではKTable)参照することでサービスの独立性を高めるような設計。
あとは Go に慣れるため。
こんな感じの slack のまがい物を作ってる
DI的なお作法無知すぎて main.go が悲惨なことになってる。
全体図
とりあえず今の段階の構成図
chat service
command project
普通の Rest API
chatの更新/編集/削除
イベントの保存
一旦すべてのイベントをRDBのイベントテーブルに保存、同じトランザクションでoutboxテーブルにも保存
RDB を Single Source of Truth 扱いにしている。しかし今後は Kafka に情報源を任せたい。
Kafka へイベントのpublish
イベントの保存が成功したら Hub を経由して goroutine 経由で outbox processor を呼び出す
outbox processor は RDB の outbox テーブルから Kafka へ未連携のデータを取得し、Kafka へ publish
Kafka への publish が成功したら RDB の outbox テーブルからレコードを削除
WebSocket project
Kafka subscribe
Kafka を subscribe して、Hub に通知、Broadcat する
Kafka の subscriber が Hub を経由するのは、ws の接続数だけ Kafka のクライアントを作りたくなくて ws 用に 1 つにしたかったから。
server push
Hub から通知を受信したら、ws 経由でサーバープッシュし、リアルタイムで画面に最新情報を反映
query project
ReadModel の構築
Kafka を subscribe して、query ようの ReadModel を構築
query endpoint
画面アクセスのタイミングで Redis から最新 chat 一覧を取得し画面に表示。一度最新を表示できたらその後は WebSocket 経由で最新情報に更新する
今は ReadModel を Redis だけに構築しているが、本当はちゃんと永続化考えなければ。
Redis
クエリのためだけのデータストア
-
chat:${chatId}(string)- chat_id をキーにして、string型で json formatted な chat を保存
- 最初は Hash 型で持たせようとしていたが、Redis の Hash型には MGet 操作がなかったため N+1 をしたくなくて string型に
-
chats:${roomId}(zset)- chatId の sorted sets
- room 毎の chatId をソートしてアプリが持てるようにするため
- 指定された room のソートした chatIds を取得し、取得した chatIds をベースに
chat:${chatId}から MGet することで対象の chat を一括取得
-
chats:${chatId}:history(list)- chat 毎の編集履歴を保持する
-
chat:${chatId}は最新データのみを上書きして持ち、chats:${chatId}:historyは List型で保持し、編集のたびに LPush していく
-
- RDB であれば
chats:${chatId}:history相当のテーブルさえ用意しておけば最新 chat の一覧のみの取得も SQL で表現できそうではあるが、Redis では厳しそう -
chat:${chatId}とchats:${chatId}:historyで両方に最新の chat を保持することになってしまうが、 ReadModel だし Redis なので良いとする
- chat 毎の編集履歴を保持する
room service
ToDo これから作る
多分 chat とは別の境界づけられたコンテキストにしそう。
room には認証認可の機能があって複雑にはなりそう。
chat 登録時に、room と ユーザーの Validation も必要になってくる。
従来であれば
- chat service -(post:chat)> room service
- 登録前の chat情報を post し、room service に対して room と user の validation を依頼する
- chat service -(get:roomId)> room service
- chat service が room service から roomId をキーに所属メンバーを問い合わせ
- chat service が room service から取得した情報をもとに validation 実施
- chat service -(get:userId)> user service (まだない)
- 2番目と同じような感じ。取得のキーが userId になりそうな感じ
がよくあるパターンだった気がする。
今回やりたいのは、
- room にユーザーが所属したときに、所属イベントを Kafka に publish
- chat service が Kafka Streams でイベントを購読し、 KTable にマテリアライズ
- chat 登録時は KTable を参照して Validation
- ユーザーの room 所属と、新規ユーザーの chat ポストは厳密なリアルタイム性は多分なくて大丈夫そうだし

