LoginSignup
3
3

More than 5 years have passed since last update.

golangでcqrs+esにチャレンジしてみる 実装編

Last updated at Posted at 2017-11-28

はじめに

 前回、下準備したものの続きです。
 今回は下準備したものを使ってTodoアプリっぽく実装してみます。ついでにAkkaProto.Actorをヒントにアクターシステムっぽくしてみたいと思います。コマンド側とクエリー側それぞれのアクターがコマンド処理やメッセージ処理を行います。

前回からの変更

 前回のを使ってと言いながらいきなり変更します。だって前回のFake永続化、Fakeメッセージングでは実装できなかったんですもの・・・。

persistence.go
package common

import (
    "encoding/json"
    "sort"

    "go.uber.org/zap"
)

type InMemoryDB struct {
    data   map[string][]*StoredEvent
    logger *zap.SugaredLogger
}

func NewInMemoryDB(logger *zap.SugaredLogger) *InMemoryDB {
    return &InMemoryDB{
        data:   make(map[string][]*StoredEvent),
        logger: logger,
    }
}

func (db *InMemoryDB) GetByID(id string) []*StoredEvent {
    results := make([]*StoredEvent, 0)
    results = append(results, db.data[id]...)
    sort.SliceStable(results, func(i, j int) bool {
        switch {
        case results[i].StreamVersion < results[j].StreamVersion:
            return true
        case results[i].StreamVersion > results[j].StreamVersion:
            return false
        default:
            return results[i].OccurredOn < results[j].OccurredOn
        }
    })
    return results
}

func (db *InMemoryDB) Save(e *StoredEvent) {
    db.data[e.AggregateID] = append(db.data[e.AggregateID], e)
}

type PersistenceContext interface {
    ReplayAggregate(a AggregateContext) error
    Save(a AggregateContext) error
}

type FakePersistence struct {
    db     *InMemoryDB
    logger *zap.SugaredLogger
}

func NewFakePersistence(db *InMemoryDB, logger *zap.SugaredLogger) *FakePersistence {
    return &FakePersistence{
        db:     db,
        logger: logger,
    }
}

func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
    storedEvents := p.db.GetByID(a.AggregateID())
    if err := a.Replay(storedEvents); err != nil {
        return err
    }
    return nil
}

func (p *FakePersistence) Save(a AggregateContext) error {
    for _, e := range a.UncommittedEvents() {
        d, err := json.Marshal(e)
        if err != nil {
            return err
        }
        storedEvent := &StoredEvent{
            AggregateID:   a.AggregateID(),
            StreamVersion: a.StreamVersion(),
            OccurredOn:    e.GetOccurredOn(),
            EventType:     e.GetEventType(),
            Data:          d,
        }
        p.db.Save(storedEvent)
        a.CommitEvent(e)
    }
    a.SetStreamVersion(a.StreamVersion() + 1)
    return nil
}

type PersistenceQueryContext interface {
    QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}

type FakePersistenceQuery struct {
    db     *InMemoryDB
    logger *zap.SugaredLogger
}

func NewFakePersistenceQuery(db *InMemoryDB, logger *zap.SugaredLogger) *FakePersistenceQuery {
    return &FakePersistenceQuery{
        db:     db,
        logger: logger,
    }
}

func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
    results := make([]*StoredEvent, 0)
    storedEvents := p.db.GetByID(id)
    for _, storedEvent := range storedEvents {
        if storedEvent.StreamVersion >= base && storedEvent.StreamVersion <= limit {
            results = append(results, storedEvent)
        }
    }
    return results, nil
}

type StoredEvent struct {
    AggregateID   string
    StreamVersion int64
    OccurredOn    int64
    EventType     string
    Data          []byte
}
messaging.go
package common

import (
    "context"
    "encoding/json"

    "github.com/google/uuid"
    "github.com/pkg/errors"
    "go.uber.org/zap"
)

type InMemoryMessage struct {
    Header string
    Data   []byte
}

type MessageContext interface {
    GetMessageID() string
    GetMessageType() string
}

func NewMessageID() (string, error) {
    id, err := uuid.NewUUID()
    if err != nil {
        return "", errors.Wrap(err, "IDの生成に失敗しました")
    }

    return id.String(), nil
}

type MessagingProducerContext interface {
    Publish(m MessageContext) error
}

