Go
golang
EventSourcing
CQRS

golangでcqrs+esにチャレンジしてみる 準備編

はじめに

 goでcqrs+esできないもんだろうか?と思ったのでできるとこからやってみた。
 あくまでチャレンジなので厳密なDDDやcqrsの定義は置いておきたい。特にDDDの値オブジェクトとかシリアル化したい場合、goではドメイン内で隠匿するのが面倒くさかったりする気がするのでそこは頑張らない。

準備

 とりあえず準備として共有して使う物をcommonパッケージとしてまとめてみる。

集約ルート

aggregate.go
package common

import (
    "sort"

    "github.com/google/uuid"
    "github.com/pkg/errors"
)

type AggregateContext interface {
    AggregateID() string
    StreamVersion() int64
    SetStreamVersion(int64)
    UncommittedEvents() []EventContext
    AppendUncommittedEvent(EventContext)
    CommitEvent(EventContext)
    CommittedEvents() []EventContext
    Replay([]*StoredEvent) error
}

func NewAggregateID() (string, error) {
    id, err := uuid.NewUUID()
    if err != nil {
        return "", errors.Wrap(err, "IDの生成に失敗しました")
    }

    return id.String(), nil
}

type AggregateBase struct {
    aggregateID         string
    streamVersion       int64
    uncommittedEventMap map[string]EventContext
    committedEventMap   map[string]EventContext
}

func NewAggregateBase(aggregateID string) *AggregateBase {
    return &AggregateBase{
        aggregateID:         aggregateID,
        streamVersion:       int64(0),
        uncommittedEventMap: make(map[string]EventContext),
        committedEventMap:   make(map[string]EventContext),
    }
}

func (a *AggregateBase) AggregateID() string {
    return a.aggregateID
}

func (a *AggregateBase) StreamVersion() int64 {
    return a.streamVersion
}

func (a *AggregateBase) SetStreamVersion(value int64) {
    a.streamVersion = value
}

func (a *AggregateBase) UncommittedEvents() []EventContext {
    if a.uncommittedEventMap == nil {
        a.uncommittedEventMap = make(map[string]EventContext)
    }
    results := make([]EventContext, 0, len(a.uncommittedEventMap))
    for _, event := range a.uncommittedEventMap {
        results = append(results, event)
    }
    sort.SliceStable(results, func(i, j int) bool {
        return results[i].GetOccurredOn() < results[j].GetOccurredOn()
    })
    return results
}

func (a *AggregateBase) AppendUncommittedEvent(event EventContext) {
    if a.uncommittedEventMap == nil {
        a.uncommittedEventMap = make(map[string]EventContext)
    }
    a.uncommittedEventMap[event.GetEventID()] = event
}

func (a *AggregateBase) CommittedEvents() []EventContext {
    if a.committedEventMap == nil {
        a.committedEventMap = make(map[string]EventContext)
    }
    results := make([]EventContext, 0, len(a.committedEventMap))
    for _, event := range a.committedEventMap {
        results = append(results, event)
    }
    sort.SliceStable(results, func(i, j int) bool {
        return results[i].GetOccurredOn() < results[j].GetOccurredOn()
    })
    return results
}

func (a *AggregateBase) CommitEvent(event EventContext) {
    if a.uncommittedEventMap == nil {
        a.uncommittedEventMap = make(map[string]EventContext)
    }
    delete(a.uncommittedEventMap, event.GetEventID())
    if a.committedEventMap == nil {
        a.committedEventMap = make(map[string]EventContext)
    }
    a.committedEventMap[event.GetEventID()] = event
}

 大まかに説明すると...

  • AggregateIDが一意なID。DDD的には集約毎に値オブジェクトにした方がいいんだろうなとは思う。
  • StreamVersionがイベントソーシングのバージョン管理で永続化されるとSetStreamVersionでインクリメントされる。
  • UncommittedEventsが適用前のイベントで、イベントがAppendUncommittedEventでここに突っ込まれる。
  • CommittedEventsが適用済のイベントで、永続化やリプレイ時にここに適用された際にCommitEventでイベントが入る。
  • Replayは永続化ストアから再生する時に呼ばれる。これは各集約ルートで実装。

