3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

golangでcqrs+esにチャレンジしてみる 実装編

Last updated at Posted at 2017-11-28

はじめに

 前回、下準備したものの続きです。
 今回は下準備したものを使ってTodoアプリっぽく実装してみます。ついでにAkkaProto.Actorをヒントにアクターシステムっぽくしてみたいと思います。コマンド側とクエリー側それぞれのアクターがコマンド処理やメッセージ処理を行います。

前回からの変更

 前回のを使ってと言いながらいきなり変更します。だって前回のFake永続化、Fakeメッセージングでは実装できなかったんですもの・・・。

persistence.go
package common

import (
	"encoding/json"
	"sort"

	"go.uber.org/zap"
)

type InMemoryDB struct {
	data   map[string][]*StoredEvent
	logger *zap.SugaredLogger
}

func NewInMemoryDB(logger *zap.SugaredLogger) *InMemoryDB {
	return &InMemoryDB{
		data:   make(map[string][]*StoredEvent),
		logger: logger,
	}
}

func (db *InMemoryDB) GetByID(id string) []*StoredEvent {
	results := make([]*StoredEvent, 0)
	results = append(results, db.data[id]...)
	sort.SliceStable(results, func(i, j int) bool {
		switch {
		case results[i].StreamVersion < results[j].StreamVersion:
			return true
		case results[i].StreamVersion > results[j].StreamVersion:
			return false
		default:
			return results[i].OccurredOn < results[j].OccurredOn
		}
	})
	return results
}

func (db *InMemoryDB) Save(e *StoredEvent) {
	db.data[e.AggregateID] = append(db.data[e.AggregateID], e)
}

type PersistenceContext interface {
	ReplayAggregate(a AggregateContext) error
	Save(a AggregateContext) error
}

type FakePersistence struct {
	db     *InMemoryDB
	logger *zap.SugaredLogger
}

func NewFakePersistence(db *InMemoryDB, logger *zap.SugaredLogger) *FakePersistence {
	return &FakePersistence{
		db:     db,
		logger: logger,
	}
}

func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
	storedEvents := p.db.GetByID(a.AggregateID())
	if err := a.Replay(storedEvents); err != nil {
		return err
	}
	return nil
}

func (p *FakePersistence) Save(a AggregateContext) error {
	for _, e := range a.UncommittedEvents() {
		d, err := json.Marshal(e)
		if err != nil {
			return err
		}
		storedEvent := &StoredEvent{
			AggregateID:   a.AggregateID(),
			StreamVersion: a.StreamVersion(),
			OccurredOn:    e.GetOccurredOn(),
			EventType:     e.GetEventType(),
			Data:          d,
		}
		p.db.Save(storedEvent)
		a.CommitEvent(e)
	}
	a.SetStreamVersion(a.StreamVersion() + 1)
	return nil
}

type PersistenceQueryContext interface {
	QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}

type FakePersistenceQuery struct {
	db     *InMemoryDB
	logger *zap.SugaredLogger
}

func NewFakePersistenceQuery(db *InMemoryDB, logger *zap.SugaredLogger) *FakePersistenceQuery {
	return &FakePersistenceQuery{
		db:     db,
		logger: logger,
	}
}

func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
	results := make([]*StoredEvent, 0)
	storedEvents := p.db.GetByID(id)
	for _, storedEvent := range storedEvents {
		if storedEvent.StreamVersion >= base && storedEvent.StreamVersion <= limit {
			results = append(results, storedEvent)
		}
	}
	return results, nil
}

type StoredEvent struct {
	AggregateID   string
	StreamVersion int64
	OccurredOn    int64
	EventType     string
	Data          []byte
}
messaging.go
package common

import (
	"context"
	"encoding/json"

	"github.com/google/uuid"
	"github.com/pkg/errors"
	"go.uber.org/zap"
)

type InMemoryMessage struct {
	Header string
	Data   []byte
}

type MessageContext interface {
	GetMessageID() string
	GetMessageType() string
}

func NewMessageID() (string, error) {
	id, err := uuid.NewUUID()
	if err != nil {
		return "", errors.Wrap(err, "IDの生成に失敗しました")
	}

	return id.String(), nil
}

type MessagingProducerContext interface {
	Publish(m MessageContext) error
}