type FakeMessagingProducer struct {
    channel chan<- *InMemoryMessage
    logger  *zap.SugaredLogger
}

func NewFakeMessagingProducer(ch chan<- *InMemoryMessage, logger *zap.SugaredLogger) *FakeMessagingProducer {
    return &FakeMessagingProducer{
        channel: ch,
        logger:  logger,
    }
}

func (p *FakeMessagingProducer) Publish(m MessageContext) error {
    d, err := json.Marshal(m)
    if err != nil {
        return err
    }
    msg := &InMemoryMessage{
        Header: m.GetMessageType(),
        Data:   d,
    }
    p.channel <- msg
    p.logger.Infow("publish message", "message", msg)
    return nil
}

type MessagingConsumerContext interface {
    Consume(ctx context.Context, msg chan<- *InMemoryMessage) error
}

type FakeMessagingConsumer struct {
    channel <-chan *InMemoryMessage
    logger  *zap.SugaredLogger
}

func NewFakeMessagingConsumer(ch <-chan *InMemoryMessage, logger *zap.SugaredLogger) *FakeMessagingConsumer {
    return &FakeMessagingConsumer{
        channel: ch,
        logger:  logger,
    }
}

func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- *InMemoryMessage) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case m := <-c.channel:
            msg <- m
            c.logger.Infow("consume message", "message", m)
        }
    }
}

 
 デバッグ用にロガーを突っ込んでますが、気にしないでください。Mapを利用したインメモリーデータベースを用意し、そこにイベントを集積します。メッセージングはチャネルを使ってインメモリーで通信します。PubSub(1対多送信)ではなくなってますが今回はこれで通します。

コマンド側

 cqrsのコマンド側をcommandパッケージに準備します。
 まずは集約ルートとしてTodoを定義します。

todo.go
package command

import (
    "errors"

    "github.com/lightstaff/go-dddcqrses/common"
    "github.com/lightstaff/go-dddcqrses/events"
)

type Todo struct {
    *common.AggregateBase
    message   string
    completed bool
}

func NewTodo(id string) *Todo {
    return &Todo{
        AggregateBase: common.NewAggregateBase(id),
    }
}

func (t *Todo) Message() string {
    return t.message
}

func (t *Todo) Completed() bool {
    return t.completed
}

func (t *Todo) RaiseEvent(e common.EventContext, n bool) error {
    switch e := e.(type) {
    case *events.TodoRegistered:
        t.message = e.Message
        t.completed = e.Completed
    case *events.TodoMessageChanged:
        t.message = e.Message
    case *events.TodoCompleted:
        t.completed = e.Completed
    default:
        return errors.New("unknown event")
    }

    if n {
        t.AppendUncommittedEvent(e)
    }

    return nil
}

func (t *Todo) Replay(storedEvents []*common.StoredEvent) error {
    for _, storedEvent := range storedEvents {
        e, err := events.EventConverter(storedEvent.EventType, storedEvent.Data)
        if err != nil {
            return err
        }

        if err := t.RaiseEvent(e, false); err != nil {
            return err
        }

        t.CommitEvent(e)
        t.SetStreamVersion(storedEvent.StreamVersion)
    }

    return nil
}

 ドメイン貧血症感満載ですが、サンプルだからよしとします。
 前回定義した「AggregateBase」を埋め込み、そちらに定義していなかった「Repaly」メソッドを実装しています。「RaiseEvent」でイベントが反映される感じです。

 続いてTodoを操作するアクターを定義します。

todo_actor.go
package command

import (
    "errors"

    "go.uber.org/zap"

    "github.com/lightstaff/go-dddcqrses/common"
    "github.com/lightstaff/go-dddcqrses/events"
    "github.com/lightstaff/go-dddcqrses/messages"
)

type (
    TodoRegistry struct {
        Message   string
        Completed bool
    }

    TodoMessageChange struct {
        AggregateID string
        Message     string
    }

    TodoComplete struct {
        AggregateID string
        Completed   bool
    }
)

type TodoActor struct {
    persistence common.PersistenceContext
    producer    common.MessagingProducerContext
    logger      *zap.SugaredLogger
}

func NewTodoActor(persistence common.PersistenceContext, producer common.MessagingProducerContext, logger *zap.SugaredLogger) *TodoActor {
    return &TodoActor{
        persistence: persistence,
        producer:    producer,
        logger:      logger,
    }
}