イベント

event.go
package common

import (
    "github.com/google/uuid"
    "github.com/pkg/errors"
)

type EventContext interface {
    GetEventID() string
    GetEventType() string
    GetOccurredOn() int64
}

func NewEventID() (string, error) {
    id, err := uuid.NewUUID()
    if err != nil {
        return "", errors.Wrap(err, "IDの生成に失敗しました")
    }

    return id.String(), nil
}

 特に説明することもないけど、なんでインターフェースでGet付けてんのかというと実装時に名前が被るから。

永続化ストア

persistence.go
package common

type PersistenceContext interface {
    ReplayAggregate(a AggregateContext) error
    Save(a AggregateContext) error
}

type FakePersistence struct{}

func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
    storedEvents := make([]*StoredEvent, 0)
    return a.Replay(storedEvents)
}

func (p *FakePersistence) Save(a AggregateContext) error {
    for _, e := range a.UncommittedEvents() {
        /*
            storedEvent := &StoredEvent {
                AggregateID: a.AggregateID(),
                StreamVersion: a.StreamVersion(),
                OccurredOn: e.OccurredOn(),
                EventType: e.EventType(),
                Data: []byte(e),
            }

            db save stored event
        */

        a.CommitEvent(e)
    }

    return nil
}

type PersistenceQueryContext interface {
    QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}

type FakePersistenceQuery struct{}

func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
    storedEvents := make([]*StoredEvent, 0)

    /*
        load stored events from db
    */

    return storedEvents, nil
}

type StoredEvent struct {
    AggregateID   string
    StreamVersion int64
    OccurredOn    int64
    EventType     string
    Data          []byte
}

 Fakeが中々苦しい。実際はCassandra(gocql)でやってみたりしてますが、そこまで書くと長くなりすぎるのでFakeで割愛。

  • FakePersistenceをコマンド側が利用し、集約ルートの永続化やリプレイを行います。
  • FakePersistenceQueryをクエリ側が利用し、指定された範囲(baseとlimit)のバージョンのイベントを取得できるようにします。
  • goではC#や、Javaのようにblobデータから単純にstructに戻せないのでStoredEventにラップして、再生時はその中のEventType(構造体名)を見てイベントを元のstructに戻します。

メッセージング

messaging.go
package common

import (
    "context"

    "github.com/google/uuid"
    "github.com/pkg/errors"
)

type MessageContext interface {
    GetMessageID() string
    GetMessageType() string
}

func NewMessageID() (string, error) {
    id, err := uuid.NewUUID()
    if err != nil {
        return "", errors.Wrap(err, "IDの生成に失敗しました")
    }

    return id.String(), nil
}

type MessagingProducerContext interface {
    Publish(m MessageContext) error
}

type FakeMessagingProducer struct{}

func (p *FakeMessagingProducer) Publish(m MessageContext) error {
    /*
        create message for messaging system
        message header = m.MessageType()
        message body = []byte(m)
        messaging system send message
    */

    return nil
}

type MessagingConsumerContext interface {
    Consume(ctx context.Context, msg chan<- MessageContext) error
}

type FakeMessagingConsumer struct{}

func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- MessageContext) error {
    /*
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                delivery = from messaging system
                msg <- delivery to MessageContext
            }
        }
    */

    return nil
}

 これまたFakeが苦しいですが、こっちも実際にはRabbitmq(amqp)を使ってみたりしてます。CosumeにChannelを入れてるのはそのためです。

  • FakeMessagingProducerが送信側です。
  • FakeMessagingConsumerが受信側です。Channelを使って受け待ちします。
  • こちらもstructを再生するためにヘッダー等にMessageType(構造体名)と入れます。

おわりに

 とりあえず今回は準備編としてここまで。
 何かFake周りのでせいで実装できるか不安になってきたのと、何かコード自体が重苦しいのでもっとgoらしく軽い感じにできんかなぁと思ったりもします・・・。