type FakeMessagingProducer struct {
	channel chan<- *InMemoryMessage
	logger  *zap.SugaredLogger
}

func NewFakeMessagingProducer(ch chan<- *InMemoryMessage, logger *zap.SugaredLogger) *FakeMessagingProducer {
	return &FakeMessagingProducer{
		channel: ch,
		logger:  logger,
	}
}

func (p *FakeMessagingProducer) Publish(m MessageContext) error {
	d, err := json.Marshal(m)
	if err != nil {
		return err
	}
	msg := &InMemoryMessage{
		Header: m.GetMessageType(),
		Data:   d,
	}
	p.channel <- msg
	p.logger.Infow("publish message", "message", msg)
	return nil
}

type MessagingConsumerContext interface {
	Consume(ctx context.Context, msg chan<- *InMemoryMessage) error
}

type FakeMessagingConsumer struct {
	channel <-chan *InMemoryMessage
	logger  *zap.SugaredLogger
}

func NewFakeMessagingConsumer(ch <-chan *InMemoryMessage, logger *zap.SugaredLogger) *FakeMessagingConsumer {
	return &FakeMessagingConsumer{
		channel: ch,
		logger:  logger,
	}
}

func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- *InMemoryMessage) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m := <-c.channel:
			msg <- m
			c.logger.Infow("consume message", "message", m)
		}
	}
}

 
 デバッグ用にロガーを突っ込んでますが、気にしないでください。Mapを利用したインメモリーデータベースを用意し、そこにイベントを集積します。メッセージングはチャネルを使ってインメモリーで通信します。PubSub(1対多送信)ではなくなってますが今回はこれで通します。

コマンド側

 cqrsのコマンド側をcommandパッケージに準備します。
 まずは集約ルートとしてTodoを定義します。

todo.go
package command

import (
	"errors"

	"github.com/lightstaff/go-dddcqrses/common"
	"github.com/lightstaff/go-dddcqrses/events"
)

type Todo struct {
	*common.AggregateBase
	message   string
	completed bool
}

func NewTodo(id string) *Todo {
	return &Todo{
		AggregateBase: common.NewAggregateBase(id),
	}
}

func (t *Todo) Message() string {
	return t.message
}

func (t *Todo) Completed() bool {
	return t.completed
}

func (t *Todo) RaiseEvent(e common.EventContext, n bool) error {
	switch e := e.(type) {
	case *events.TodoRegistered:
		t.message = e.Message
		t.completed = e.Completed
	case *events.TodoMessageChanged:
		t.message = e.Message
	case *events.TodoCompleted:
		t.completed = e.Completed
	default:
		return errors.New("unknown event")
	}

	if n {
		t.AppendUncommittedEvent(e)
	}

	return nil
}

func (t *Todo) Replay(storedEvents []*common.StoredEvent) error {
	for _, storedEvent := range storedEvents {
		e, err := events.EventConverter(storedEvent.EventType, storedEvent.Data)
		if err != nil {
			return err
		}

		if err := t.RaiseEvent(e, false); err != nil {
			return err
		}

		t.CommitEvent(e)
		t.SetStreamVersion(storedEvent.StreamVersion)
	}

	return nil
}

 ドメイン貧血症感満載ですが、サンプルだからよしとします。
 前回定義した「AggregateBase」を埋め込み、そちらに定義していなかった「Repaly」メソッドを実装しています。「RaiseEvent」でイベントが反映される感じです。

 続いてTodoを操作するアクターを定義します。

todo_actor.go
package command

import (
	"errors"

	"go.uber.org/zap"

	"github.com/lightstaff/go-dddcqrses/common"
	"github.com/lightstaff/go-dddcqrses/events"
	"github.com/lightstaff/go-dddcqrses/messages"
)

type (
	TodoRegistry struct {
		Message   string
		Completed bool
	}

	TodoMessageChange struct {
		AggregateID string
		Message     string
	}

	TodoComplete struct {
		AggregateID string
		Completed   bool
	}
)

type TodoActor struct {
	persistence common.PersistenceContext
	producer    common.MessagingProducerContext
	logger      *zap.SugaredLogger
}

func NewTodoActor(persistence common.PersistenceContext, producer common.MessagingProducerContext, logger *zap.SugaredLogger) *TodoActor {
	return &TodoActor{
		persistence: persistence,
		producer:    producer,
		logger:      logger,
	}
}

