2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

golangでcqrs+esにチャレンジしてみる 準備編

Posted at

#はじめに
 goでcqrs+esできないもんだろうか?と思ったのでできるとこからやってみた。
 あくまでチャレンジなので厳密なDDDやcqrsの定義は置いておきたい。特にDDDの値オブジェクトとかシリアル化したい場合、goではドメイン内で隠匿するのが面倒くさかったりする気がするのでそこは頑張らない。
#準備
 とりあえず準備として共有して使う物をcommonパッケージとしてまとめてみる。

集約ルート

aggregate.go
package common

import (
	"sort"

	"github.com/google/uuid"
	"github.com/pkg/errors"
)

type AggregateContext interface {
	AggregateID() string
	StreamVersion() int64
	SetStreamVersion(int64)
	UncommittedEvents() []EventContext
	AppendUncommittedEvent(EventContext)
	CommitEvent(EventContext)
	CommittedEvents() []EventContext
	Replay([]*StoredEvent) error
}

func NewAggregateID() (string, error) {
	id, err := uuid.NewUUID()
	if err != nil {
		return "", errors.Wrap(err, "IDの生成に失敗しました")
	}

	return id.String(), nil
}

type AggregateBase struct {
	aggregateID         string
	streamVersion       int64
	uncommittedEventMap map[string]EventContext
	committedEventMap   map[string]EventContext
}

func NewAggregateBase(aggregateID string) *AggregateBase {
	return &AggregateBase{
		aggregateID:         aggregateID,
		streamVersion:       int64(0),
		uncommittedEventMap: make(map[string]EventContext),
		committedEventMap:   make(map[string]EventContext),
	}
}

func (a *AggregateBase) AggregateID() string {
	return a.aggregateID
}

func (a *AggregateBase) StreamVersion() int64 {
	return a.streamVersion
}

func (a *AggregateBase) SetStreamVersion(value int64) {
	a.streamVersion = value
}

func (a *AggregateBase) UncommittedEvents() []EventContext {
	if a.uncommittedEventMap == nil {
		a.uncommittedEventMap = make(map[string]EventContext)
	}
	results := make([]EventContext, 0, len(a.uncommittedEventMap))
	for _, event := range a.uncommittedEventMap {
		results = append(results, event)
	}
	sort.SliceStable(results, func(i, j int) bool {
		return results[i].GetOccurredOn() < results[j].GetOccurredOn()
	})
	return results
}

func (a *AggregateBase) AppendUncommittedEvent(event EventContext) {
	if a.uncommittedEventMap == nil {
		a.uncommittedEventMap = make(map[string]EventContext)
	}
	a.uncommittedEventMap[event.GetEventID()] = event
}

func (a *AggregateBase) CommittedEvents() []EventContext {
	if a.committedEventMap == nil {
		a.committedEventMap = make(map[string]EventContext)
	}
	results := make([]EventContext, 0, len(a.committedEventMap))
	for _, event := range a.committedEventMap {
		results = append(results, event)
	}
	sort.SliceStable(results, func(i, j int) bool {
		return results[i].GetOccurredOn() < results[j].GetOccurredOn()
	})
	return results
}

func (a *AggregateBase) CommitEvent(event EventContext) {
	if a.uncommittedEventMap == nil {
		a.uncommittedEventMap = make(map[string]EventContext)
	}
	delete(a.uncommittedEventMap, event.GetEventID())
	if a.committedEventMap == nil {
		a.committedEventMap = make(map[string]EventContext)
	}
	a.committedEventMap[event.GetEventID()] = event
}

 大まかに説明すると...

  • AggregateIDが一意なID。DDD的には集約毎に値オブジェクトにした方がいいんだろうなとは思う。
  • StreamVersionがイベントソーシングのバージョン管理で永続化されるとSetStreamVersionでインクリメントされる。
  • UncommittedEventsが適用前のイベントで、イベントがAppendUncommittedEventでここに突っ込まれる。
  • CommittedEventsが適用済のイベントで、永続化やリプレイ時にここに適用された際にCommitEventでイベントが入る。
  • Replayは永続化ストアから再生する時に呼ばれる。これは各集約ルートで実装。

イベント

event.go
package common

