#はじめに
goでcqrs+esできないもんだろうか?と思ったのでできるとこからやってみた。
あくまでチャレンジなので厳密なDDDやcqrsの定義は置いておきたい。特にDDDの値オブジェクトとかシリアル化したい場合、goではドメイン内で隠匿するのが面倒くさかったりする気がするのでそこは頑張らない。
#準備
とりあえず準備として共有して使う物をcommonパッケージとしてまとめてみる。
集約ルート
aggregate.go
package common
import (
"sort"
"github.com/google/uuid"
"github.com/pkg/errors"
)
type AggregateContext interface {
AggregateID() string
StreamVersion() int64
SetStreamVersion(int64)
UncommittedEvents() []EventContext
AppendUncommittedEvent(EventContext)
CommitEvent(EventContext)
CommittedEvents() []EventContext
Replay([]*StoredEvent) error
}
func NewAggregateID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
type AggregateBase struct {
aggregateID string
streamVersion int64
uncommittedEventMap map[string]EventContext
committedEventMap map[string]EventContext
}
func NewAggregateBase(aggregateID string) *AggregateBase {
return &AggregateBase{
aggregateID: aggregateID,
streamVersion: int64(0),
uncommittedEventMap: make(map[string]EventContext),
committedEventMap: make(map[string]EventContext),
}
}
func (a *AggregateBase) AggregateID() string {
return a.aggregateID
}
func (a *AggregateBase) StreamVersion() int64 {
return a.streamVersion
}
func (a *AggregateBase) SetStreamVersion(value int64) {
a.streamVersion = value
}
func (a *AggregateBase) UncommittedEvents() []EventContext {
if a.uncommittedEventMap == nil {
a.uncommittedEventMap = make(map[string]EventContext)
}
results := make([]EventContext, 0, len(a.uncommittedEventMap))
for _, event := range a.uncommittedEventMap {
results = append(results, event)
}
sort.SliceStable(results, func(i, j int) bool {
return results[i].GetOccurredOn() < results[j].GetOccurredOn()
})
return results
}
func (a *AggregateBase) AppendUncommittedEvent(event EventContext) {
if a.uncommittedEventMap == nil {
a.uncommittedEventMap = make(map[string]EventContext)
}
a.uncommittedEventMap[event.GetEventID()] = event
}
func (a *AggregateBase) CommittedEvents() []EventContext {
if a.committedEventMap == nil {
a.committedEventMap = make(map[string]EventContext)
}
results := make([]EventContext, 0, len(a.committedEventMap))
for _, event := range a.committedEventMap {
results = append(results, event)
}
sort.SliceStable(results, func(i, j int) bool {
return results[i].GetOccurredOn() < results[j].GetOccurredOn()
})
return results
}
func (a *AggregateBase) CommitEvent(event EventContext) {
if a.uncommittedEventMap == nil {
a.uncommittedEventMap = make(map[string]EventContext)
}
delete(a.uncommittedEventMap, event.GetEventID())
if a.committedEventMap == nil {
a.committedEventMap = make(map[string]EventContext)
}
a.committedEventMap[event.GetEventID()] = event
}
大まかに説明すると...
- AggregateIDが一意なID。DDD的には集約毎に値オブジェクトにした方がいいんだろうなとは思う。
- StreamVersionがイベントソーシングのバージョン管理で永続化されるとSetStreamVersionでインクリメントされる。
- UncommittedEventsが適用前のイベントで、イベントがAppendUncommittedEventでここに突っ込まれる。
- CommittedEventsが適用済のイベントで、永続化やリプレイ時にここに適用された際にCommitEventでイベントが入る。
- Replayは永続化ストアから再生する時に呼ばれる。これは各集約ルートで実装。
イベント
event.go
package common
import (
"github.com/google/uuid"
"github.com/pkg/errors"
)
type EventContext interface {
GetEventID() string
GetEventType() string
GetOccurredOn() int64
}
func NewEventID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
特に説明することもないけど、なんでインターフェースでGet付けてんのかというと実装時に名前が被るから。
永続化ストア
persistence.go
package common
type PersistenceContext interface {
ReplayAggregate(a AggregateContext) error
Save(a AggregateContext) error
}
type FakePersistence struct{}
func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
storedEvents := make([]*StoredEvent, 0)
return a.Replay(storedEvents)
}
func (p *FakePersistence) Save(a AggregateContext) error {
for _, e := range a.UncommittedEvents() {
/*
storedEvent := &StoredEvent {
AggregateID: a.AggregateID(),
StreamVersion: a.StreamVersion(),
OccurredOn: e.OccurredOn(),
EventType: e.EventType(),
Data: []byte(e),
}
db save stored event
*/
a.CommitEvent(e)
}
return nil
}
type PersistenceQueryContext interface {
QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}
type FakePersistenceQuery struct{}
func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
storedEvents := make([]*StoredEvent, 0)
/*
load stored events from db
*/
return storedEvents, nil
}
type StoredEvent struct {
AggregateID string
StreamVersion int64
OccurredOn int64
EventType string
Data []byte
}
Fakeが中々苦しい。実際はCassandra(gocql)でやってみたりしてますが、そこまで書くと長くなりすぎるのでFakeで割愛。
- FakePersistenceをコマンド側が利用し、集約ルートの永続化やリプレイを行います。
- FakePersistenceQueryをクエリ側が利用し、指定された範囲(baseとlimit)のバージョンのイベントを取得できるようにします。
- goではC#や、Javaのようにblobデータから単純にstructに戻せないのでStoredEventにラップして、再生時はその中のEventType(構造体名)を見てイベントを元のstructに戻します。
メッセージング
messaging.go
package common
import (
"context"
"github.com/google/uuid"
"github.com/pkg/errors"
)
type MessageContext interface {
GetMessageID() string
GetMessageType() string
}
func NewMessageID() (string, error) {
id, err := uuid.NewUUID()
if err != nil {
return "", errors.Wrap(err, "IDの生成に失敗しました")
}
return id.String(), nil
}
type MessagingProducerContext interface {
Publish(m MessageContext) error
}
type FakeMessagingProducer struct{}
func (p *FakeMessagingProducer) Publish(m MessageContext) error {
/*
create message for messaging system
message header = m.MessageType()
message body = []byte(m)
messaging system send message
*/
return nil
}
type MessagingConsumerContext interface {
Consume(ctx context.Context, msg chan<- MessageContext) error
}
type FakeMessagingConsumer struct{}
func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- MessageContext) error {
/*
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
delivery = from messaging system
msg <- delivery to MessageContext
}
}
*/
return nil
}
これまたFakeが苦しいですが、こっちも実際にはRabbitmq(amqp)を使ってみたりしてます。CosumeにChannelを入れてるのはそのためです。
- FakeMessagingProducerが送信側です。
- FakeMessagingConsumerが受信側です。Channelを使って受け待ちします。
- こちらもstructを再生するためにヘッダー等にMessageType(構造体名)と入れます。
おわりに
とりあえず今回は準備編としてここまで。
何かFake周りのでせいで実装できるか不安になってきたのと、何かコード自体が重苦しいのでもっとgoらしく軽い感じにできんかなぁと思ったりもします・・・。