func (t *TodoActor) Act(command interface{}) error {
	switch command := command.(type) {
	case *TodoRegistry:
		aggregateID, err := common.NewAggregateID()
		if err != nil {
			return err
		}
		entity := NewTodo(aggregateID)
		e, err := events.NewTodoRegistered(aggregateID, command.Message, command.Completed)
		if err != nil {
			return err
		}
		if err := entity.RaiseEvent(e, true); err != nil {
			return err
		}
		if err := t.persistence.Save(entity); err != nil {
			return err
		}
		m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
		if err != nil {
			return err
		}
		if err := t.producer.Publish(m); err != nil {
			return err
		}
	case *TodoMessageChange:
		entity := NewTodo(command.AggregateID)
		e, err := events.NewTodoMessageChanged(command.AggregateID, command.Message)
		if err != nil {
			return err
		}
		if err := entity.RaiseEvent(e, true); err != nil {
			return err
		}
		if err := t.persistence.Save(entity); err != nil {
			return err
		}
		m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
		if err != nil {
			return err
		}
		if err := t.producer.Publish(m); err != nil {
			return err
		}
	case *TodoComplete:
		entity := NewTodo(command.AggregateID)
		e, err := events.NewTodoCompleted(command.AggregateID, command.Completed)
		if err != nil {
			return err
		}
		if err := entity.RaiseEvent(e, true); err != nil {
			return err
		}
		if err := t.persistence.Save(entity); err != nil {
			return err
		}
		m, err := messages.NewTodoEventOccurred(entity.AggregateID(), entity.StreamVersion())
		if err != nil {
			return err
		}
		if err := t.producer.Publish(m); err != nil {
			return err
		}
	}
	return errors.New("unknown command")
}

 頭で定義された各コマンドによって処理を分岐しています。「TodoRegistry」コマンドで作成、「TodoMessageChange」コマンでTodoのメッセージを変更。「TodoComplete」コマンドでTodoを完了させます。Proto.Actorのアクターはもっといろいろ管理させたりもできますが、今回はそこまでやりません。

クエリー側

 cqrsのクエリー側をqueryパッケージに準備します。
 DTOの永続化もインメモリーで定義します。

query_db.go
package query

import "go.uber.org/zap"

type QueryDBContext interface {
	FindByID(id string) *TodoQuery
	Save(entity *TodoQuery)
}

type FakeQueryDB struct {
	data   map[string]*TodoQuery
	logger *zap.SugaredLogger
}

func NewFakeQueryDB(logger *zap.SugaredLogger) *FakeQueryDB {
	return &FakeQueryDB{
		data:   make(map[string]*TodoQuery),
		logger: logger,
	}
}

func (db *FakeQueryDB) FindByID(id string) *TodoQuery {
	if entity, ok := db.data[id]; ok {
		return entity
	}
	return nil
}

func (db *FakeQueryDB) Save(entity *TodoQuery) {
	db.data[entity.AggregateID] = entity
}

 特にコメント無し。
 次にDTOとして「TodoQuery」モデルを定義します。

todo_query.go
package query

import (
	"github.com/lightstaff/go-dddcqrses/common"
	"github.com/lightstaff/go-dddcqrses/events"
)

type TodoQuery struct {
	AggregateID   string
	Message       string
	Completed     bool
	StreamVersion int64
}

func (t *TodoQuery) ApplyEvent(e common.EventContext) error {
	switch e := e.(type) {
	case *events.TodoRegistered:
		t.Message = e.Message
		t.Completed = e.Completed
	case *events.TodoMessageChanged:
		t.Message = e.Message
	case *events.TodoCompleted:
		t.Completed = e.Completed
	}
	return nil
}

 「ApplyEvent」メソッドは下のアクターによって呼ばれます。
 こちらはクエリー側のアクターです。

todo_actor.go
package query

import (
	"github.com/lightstaff/go-dddcqrses/common"
	"github.com/lightstaff/go-dddcqrses/events"
	"github.com/lightstaff/go-dddcqrses/messages"
	"go.uber.org/zap"
)

type TodoActor struct {
	persistenceQuery common.PersistenceQueryContext
	queryDB          QueryDBContext
	logger           *zap.SugaredLogger
}