func (t *TodoActor) Act(command interface{}) error {
    switch command := command.(type) {
    case *TodoRegistry:
        aggregateID, err := common.NewAggregateID()
        if err != nil {
            return err
        }
        entity := NewTodo(aggregateID)
        e, err := events.NewTodoRegistered(aggregateID, command.Message, command.Completed)
        if err != nil {
            return err
        }
        if err := entity.RaiseEvent(e, true); err != nil {
            return err
        }
        if err := t.persistence.Save(entity); err != nil {
            return err
        }
        m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
        if err != nil {
            return err
        }
        if err := t.producer.Publish(m); err != nil {
            return err
        }
    case *TodoMessageChange:
        entity := NewTodo(command.AggregateID)
        e, err := events.NewTodoMessageChanged(command.AggregateID, command.Message)
        if err != nil {
            return err
        }
        if err := entity.RaiseEvent(e, true); err != nil {
            return err
        }
        if err := t.persistence.Save(entity); err != nil {
            return err
        }
        m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
        if err != nil {
            return err
        }
        if err := t.producer.Publish(m); err != nil {
            return err
        }
    case *TodoComplete:
        entity := NewTodo(command.AggregateID)
        e, err := events.NewTodoCompleted(command.AggregateID, command.Completed)
        if err != nil {
            return err
        }
        if err := entity.RaiseEvent(e, true); err != nil {
            return err
        }
        if err := t.persistence.Save(entity); err != nil {
            return err
        }
        m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
        if err != nil {
            return err
        }
        if err := t.producer.Publish(m); err != nil {
            return err
        }
    }
    return errors.New("unknown command")
}

 頭で定義された各コマンドによって処理を分岐しています。「TodoRegistry」コマンドで作成、「TodoMessageChange」コマンでTodoのメッセージを変更。「TodoComplete」コマンドでTodoを完了させます。Proto.Actorのアクターはもっといろいろ管理させたりもできますが、今回はそこまでやりません。

クエリー側

 cqrsのクエリー側をqueryパッケージに準備します。
 DTOの永続化もインメモリーで定義します。

query_db.go
package query

import "go.uber.org/zap"

type QueryDBContext interface {
    FindByID(id string) *TodoQuery
    Save(entity *TodoQuery)
}

type FakeQueryDB struct {
    data   map[string]*TodoQuery
    logger *zap.SugaredLogger
}

func NewFakeQueryDB(logger *zap.SugaredLogger) *FakeQueryDB {
    return &FakeQueryDB{
        data:   make(map[string]*TodoQuery),
        logger: logger,
    }
}

func (db *FakeQueryDB) FindByID(id string) *TodoQuery {
    if entity, ok := db.data[id]; ok {
        return entity
    }
    return nil
}

func (db *FakeQueryDB) Save(entity *TodoQuery) {
    db.data[entity.AggregateID] = entity
}

 特にコメント無し。
 次にDTOとして「TodoQuery」モデルを定義します。

todo_query.go
package query

import (
    "github.com/lightstaff/go-dddcqrses/common"
    "github.com/lightstaff/go-dddcqrses/events"
)

type TodoQuery struct {
    AggregateID   string
    Message       string
    Completed     bool
    StreamVersion int64
}

func (t *TodoQuery) ApplyEvent(e common.EventContext) error {
    switch e := e.(type) {
    case *events.TodoRegistered:
        t.Message = e.Message
        t.Completed = e.Completed
    case *events.TodoMessageChanged:
        t.Message = e.Message
    case *events.TodoCompleted:
        t.Completed = e.Completed
    }
    return nil
}

 「ApplyEvent」メソッドは下のアクターによって呼ばれます。
 こちらはクエリー側のアクターです。

todo_actor.go
package query

import (
    "github.com/lightstaff/go-dddcqrses/common"
    "github.com/lightstaff/go-dddcqrses/events"
    "github.com/lightstaff/go-dddcqrses/messages"
    "go.uber.org/zap"
)

type TodoActor struct {
    persistenceQuery common.PersistenceQueryContext
    queryDB          QueryDBContext
    logger           *zap.SugaredLogger
}

func NewTodoActor(persistenceQuery common.PersistenceQueryContext, queryDB QueryDBContext, logger *zap.SugaredLogger) *TodoActor {
    return &TodoActor{
        persistenceQuery: persistenceQuery,
        queryDB:          queryDB,
        logger:           logger,
    }
}

