はじめに
前回、下準備したものの続きです。
今回は下準備したものを使ってTodoアプリっぽく実装してみます。ついでにAkkaやProto.Actorをヒントにアクターシステムっぽくしてみたいと思います。コマンド側とクエリー側それぞれのアクターがコマンド処理やメッセージ処理を行います。
前回からの変更
前回のを使ってと言いながらいきなり変更します。だって前回のFake永続化、Fakeメッセージングでは実装できなかったんですもの・・・。
package common
import (
"encoding/json"
"sort"
"go.uber.org/zap"
)
type InMemoryDB struct {
data map[string][]*StoredEvent
logger *zap.SugaredLogger
}
func NewInMemoryDB(logger *zap.SugaredLogger) *InMemoryDB {
return &InMemoryDB{
data: make(map[string][]*StoredEvent),
logger: logger,
}
}
func (db *InMemoryDB) GetByID(id string) []*StoredEvent {
results := make([]*StoredEvent, 0)
results = append(results, db.data[id]...)
sort.SliceStable(results, func(i, j int) bool {
switch {
case results[i].StreamVersion < results[j].StreamVersion:
return true
case results[i].StreamVersion > results[j].StreamVersion:
return false
default:
return results[i].OccurredOn < results[j].OccurredOn
}
})
return results
}
func (db *InMemoryDB) Save(e *StoredEvent) {
db.data[e.AggregateID] = append(db.data[e.AggregateID], e)
}
type PersistenceContext interface {
ReplayAggregate(a AggregateContext) error
Save(a AggregateContext) error
}
type FakePersistence struct {
db *InMemoryDB
logger *zap.SugaredLogger
}
func NewFakePersistence(db *InMemoryDB, logger *zap.SugaredLogger) *FakePersistence {
return &FakePersistence{
db: db,
logger: logger,
}
}
func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
storedEvents := p.db.GetByID(a.AggregateID())
if err := a.Replay(storedEvents); err != nil {
return err
}
return nil
}
func (p *FakePersistence) Save(a AggregateContext) error {
for _, e := range a.UncommittedEvents() {
d, err := json.Marshal(e)
if err != nil {
return err
}
storedEvent := &StoredEvent{
AggregateID: a.AggregateID(),
StreamVersion: a.StreamVersion(),
OccurredOn: e.GetOccurredOn(),
EventType: e.GetEventType(),
Data: d,
}
p.db.Save(storedEvent)
a.CommitEvent(e)
}
a.SetStreamVersion(a.StreamVersion() + 1)
return nil
}
type PersistenceQueryContext interface {
QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}
type FakePersistenceQuery struct {
db *InMemoryDB
logger *zap.SugaredLogger
}
func NewFakePersistenceQuery(db *InMemoryDB, logger *zap.SugaredLogger) *FakePersistenceQuery {
return &FakePersistenceQuery{
db: db,
logger: logger,
}
}
func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
results := make([]*StoredEvent, 0)
storedEvents := p.db.GetByID(id)
for _, storedEvent := range storedEvents {
if storedEvent.StreamVersion >= base && storedEvent.StreamVersion <= limit {
results = append(results, storedEvent)
}
}
return results, nil
}
type StoredEvent struct {
AggregateID string
StreamVersion int64
OccurredOn int64
EventType string
Data []byte
}
package common
import (
"context"
"encoding/json"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type InMemoryMessage struct {
Header string
Data []byte
}
type MessageContext interface {
GetMessageID() string
GetMessageType() string
}
func NewMessageID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
type MessagingProducerContext interface {
Publish(m MessageContext) error
}
type FakeMessagingProducer struct {
channel chan<- *InMemoryMessage
logger *zap.SugaredLogger
}
func NewFakeMessagingProducer(ch chan<- *InMemoryMessage, logger *zap.SugaredLogger) *FakeMessagingProducer {
return &FakeMessagingProducer{
channel: ch,
logger: logger,
}
}
func (p *FakeMessagingProducer) Publish(m MessageContext) error {
d, err := json.Marshal(m)
if err != nil {
return err
}
msg := &InMemoryMessage{
Header: m.GetMessageType(),
Data: d,
}
p.channel <- msg
p.logger.Infow("publish message", "message", msg)
return nil
}
type MessagingConsumerContext interface {
Consume(ctx context.Context, msg chan<- *InMemoryMessage) error
}
type FakeMessagingConsumer struct {
channel <-chan *InMemoryMessage
logger *zap.SugaredLogger
}
func NewFakeMessagingConsumer(ch <-chan *InMemoryMessage, logger *zap.SugaredLogger) *FakeMessagingConsumer {
return &FakeMessagingConsumer{
channel: ch,
logger: logger,
}
}
func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- *InMemoryMessage) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case m := <-c.channel:
msg <- m
c.logger.Infow("consume message", "message", m)
}
}
}
デバッグ用にロガーを突っ込んでますが、気にしないでください。Mapを利用したインメモリーデータベースを用意し、そこにイベントを集積します。メッセージングはチャネルを使ってインメモリーで通信します。PubSub(1対多送信)ではなくなってますが今回はこれで通します。
コマンド側
cqrsのコマンド側をcommandパッケージに準備します。
まずは集約ルートとしてTodoを定義します。
package command
import (
"errors"
"github.com/lightstaff/go-dddcqrses/common"
"github.com/lightstaff/go-dddcqrses/events"
)
type Todo struct {
*common.AggregateBase
message string
completed bool
}
func NewTodo(id string) *Todo {
return &Todo{
AggregateBase: common.NewAggregateBase(id),
}
}
func (t *Todo) Message() string {
return t.message
}
func (t *Todo) Completed() bool {
return t.completed
}
func (t *Todo) RaiseEvent(e common.EventContext, n bool) error {
switch e := e.(type) {
case *events.TodoRegistered:
t.message = e.Message
t.completed = e.Completed
case *events.TodoMessageChanged:
t.message = e.Message
case *events.TodoCompleted:
t.completed = e.Completed
default:
return errors.New("unknown event")
}
if n {
t.AppendUncommittedEvent(e)
}
return nil
}
func (t *Todo) Replay(storedEvents []*common.StoredEvent) error {
for _, storedEvent := range storedEvents {
e, err := events.EventConverter(storedEvent.EventType, storedEvent.Data)
if err != nil {
return err
}
if err := t.RaiseEvent(e, false); err != nil {
return err
}
t.CommitEvent(e)
t.SetStreamVersion(storedEvent.StreamVersion)
}
return nil
}
ドメイン貧血症感満載ですが、サンプルだからよしとします。
前回定義した「AggregateBase」を埋め込み、そちらに定義していなかった「Repaly」メソッドを実装しています。「RaiseEvent」でイベントが反映される感じです。
続いてTodoを操作するアクターを定義します。
package command
import (
"errors"
"go.uber.org/zap"
"github.com/lightstaff/go-dddcqrses/common"
"github.com/lightstaff/go-dddcqrses/events"
"github.com/lightstaff/go-dddcqrses/messages"
)
type (
TodoRegistry struct {
Message string
Completed bool
}
TodoMessageChange struct {
AggregateID string
Message string
}
TodoComplete struct {
AggregateID string
Completed bool
}
)
type TodoActor struct {
persistence common.PersistenceContext
producer common.MessagingProducerContext
logger *zap.SugaredLogger
}
func NewTodoActor(persistence common.PersistenceContext, producer common.MessagingProducerContext, logger *zap.SugaredLogger) *TodoActor {
return &TodoActor{
persistence: persistence,
producer: producer,
logger: logger,
}
}
func (t *TodoActor) Act(command interface{}) error {
switch command := command.(type) {
case *TodoRegistry:
aggregateID, err := common.NewAggregateID()
if err != nil {
return err
}
entity := NewTodo(aggregateID)
e, err := events.NewTodoRegistered(aggregateID, command.Message, command.Completed)
if err != nil {
return err
}
if err := entity.RaiseEvent(e, true); err != nil {
return err
}
if err := t.persistence.Save(entity); err != nil {
return err
}
m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
if err != nil {
return err
}
if err := t.producer.Publish(m); err != nil {
return err
}
case *TodoMessageChange:
entity := NewTodo(command.AggregateID)
e, err := events.NewTodoMessageChanged(command.AggregateID, command.Message)
if err != nil {
return err
}
if err := entity.RaiseEvent(e, true); err != nil {
return err
}
if err := t.persistence.Save(entity); err != nil {
return err
}
m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
if err != nil {
return err
}
if err := t.producer.Publish(m); err != nil {
return err
}
case *TodoComplete:
entity := NewTodo(command.AggregateID)
e, err := events.NewTodoCompleted(command.AggregateID, command.Completed)
if err != nil {
return err
}
if err := entity.RaiseEvent(e, true); err != nil {
return err
}
if err := t.persistence.Save(entity); err != nil {
return err
}
m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
if err != nil {
return err
}
if err := t.producer.Publish(m); err != nil {
return err
}
}
return errors.New("unknown command")
}
頭で定義された各コマンドによって処理を分岐しています。「TodoRegistry」コマンドで作成、「TodoMessageChange」コマンでTodoのメッセージを変更。「TodoComplete」コマンドでTodoを完了させます。Proto.Actorのアクターはもっといろいろ管理させたりもできますが、今回はそこまでやりません。
クエリー側
cqrsのクエリー側をqueryパッケージに準備します。
DTOの永続化もインメモリーで定義します。
package query
import "go.uber.org/zap"
type QueryDBContext interface {
FindByID(id string) *TodoQuery
Save(entity *TodoQuery)
}
type FakeQueryDB struct {
data map[string]*TodoQuery
logger *zap.SugaredLogger
}
func NewFakeQueryDB(logger *zap.SugaredLogger) *FakeQueryDB {
return &FakeQueryDB{
data: make(map[string]*TodoQuery),
logger: logger,
}
}
func (db *FakeQueryDB) FindByID(id string) *TodoQuery {
if entity, ok := db.data[id]; ok {
return entity
}
return nil
}
func (db *FakeQueryDB) Save(entity *TodoQuery) {
db.data[entity.AggregateID] = entity
}
特にコメント無し。
次にDTOとして「TodoQuery」モデルを定義します。
package query
import (
"github.com/lightstaff/go-dddcqrses/common"
"github.com/lightstaff/go-dddcqrses/events"
)
type TodoQuery struct {
AggregateID string
Message string
Completed bool
StreamVersion int64
}
func (t *TodoQuery) ApplyEvent(e common.EventContext) error {
switch e := e.(type) {
case *events.TodoRegistered:
t.Message = e.Message
t.Completed = e.Completed
case *events.TodoMessageChanged:
t.Message = e.Message
case *events.TodoCompleted:
t.Completed = e.Completed
}
return nil
}
「ApplyEvent」メソッドは下のアクターによって呼ばれます。
こちらはクエリー側のアクターです。
package query
import (
"github.com/lightstaff/go-dddcqrses/common"
"github.com/lightstaff/go-dddcqrses/events"
"github.com/lightstaff/go-dddcqrses/messages"
"go.uber.org/zap"
)
type TodoActor struct {
persistenceQuery common.PersistenceQueryContext
queryDB QueryDBContext
logger *zap.SugaredLogger
}
func NewTodoActor(persistenceQuery common.PersistenceQueryContext, queryDB QueryDBContext, logger *zap.SugaredLogger) *TodoActor {
return &TodoActor{
persistenceQuery: persistenceQuery,
queryDB: queryDB,
logger: logger,
}
}
func (t *TodoActor) Act(msg *messages.TodoEventOccurred) error {
target := t.queryDB.FindByID(msg.AggregateID)
if target == nil {
target = &TodoQuery{
AggregateID: msg.AggregateID,
}
}
storedEvents, err := t.persistenceQuery.QueryEvents(msg.AggregateID, target.StreamVersion, msg.StreamVersion)
if err != nil {
return err
}
for _, storedEvent := range storedEvents {
e, err := events.EventConverter(storedEvent.EventType, storedEvent.Data)
if err != nil {
return err
}
target.ApplyEvent(e)
target.StreamVersion = storedEvent.StreamVersion
t.logger.Infow("apply event", "event", e)
}
t.queryDB.Save(target)
return nil
}
アクターはメッセージを受け取り、永続化ストアのクエリから取得したイベントをDTOに流し、保存します。
メイン
最後にそれぞれを動作させるエントリポイントとしてmainを定義します。
package main
import (
"context"
"os"
"os/signal"
"github.com/lightstaff/go-dddcqrses/command"
"github.com/lightstaff/go-dddcqrses/common"
"github.com/lightstaff/go-dddcqrses/messages"
"github.com/lightstaff/go-dddcqrses/query"
"go.uber.org/zap"
)
func main() {
logger, err := zap.NewProduction()
if err != nil {
panic(err)
}
defer logger.Sync()
sugar := logger.Sugar()
inMemoryDB := common.NewInMemoryDB(sugar)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
delivery := make(chan *common.InMemoryMessage)
errc := make(chan error)
go func() {
persistence := common.NewFakePersistence(inMemoryDB, sugar)
producer := common.NewFakeMessagingProducer(delivery, sugar)
commandActor := command.NewTodoActor(persistence, producer, sugar)
commandActor.Act(&command.TodoRegistry{
Message: "test message",
Completed: false,
})
sugar.Info("command actor action completed")
}()
go func() {
defer close(delivery)
persistenceQuery := common.NewFakePersistenceQuery(inMemoryDB, sugar)
consumer := common.NewFakeMessagingConsumer(delivery, sugar)
queryDB := query.NewFakeQueryDB(sugar)
queryActor := query.NewTodoActor(persistenceQuery, queryDB, sugar)
go func() {
if err := consumer.Consume(ctx, delivery); err != nil {
errc <- err
}
}()
for {
select {
case <-ctx.Done():
sugar.Info("call context cancel")
return
case m := <-delivery:
msg, err := messages.MessageConveter(m.Header, m.Data)
if err != nil {
errc <- err
}
sugar.Infow("receive message", "message", msg)
switch msg := msg.(type) {
case *messages.TodoEventOccurred:
if err := queryActor.Act(msg); err != nil {
errc <- err
}
sugar.Info("query actor action completed")
sugar.Infow("now todo", "todo", queryDB.FindByID(msg.AggregateID))
}
}
}
}()
go func() {
if err := <-errc; err != nil {
sugar.Errorw("error happend", "error", err)
cancel()
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
}
今回はインメモリーで永続化やメッセージングをしているのでコマンドとクエリーを分割できませんが、そのへんを別の機構で実装すればそれぞれを独立することが可能なはずです。
Proto.Actorのリモート周りを使うのも手ですが、protocol-bufferの知識があった方が良いのとホスト側でリモートのインスタンスを立てなきゃいけないので今回は見送りました。
おわりに
何とかかんとか実装してみたけど、cqrs(DDD含めて)+esとgolangの相性が良いかと言われると微妙な気がする。C#やらJava(Scala)に比べて抽象化はしずらいし(埋め込みでは解決できないこともある)、諸々の命名がいまいち長ったらしくなってしまってgolang感が・・・。
成果物:https://github.com/lightstaff/go-dddcqrses
参考資料
- エヴァンス本
- 実践ドメイン駆動設計
- .NETのエンタープライズアプリケーションアーキテクチャ第2版
- Akka
- Proto.Actor