18
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Wano Group Advent Calendar2021Advent Calendar 2021

Day 9

GoとRepositoryパターンと集約またぎのトランザクションと

Last updated at Posted at 2021-12-09

Wano Group Advent Calendar 2021 の記事です。

教義通りにRepositoryでトランザクション貼ってるDDDやらレイヤードアーキテクチャやら関心の分離やらの記事はあれど、Repisitoryをまたいだトランザクションのサンプルってそんなに多くないですね。

多くのスタートアップで現実的には向き合わなきゃいけなさそうな気もするんですが、みんな分散トランザクションの天才とかなのかしら...(
一部プロジェクトでやってるトランザクションのパターンを本稿では扱おうと思います。

要約

  • トランザクションは主に各アプリのusecaseで貼ってる
  • context.Contextにmutableなトランザクションオブジェクトを詰めてる
  • repositoryにはcontextからORMのオブジェクトを復元する関数自体をDIしてる

前段:保存したいモデル/RDB上の構成/Goのパッケージ構成

こんなかんじのmodelで、この中の video_asset.VideoAsset を保存したいとします。

今回データベースはMySQLとします。
モデルに対応する部分は簡易ER図でいうとこんな感じです

上記を実装するVideoAssetリポジトリ周りのGoのパッケージ構成はこんなものであるとします

pkg/domain/video_asset
// video asset のモデルとかここでしかつかわないvalue objectとかrepositoryのinterfaceとか
├── validate.go
├── video_asset.go
├── video_asset.go_linq_generated.go
└── video_asset.repository.go
...
pkg/infra/adapter/domain/mysql_repository
// repositoryのmysql実装
├── 00_init_test.go
├── 020_video_asset.repository_test.go
├── mysqldto
│   ├── 0README.md
│   ├── 00_go_generate.go
│   ├── asset.go
│   ├── user.go
│   ├── video_asset.go
│   ├── video_asset_crop.go
...
├── video_asset.repository.go
...
pkg/domain/vo
// value objectパッケージ / 共通モデル
├── 00_README.md
├── dconst
...
│   ├── s3_storage_class.go
│
...

├── user_id.go
└── video_asset_id.go



保存したいモデル群です。

pkg/domain/video_asset/video_asset.go

...

// aggregate root
type VideoAsset struct {
	ID             vo.VideoAssetID     // aggregate id
	Asset          vo.Asset           
	FfprobeInfo    vo.NullFFprobeJson 
	VideoAssetCrop *VideoAssetCrop
}

type VideoAssets []VideoAsset

type VideoAssetCrop struct {
	// 黒枠(crop) の確定データ
	Top           int
	Bottom        int
	Left          int
	Right         int
	DetectedCrops VideoAssetCropByFrameJson
}

// フレームごとの黒枠(crop)情報
type VideoAssetCropByFrameJson string

func (self VideoAssetCropByFrameJson) ToCropOnTimeFrames() []CropOnTimeFrame {

	target := make([]CropOnTimeFrame, 0)
	_ = json.Unmarshal([]byte(self), &target)

	return target
}

type CropOnTimeFrame struct {
	TimeSeconds float64 `json:"time_seconds"`
	Top         int     `json:"top"`
	Bottom      int     `json:"bottom"`
	Left        int     `json:"left"`
	Right       int     `json:"right"`
}

pkg/domain/vo/asset.go

// パフォーマンス要件等ない限り共通objectにidentifierは持たない
type Asset struct {
	UserID           UserID                  
	Bucket           string                  
	Location         string                  
	MimeType         string                  
	UploadDatetime   null.Time               
	Filename         null.String             
	S3StorageClassID dconst.S3StorageClassID 
	IsExist          bool                    
	FileSizeByte     typedef.ByteSize    
	Md5Hash          null.String             
}

func (self *Asset) RecommendedMimeType() string {

	fileName := self.Filename.String
	return mime.TypeByExtension(filepath.Ext(fileName))
}
...


Repositoryの定義

ここから本題です。
Repositoryの定義をします。
集約内のフィールド情報だけを用いたあんまり複雑じゃない最低限の検索メソッドはこの際許すこととして,interfaceを定義します。

pkg/domain/video_asset/video_asset_repository.go

package video_asset

...

//go:generate mockgen -source=${GOFILE} -destination=../../infra/adapter/domain/mock_repository/mock_${GOFILE}_generated.go -package=mock_repository

type VideoAssetRepository interface {
	Save(ctx context.Context, videoAsset *VideoAsset) (*VideoAsset, error)
	DeleteByIDs(ctx context.Context, id ...vo.VideoAssetID) (deleted VideoAssets, err error)
	Where(ctx context.Context) VideoAssetFilter
}

type VideoAssetFilter interface {
	IDIn(id ...vo.VideoAssetID) VideoAssetFilter
	UserIDIn(userId ...vo.UserID) VideoAssetFilter
	// asset
	AssetBucketNameIn(bucket ...string) VideoAssetFilter
	AssetLocationIn(location ...string) VideoAssetFilter
	AssetS3StorageClassIDIN(s3Class ...dconst.S3StorageClassID) VideoAssetFilter
	AssetFileSizeByteGreaterThan(byteSize int64) VideoAssetFilter
	AssetIsExist(b bool) VideoAssetFilter
	AssetMimeTypeIn(mimeType ...string) VideoAssetFilter
	// get
	Find() (VideoAssets, error)
	First() (*VideoAsset, error)
}

Save(ctx context.Context, videoAsset *VideoAsset) (*VideoAsset, error) 時にオブジェクトを返していますが、これはvideo_assetテーブルにおけるIDがauto incrementであることによる採番の事情が色濃く入っていて、できるならULIDとかで採番し、Save(ctx context.Context, videoAsset *VideoAsset) error とかのが良さそうです。

Repository実装

mysql版実装です。
コネクションはContextに詰める想定です。
また、ORMはgormを使っています。
Repositoryには、context.Contextからmysqlのconnectionをとる関数自体をDIしています。

/pkg/infra/adapter/domain/mysql_repository/common.go
type GetTx = func(ctx context.Context) *gorm.DB

type MysqlDependency struct {
	GetTx GetTx `validate:"required"`
}

/pkg/infra/adapter/domain/mysql_repository/video_asset.repository.go
package mysql_repository

...

func NewVideoAssetRepository(deps *MysqlDependency) video_asset.VideoAssetRepository {

	err := validator.New().Struct(deps)
	if err != nil {
		log.Panic(err)
	}

	impl := implVideoAssetRepository{
		MysqlDependency: deps,
	}

	return &impl
}

type implVideoAssetRepository struct {
	*MysqlDependency
}

func (self *implVideoAssetRepository) Save(ctx context.Context, videoAsset *video_asset.VideoAsset) (*video_asset.VideoAsset, error) {
   // asset レコードを delete/insertするコード
   // video_assetを保存するコード
....
	return self.Where(ctx).IDIn(vo.VideoAssetID(vaDto.ID)).Single()
}

func (self *implVideoAssetRepository) DeleteByIDs(ctx context.Context, id ...vo.VideoAssetID) (deleted video_asset.VideoAssets, err error) {
	// ... assetを物理削除してからvideo_assetを削除するコード
...
}

func (self *implVideoAssetRepository) Where(ctx context.Context) video_asset.VideoAssetFilter {
	// 綺麗な状態のcloneを作ること
	tx := self.GetTx(ctx).New()

	return newVideoAssetFilter(tx)
}

func newVideoAssetFilter(gormDB *gorm.DB) video_asset.VideoAssetFilter {

	gormDB = gormDB.Table(`video_asset`).
		Joins(`INNER JOIN asset ON video_asset.asset_id = asset.id`)

	return &implVideoAssetFilter{
		db: gormDB,
	}
}

type implVideoAssetFilter struct {
	db *gorm.DB
}

func (self *implVideoAssetFilter) IDIn(id ...vo.VideoAssetID) video_asset.VideoAssetFilter {
	self.db = self.db.Where(`video_asset.id IN (?)`, id)
	return self
}

func (self *implVideoAssetFilter) UserIDIn(userId ...vo.UserID) video_asset.VideoAssetFilter {
	self.db = self.db.Where(`asset.user_id IN (?)`, userId)
	return self
}

func (self *implVideoAssetFilter) AssetBucketNameIn(bucket ...string) video_asset.VideoAssetFilter {
	self.db = self.db.Where(`asset.bucket IN (?)`, bucket)
	return self
}

...

func (self *implVideoAssetFilter) AssetFileSizeByteLessThanOrEqual(byteSize int64) video_asset.VideoAssetFilter {
	self.db = self.db.Where(`asset.file_size_byte <= ?`, byteSize)
	return self
}

func (self *implVideoAssetFilter) AssetIsExist(b bool) video_asset.VideoAssetFilter {
	self.db = self.db.Where(`asset.is_exist = ?`, b)
	return self
}


func (self *implVideoAssetFilter) Find() (video_asset.VideoAssets, error) {

	videoAssetDtos := mysqldto.VideoAssetSlice{}
	err := self.db.Find(&videoAssetDtos).Error
	if err != nil {
		return nil, xerrors.Errorf(`%w`, err)
	}

	assetIds := videoAssetDtos.SelectInt(func(va mysqldto.VideoAsset) int {
		return va.AssetID
	})

	newCmd := self.db.New()
	assetDtos := mysqldto.AssetSlice{}
	err = newCmd.Raw(`
		SELECT * 
		FROM asset
		WHERE id IN (?)
	`, assetIds).Scan(&assetDtos).Error
	if err != nil {
		log.Error(err)
		return nil, xerrors.Errorf(`%w`, err)
	}

	assetIdToAssetMap := assetDtos.GroupByDistinctInt(func(asset mysqldto.Asset) int {
		return asset.ID
	})

    // mysql構造体からドメイン構造体に詰め替え
	videoAssets := video_asset.VideoAssets{}
	for _, vaDto := range videoAssetDtos {

		assetDto := assetIdToAssetMap[vaDto.AssetID]
		assetModel := dtoconvert.AssetDtoToModel(assetDto)
		vaModel := dtoconvert.VideoAssetDtoToModel(vaDto, assetModel)
		videoAssets = append(videoAssets, vaModel)

	}

	return videoAssets, nil

}

func (self *implVideoAssetFilter) First() (*video_asset.VideoAsset, error) {
	vas, err := self.Find()
	if err != nil {
		return nil, xerrors.Errorf(`%v`, err)
	}

	if len(vas) == 0 {
		return nil, nil
	}
	return &vas[0], nil

}


  • パフォーマンスが許せば子集約/ValueObjectなテーブルはdelete/insertとかでいいと思います。
  • 検索群メソッドは基本的に集約内で使っているテーブルのカラム以外にアクセスするのは禁止です

コネクションとContextをつなぐ

リポジトリができたのでContextとコネクションの媒介ヘルパーを用意します。

// トランザクション保持用インスタンス
type DB interface {
	GetGormDB() *gorm.DB // registry , injection時しか使っちゃダメです
	Begin() error
	Rollback()
	Commit() error
}

type _db struct {
	gormDB *gorm.DB
}
...

こういうmutableなトランザクション保持用インスタンスのポインタをContextに詰める方向で実装します。
Contextとコネクションをうまく媒介するやつ用意しときます。

db_context_helper.go

type Database struct {
	Name                  string        `yaml:"Name"`
	Dialect               string        `yaml:"Dialect"`
	Role                  string        `yaml:"Role"`
	Addr                  string        `yaml:"Addr"`
	DBName                string        `yaml:"DBName"`
	User                  string        `yaml:"User"`
	Password              string        `yaml:"Password"`
	Net                   string        `yaml:"Net"`
	Location              TIME_LOCATION `yaml:"Location"`
	MaxConnections        int           `yaml:"MaxConnections"`
	MaxIdleConnections    int           `yaml:"MaxIdleConnections"`
	ConnectionMaxLifeTime int           `yaml:"ConnectionMaxLifeTime"` // seconds
	Logging               bool          `yaml:"Logging"`               // true or false
}

func (self *Database) NewConnectionFromPool() DB {

	// ... DB をつくるやつ
}

func (self *Database) Close() DB {

	// ... 持ってるコネクションを全て閉じるやつ。main関数などでアプリの終了時に打つ
}



// DBContextHelper は context.Contextにdao.DBのコネクションを仕込んでおくような場合に動作します。
type DBContextHelper interface {
	GetExistedDB(ctx context.Context) DB

	/*
		GetNewDBContext は分岐した新しいContextにコネクションプールからあいてるコネクションをつめて返します。
		入力した古いContextに入るわけではないので注意.

		## CRUD系API Serverなどの場合
		Handler/Middleware層の認可チェックなどで使いたいならコネクションを取得 (このへんはアプリごとの要件によります)
		_ , ctx := dependency.DBContextHelper.GetNewDBContext(echoContext.Request().Context())

		あるいはトランザクションを貼るUsecaseなどでも新規取得するといいかと思います
		tx, newContext := dependency.DBContextHelper.GetNewDBContext(oldContext)
		tx.Begin()
		videoRepo.Save(newContext , video)
		...

		## バッチ処理など複数トランザクションを扱う場合
		逆にいうと複数トランザクションを扱うアプリではContextが別にすることができます。
		このへんは要件によって.

		conn1, contextWithConn1 := dependency.DBContextHelper.GetNewDBContext(ctx)
		video  , err := videoRepo.GetById(contextWithConn1 , video)

		for _ , message := range messages {
			conn2, contextWithConn2 := dependency.DBContextHelper.GetNewDBContext(contextWithConn1)
			conn2.Begin()
			video  , err := videoRepo.GetById(contextWithConn2 , video)
			conn2.Commit()
		}


	*/
	GetNewDBContext(oldContext context.Context) (db DB, newContext context.Context)
}
...
func NewDBContextHelper(conf Database, contextItemKey string /* ex. __CONTEXT_TRANSACTION__     */) DBContextHelper {

	impl := implMysqlTransactionHelper{
		conf: conf,
		transactionContextItemKey: contextItemKey,
	}

	return &impl

}

func (i *implMysqlTransactionHelper) GetNewDBContext(oldContext context.Context) (db DB, newContext context.Context) {

	contextItem := TransactionContextItem{
		Tx: i.conf.NewConnectionFromPool()
	}

	newContext = context.WithValue(oldContext, i.transactionContextItemKey, &contextItem)
	return contextItem.Tx, newContext

}

type TransactionContextItem struct {
	Tx DB
}

func (i *implMysqlTransactionHelper) GetExistedDB(ctx context.Context) DB {

	contextItem, ok := ctx.Value(i.transactionContextItemKey).(*TransactionContextItem)
	if !ok {
		log.Panic(`transaction context  is null`)
	}
	return contextItem.Tx
}

type implMysqlTransactionHelper struct {
	conf Database 
	transactionContextItemKey string
}


トランザクションを貼ろう

mainパッケージ/レジストリなどのコードでは、repositoryに対してDBContextHelperに生えてるGetExistedDBを使用したコードを渡してあげます。

registry/repository_injection.go
func NewMySqlRepositories(m dao.DBContextHelper) Repositories {
	return NewMySqlRepositoriesByContextTx(m.GetExistedDB.GetGormDB())
}

func NewMySqlRepositoriesByContextTx(getTx func(ctx context.Context) *gorm.DB) Repositories {
	mysqlDeps := &mysql_repository.MysqlDependency{GetTx: getTx}

	r := Repositories{
		...
		VideoAssetRepository:                  mysql_repository.NewVideoAssetRepository(mysqlDeps),

	}

	err := validator.New().Struct(r)
	if err != nil {
		log.Panic(err)
	}
	return r
}

一般的なCRUDサーバの場合だと基本的にusecase層でトランザクションを貼ってます。
(usecaseにたどり着く前にhandler/web層の認可チェックとかでDBに繋ぐ必要がある場合は、そっちはそっちでDBContextHelper/コネクションプールから新規に取得します。)
ここでは、Usecaseでこいつを使います。

アプリのgitリポジトリ/xxx/fail_delivery_usecase.go

type FailDeliveryUsecase interface {
	FailDelivery(ctx context.Context /* 上位のHandlerで渡ってきたリクエストコンテキスト  */, in IoFailDeliveryInput) (*IoFailDeliveryOutput, error)
}

type FailDeliveryUsecaseDependency struct {
	DBContextHelper    dao.DBContextHelper                       `validate:"required"`

	VideoAssetRepository video_asset.VideoAssetRepository               `validate:"required"`
       ...
    // 必要あればセカンダリーへのアクセスも可能にする
    ReadDBContextHelper    dao.DBContextHelper                       `validate:"required"`
}

type implFailDeliveryUsecase struct {
	*FailDeliveryUsecaseDependency
}


func (self *implFailDeliveryUsecase) FailDelivery(_ctx context.Context, in IoFailDeliveryInput) (*IoFailDeliveryOutput, error) {

	tx, txCtx := self.DBContextHelper.GetNewDBContext(_ctx)

	tx.Begin()
        
	....
        // txCtx(context.Contextt)を使っていろんなリポジトリへの保存処理とロールバック
	tx.Commit()
	return &IoFailDeliveryOutput{}, nil
}

...

Usecaseでのトランザクション管理が可能になりました。
もちろん事情によっては複数トランザクション対応や別DB用に別のContextに分岐させることも可能です。

疑問

Q. 排他のパターンはあるか

うちではやらずにリトライとかで済んでるとこもありますが、DDDに詳しい/わりと現実的な戦術の温度感を示してくれる 松岡さんのkotlinのコード実例を参考にするとこういう感じになるかな?と思っています。

type VideoAssetFilter interface {
        ...
	Find(enableLock ...bool) (VideoAssets, error)
	First(enableLock ...bool) (*VideoAsset, error)
}

enableLockがtrueなら SELECT FOR UPDATE で取得する、みたいな感じらしいです。

Q. context.Contextだの関数DIだのわかりづらくない?

バックエンドのデータストアやORM置き換えるかっていうと置き換えなくね?みたいな話もあるかと思うので、ORMのオブジェクトやオレオレトランザクショナルオブジェクトとかを直接引数で渡したほうが素直って考え方もあるかもしれません。
教条的DDDやクリーンアーキテクチャに大きなトランザクションを許したとたん、層ごとの関心の分割ってあるようでない温度感と妥協の世界にはなってしまうので。

Q. 複雑なJOINや集約外の他テーブル使いたいんだけど

Read専用構造体/マテビューやクエリサービス使えってことにはなってます。
Repositoryは本当は検索用でなく保存単位を死守するパターン。
このへんの温度感の答えは正直あんまりでてないけどチーム内で相談、ですかね。

Q. 複数モデルで同じentityのテーブルを更新するのはありか

reviewer,developerみたいなモデルとそれぞれのリポジトリがあったとして、実質は両集約はadminテーブルとadmin_idである、みたいなやつですね。
ちょうど今日この話出たんですが現実に管理可能なんですかね?
答えでてないです。

Q. 詰め替えいる?

今回のスコープとは外れますが一時のめんどくささに対して得られるものが大きいので業務ロジックメインのアプリは割とおすすめです。
一方コードで語るよりデータベースモデリングがほとんど変数置き場でいいって考え方もあるので、それはそれでひとつのわかりやすさかなーとも。

まとめ

わりと実装ぼかしてるところもあるので妙にエクスキューズの多い記事になってしまいました。
うちではこうしてる!ってのもっと知りたいですね。

参考になりそうな記事:
Goとクリーンアーキテクチャとトランザクションと
【Go言語】DBトランザクション共通処理の具体例

18
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?