import (
	"github.com/google/uuid"
	"github.com/pkg/errors"
)

type EventContext interface {
	GetEventID() string
	GetEventType() string
	GetOccurredOn() int64
}

func NewEventID() (string, error) {
	id, err := uuid.NewUUID()
	if err != nil {
		return "", errors.Wrap(err, "IDの生成に失敗しました")
	}

	return id.String(), nil
}

 特に説明することもないけど、なんでインターフェースでGet付けてんのかというと実装時に名前が被るから。

永続化ストア

persistence.go
package common

type PersistenceContext interface {
	ReplayAggregate(a AggregateContext) error
	Save(a AggregateContext) error
}

type FakePersistence struct{}

func (p *FakePersistence) ReplayAggregate(a AggregateContext) error {
	storedEvents := make([]*StoredEvent, 0)
	return a.Replay(storedEvents)
}

func (p *FakePersistence) Save(a AggregateContext) error {
	for _, e := range a.UncommittedEvents() {
		/*
			storedEvent := &StoredEvent {
				AggregateID: a.AggregateID(),
				StreamVersion: a.StreamVersion(),
				OccurredOn: e.OccurredOn(),
				EventType: e.EventType(),
				Data: []byte(e),
			}

			db save stored event
		*/

		a.CommitEvent(e)
	}

	return nil
}

type PersistenceQueryContext interface {
	QueryEvents(id string, base, limit int64) ([]*StoredEvent, error)
}

type FakePersistenceQuery struct{}

func (p *FakePersistenceQuery) QueryEvents(id string, base, limit int64) ([]*StoredEvent, error) {
	storedEvents := make([]*StoredEvent, 0)

	/*
		load stored events from db
	*/

	return storedEvents, nil
}

type StoredEvent struct {
	AggregateID   string
	StreamVersion int64
	OccurredOn    int64
	EventType     string
	Data          []byte
}

 Fakeが中々苦しい。実際はCassandra(gocql)でやってみたりしてますが、そこまで書くと長くなりすぎるのでFakeで割愛。

  • FakePersistenceをコマンド側が利用し、集約ルートの永続化やリプレイを行います。
  • FakePersistenceQueryをクエリ側が利用し、指定された範囲(baseとlimit)のバージョンのイベントを取得できるようにします。
  • goではC#や、Javaのようにblobデータから単純にstructに戻せないのでStoredEventにラップして、再生時はその中のEventType(構造体名)を見てイベントを元のstructに戻します。

メッセージング

messaging.go
package common

import (
	"context"

	"github.com/google/uuid"
	"github.com/pkg/errors"
)

type MessageContext interface {
	GetMessageID() string
	GetMessageType() string
}

func NewMessageID() (string, error) {
	id, err := uuid.NewUUID()
	if err != nil {
		return "", errors.Wrap(err, "IDの生成に失敗しました")
	}

	return id.String(), nil
}

type MessagingProducerContext interface {
	Publish(m MessageContext) error
}

type FakeMessagingProducer struct{}

func (p *FakeMessagingProducer) Publish(m MessageContext) error {
	/*
		create message for messaging system
		message header = m.MessageType()
		message body = []byte(m)
		messaging system send message
	*/

	return nil
}

type MessagingConsumerContext interface {
	Consume(ctx context.Context, msg chan<- MessageContext) error
}

type FakeMessagingConsumer struct{}

func (c *FakeMessagingConsumer) Consume(ctx context.Context, msg chan<- MessageContext) error {
	/*
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				delivery = from messaging system
				msg <- delivery to MessageContext
			}
		}
	*/

	return nil
}

 これまたFakeが苦しいですが、こっちも実際にはRabbitmq(amqp)を使ってみたりしてます。CosumeにChannelを入れてるのはそのためです。

  • FakeMessagingProducerが送信側です。
  • FakeMessagingConsumerが受信側です。Channelを使って受け待ちします。
  • こちらもstructを再生するためにヘッダー等にMessageType(構造体名)と入れます。

おわりに

 とりあえず今回は準備編としてここまで。
 何かFake周りのでせいで実装できるか不安になってきたのと、何かコード自体が重苦しいのでもっとgoらしく軽い感じにできんかなぁと思ったりもします・・・。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?