func (t *TodoActor) Act(msg *messages.TodoEventOccurred) error {
    target := t.queryDB.FindByID(msg.AggregateID)
    if target == nil {
        target = &TodoQuery{
            AggregateID: msg.AggregateID,
        }
    }
    storedEvents, err := t.persistenceQuery.QueryEvents(msg.AggregateID, target.StreamVersion, msg.StreamVersion)
    if err != nil {
        return err
    }
    for _, storedEvent := range storedEvents {
        e, err := events.EventConverter(storedEvent.EventType, storedEvent.Data)
        if err != nil {
            return err
        }
        target.ApplyEvent(e)
        target.StreamVersion = storedEvent.StreamVersion
        t.logger.Infow("apply event", "event", e)
    }
    t.queryDB.Save(target)
    return nil
}

 アクターはメッセージを受け取り、永続化ストアのクエリから取得したイベントをDTOに流し、保存します。

メイン

 最後にそれぞれを動作させるエントリポイントとしてmainを定義します。

main.go
package main

import (
    "context"
    "os"
    "os/signal"

    "github.com/lightstaff/go-dddcqrses/command"
    "github.com/lightstaff/go-dddcqrses/common"
    "github.com/lightstaff/go-dddcqrses/messages"
    "github.com/lightstaff/go-dddcqrses/query"
    "go.uber.org/zap"
)

func main() {
    logger, err := zap.NewProduction()
    if err != nil {
        panic(err)
    }
    defer logger.Sync()

    sugar := logger.Sugar()

    inMemoryDB := common.NewInMemoryDB(sugar)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    delivery := make(chan *common.InMemoryMessage)
    errc := make(chan error)

    go func() {
        persistence := common.NewFakePersistence(inMemoryDB, sugar)
        producer := common.NewFakeMessagingProducer(delivery, sugar)
        commandActor := command.NewTodoActor(persistence, producer, sugar)
        commandActor.Act(&command.TodoRegistry{
            Message:   "test message",
            Completed: false,
        })
        sugar.Info("command actor action completed")
    }()

    go func() {
        defer close(delivery)

        persistenceQuery := common.NewFakePersistenceQuery(inMemoryDB, sugar)
        consumer := common.NewFakeMessagingConsumer(delivery, sugar)
        queryDB := query.NewFakeQueryDB(sugar)
        queryActor := query.NewTodoActor(persistenceQuery, queryDB, sugar)

        go func() {
            if err := consumer.Consume(ctx, delivery); err != nil {
                errc <- err
            }
        }()

        for {
            select {
            case <-ctx.Done():
                sugar.Info("call context cancel")
                return
            case m := <-delivery:
                msg, err := messages.MessageConveter(m.Header, m.Data)
                if err != nil {
                    errc <- err
                }
                sugar.Infow("receive message", "message", msg)
                switch msg := msg.(type) {
                case *messages.TodoEventOccurred:
                    if err := queryActor.Act(msg); err != nil {
                        errc <- err
                    }
                    sugar.Info("query actor action completed")
                    sugar.Infow("now todo", "todo", queryDB.FindByID(msg.AggregateID))
                }
            }
        }
    }()

    go func() {
        if err := <-errc; err != nil {
            sugar.Errorw("error happend", "error", err)
            cancel()
        }
    }()

    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt)
    <-quit
}

 今回はインメモリーで永続化やメッセージングをしているのでコマンドとクエリーを分割できませんが、そのへんを別の機構で実装すればそれぞれを独立することが可能なはずです。
 Proto.Actorのリモート周りを使うのも手ですが、protocol-bufferの知識があった方が良いのとホスト側でリモートのインスタンスを立てなきゃいけないので今回は見送りました。

おわりに

 何とかかんとか実装してみたけど、cqrs(DDD含めて)+esとgolangの相性が良いかと言われると微妙な気がする。C#やらJava(Scala)に比べて抽象化はしずらいし(埋め込みでは解決できないこともある)、諸々の命名がいまいち長ったらしくなってしまってgolang感が・・・。

 成果物:https://github.com/lightstaff/go-dddcqrses

参考資料

  • エヴァンス本
  • 実践ドメイン駆動設計
  • .NETのエンタープライズアプリケーションアーキテクチャ第2版
  • Akka
  • Proto.Actor
3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3