func NewTodoActor(persistenceQuery common.PersistenceQueryContext, queryDB QueryDBContext, logger *zap.SugaredLogger) *TodoActor {
	return &TodoActor{
		persistenceQuery: persistenceQuery,
		queryDB:          queryDB,
		logger:           logger,
	}
}

func (t *TodoActor) Act(msg *messages.TodoEventOccurred) error {
	target := t.queryDB.FindByID(msg.AggregateID)
	if target == nil {
		target = &TodoQuery{
			AggregateID: msg.AggregateID,
		}
	}
	storedEvents, err := t.persistenceQuery.QueryEvents(msg.AggregateID, target.StreamVersion, msg.StreamVersion)
	if err != nil {
		return err
	}
	for _, storedEvent := range storedEvents {
		e, err := events.EventConverter(storedEvent.EventType, storedEvent.Data)
		if err != nil {
			return err
		}
		target.ApplyEvent(e)
		target.StreamVersion = storedEvent.StreamVersion
		t.logger.Infow("apply event", "event", e)
	}
	t.queryDB.Save(target)
	return nil
}

 アクターはメッセージを受け取り、永続化ストアのクエリから取得したイベントをDTOに流し、保存します。

メイン

 最後にそれぞれを動作させるエントリポイントとしてmainを定義します。

main.go
package main

import (
	"context"
	"os"
	"os/signal"

	"github.com/lightstaff/go-dddcqrses/command"
	"github.com/lightstaff/go-dddcqrses/common"
	"github.com/lightstaff/go-dddcqrses/messages"
	"github.com/lightstaff/go-dddcqrses/query"
	"go.uber.org/zap"
)

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	sugar := logger.Sugar()

	inMemoryDB := common.NewInMemoryDB(sugar)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	delivery := make(chan *common.InMemoryMessage)
	errc := make(chan error)

	go func() {
		persistence := common.NewFakePersistence(inMemoryDB, sugar)
		producer := common.NewFakeMessagingProducer(delivery, sugar)
		commandActor := command.NewTodoActor(persistence, producer, sugar)
		commandActor.Act(&command.TodoRegistry{
			Message:   "test message",
			Completed: false,
		})
		sugar.Info("command actor action completed")
	}()

	go func() {
		defer close(delivery)

		persistenceQuery := common.NewFakePersistenceQuery(inMemoryDB, sugar)
		consumer := common.NewFakeMessagingConsumer(delivery, sugar)
		queryDB := query.NewFakeQueryDB(sugar)
		queryActor := query.NewTodoActor(persistenceQuery, queryDB, sugar)

		go func() {
			if err := consumer.Consume(ctx, delivery); err != nil {
				errc <- err
			}
		}()

		for {
			select {
			case <-ctx.Done():
				sugar.Info("call context cancel")
				return
			case m := <-delivery:
				msg, err := messages.MessageConveter(m.Header, m.Data)
				if err != nil {
					errc <- err
				}
				sugar.Infow("receive message", "message", msg)
				switch msg := msg.(type) {
				case *messages.TodoEventOccurred:
					if err := queryActor.Act(msg); err != nil {
						errc <- err
					}
					sugar.Info("query actor action completed")
					sugar.Infow("now todo", "todo", queryDB.FindByID(msg.AggregateID))
				}
			}
		}
	}()

	go func() {
		if err := <-errc; err != nil {
			sugar.Errorw("error happend", "error", err)
			cancel()
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
}

 今回はインメモリーで永続化やメッセージングをしているのでコマンドとクエリーを分割できませんが、そのへんを別の機構で実装すればそれぞれを独立することが可能なはずです。
 Proto.Actorのリモート周りを使うのも手ですが、protocol-bufferの知識があった方が良いのとホスト側でリモートのインスタンスを立てなきゃいけないので今回は見送りました。

おわりに

 何とかかんとか実装してみたけど、cqrs(DDD含めて)+esとgolangの相性が良いかと言われると微妙な気がする。C#やらJava(Scala)に比べて抽象化はしずらいし(埋め込みでは解決できないこともある)、諸々の命名がいまいち長ったらしくなってしまってgolang感が・・・。

 成果物:https://github.com/lightstaff/go-dddcqrses

参考資料

  • エヴァンス本
  • 実践ドメイン駆動設計
  • .NETのエンタープライズアプリケーションアーキテクチャ第2版
  • Akka
  • Proto.Actor
3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?