Wano Group Advent Calendar 2021 の記事です。
教義通りにRepositoryでトランザクション貼ってるDDDやらレイヤードアーキテクチャやら関心の分離やらの記事はあれど、Repisitoryをまたいだトランザクションのサンプルってそんなに多くないですね。
多くのスタートアップで現実的には向き合わなきゃいけなさそうな気もするんですが、みんな分散トランザクションの天才とかなのかしら...(
一部プロジェクトでやってるトランザクションのパターンを本稿では扱おうと思います。
要約
- トランザクションは主に各アプリのusecaseで貼ってる
- context.Contextにmutableなトランザクションオブジェクトを詰めてる
- repositoryにはcontextからORMのオブジェクトを復元する関数自体をDIしてる
前段:保存したいモデル/RDB上の構成/Goのパッケージ構成
こんなかんじのmodelで、この中の video_asset.VideoAsset
を保存したいとします。
今回データベースはMySQLとします。
モデルに対応する部分は簡易ER図でいうとこんな感じです
上記を実装するVideoAssetリポジトリ周りのGoのパッケージ構成はこんなものであるとします
pkg/domain/video_asset
// video asset のモデルとかここでしかつかわないvalue objectとかrepositoryのinterfaceとか
├── validate.go
├── video_asset.go
├── video_asset.go_linq_generated.go
└── video_asset.repository.go
...
pkg/infra/adapter/domain/mysql_repository
// repositoryのmysql実装
├── 00_init_test.go
├── 020_video_asset.repository_test.go
├── mysqldto
│ ├── 0README.md
│ ├── 00_go_generate.go
│ ├── asset.go
│ ├── user.go
│ ├── video_asset.go
│ ├── video_asset_crop.go
...
├── video_asset.repository.go
...
pkg/domain/vo
// value objectパッケージ / 共通モデル
├── 00_README.md
├── dconst
...
│ ├── s3_storage_class.go
│
...
├── user_id.go
└── video_asset_id.go
保存したいモデル群です。
...
// aggregate root
type VideoAsset struct {
ID vo.VideoAssetID // aggregate id
Asset vo.Asset
FfprobeInfo vo.NullFFprobeJson
VideoAssetCrop *VideoAssetCrop
}
type VideoAssets []VideoAsset
type VideoAssetCrop struct {
// 黒枠(crop) の確定データ
Top int
Bottom int
Left int
Right int
DetectedCrops VideoAssetCropByFrameJson
}
// フレームごとの黒枠(crop)情報
type VideoAssetCropByFrameJson string
func (self VideoAssetCropByFrameJson) ToCropOnTimeFrames() []CropOnTimeFrame {
target := make([]CropOnTimeFrame, 0)
_ = json.Unmarshal([]byte(self), &target)
return target
}
type CropOnTimeFrame struct {
TimeSeconds float64 `json:"time_seconds"`
Top int `json:"top"`
Bottom int `json:"bottom"`
Left int `json:"left"`
Right int `json:"right"`
}
// パフォーマンス要件等ない限り共通objectにidentifierは持たない
type Asset struct {
UserID UserID
Bucket string
Location string
MimeType string
UploadDatetime null.Time
Filename null.String
S3StorageClassID dconst.S3StorageClassID
IsExist bool
FileSizeByte typedef.ByteSize
Md5Hash null.String
}
func (self *Asset) RecommendedMimeType() string {
fileName := self.Filename.String
return mime.TypeByExtension(filepath.Ext(fileName))
}
...
Repositoryの定義
ここから本題です。
Repositoryの定義をします。
集約内のフィールド情報だけを用いたあんまり複雑じゃない最低限の検索メソッドはこの際許すこととして,interfaceを定義します。
package video_asset
...
//go:generate mockgen -source=${GOFILE} -destination=../../infra/adapter/domain/mock_repository/mock_${GOFILE}_generated.go -package=mock_repository
type VideoAssetRepository interface {
Save(ctx context.Context, videoAsset *VideoAsset) (*VideoAsset, error)
DeleteByIDs(ctx context.Context, id ...vo.VideoAssetID) (deleted VideoAssets, err error)
Where(ctx context.Context) VideoAssetFilter
}
type VideoAssetFilter interface {
IDIn(id ...vo.VideoAssetID) VideoAssetFilter
UserIDIn(userId ...vo.UserID) VideoAssetFilter
// asset
AssetBucketNameIn(bucket ...string) VideoAssetFilter
AssetLocationIn(location ...string) VideoAssetFilter
AssetS3StorageClassIDIN(s3Class ...dconst.S3StorageClassID) VideoAssetFilter
AssetFileSizeByteGreaterThan(byteSize int64) VideoAssetFilter
AssetIsExist(b bool) VideoAssetFilter
AssetMimeTypeIn(mimeType ...string) VideoAssetFilter
// get
Find() (VideoAssets, error)
First() (*VideoAsset, error)
}
Save(ctx context.Context, videoAsset *VideoAsset) (*VideoAsset, error)
時にオブジェクトを返していますが、これはvideo_assetテーブルにおけるIDがauto incrementであることによる採番の事情が色濃く入っていて、できるならULIDとかで採番し、Save(ctx context.Context, videoAsset *VideoAsset) error
とかのが良さそうです。
Repository実装
mysql版実装です。
コネクションはContextに詰める想定です。
また、ORMはgormを使っています。
Repositoryには、context.Contextからmysqlのconnectionをとる関数自体をDIしています。
type GetTx = func(ctx context.Context) *gorm.DB
type MysqlDependency struct {
GetTx GetTx `validate:"required"`
}
package mysql_repository
...
func NewVideoAssetRepository(deps *MysqlDependency) video_asset.VideoAssetRepository {
err := validator.New().Struct(deps)
if err != nil {
log.Panic(err)
}
impl := implVideoAssetRepository{
MysqlDependency: deps,
}
return &impl
}
type implVideoAssetRepository struct {
*MysqlDependency
}
func (self *implVideoAssetRepository) Save(ctx context.Context, videoAsset *video_asset.VideoAsset) (*video_asset.VideoAsset, error) {
// asset レコードを delete/insertするコード
// video_assetを保存するコード
....
return self.Where(ctx).IDIn(vo.VideoAssetID(vaDto.ID)).Single()
}
func (self *implVideoAssetRepository) DeleteByIDs(ctx context.Context, id ...vo.VideoAssetID) (deleted video_asset.VideoAssets, err error) {
// ... assetを物理削除してからvideo_assetを削除するコード
...
}
func (self *implVideoAssetRepository) Where(ctx context.Context) video_asset.VideoAssetFilter {
// 綺麗な状態のcloneを作ること
tx := self.GetTx(ctx).New()
return newVideoAssetFilter(tx)
}
func newVideoAssetFilter(gormDB *gorm.DB) video_asset.VideoAssetFilter {
gormDB = gormDB.Table(`video_asset`).
Joins(`INNER JOIN asset ON video_asset.asset_id = asset.id`)
return &implVideoAssetFilter{
db: gormDB,
}
}
type implVideoAssetFilter struct {
db *gorm.DB
}
func (self *implVideoAssetFilter) IDIn(id ...vo.VideoAssetID) video_asset.VideoAssetFilter {
self.db = self.db.Where(`video_asset.id IN (?)`, id)
return self
}
func (self *implVideoAssetFilter) UserIDIn(userId ...vo.UserID) video_asset.VideoAssetFilter {
self.db = self.db.Where(`asset.user_id IN (?)`, userId)
return self
}
func (self *implVideoAssetFilter) AssetBucketNameIn(bucket ...string) video_asset.VideoAssetFilter {
self.db = self.db.Where(`asset.bucket IN (?)`, bucket)
return self
}
...
func (self *implVideoAssetFilter) AssetFileSizeByteLessThanOrEqual(byteSize int64) video_asset.VideoAssetFilter {
self.db = self.db.Where(`asset.file_size_byte <= ?`, byteSize)
return self
}
func (self *implVideoAssetFilter) AssetIsExist(b bool) video_asset.VideoAssetFilter {
self.db = self.db.Where(`asset.is_exist = ?`, b)
return self
}
func (self *implVideoAssetFilter) Find() (video_asset.VideoAssets, error) {
videoAssetDtos := mysqldto.VideoAssetSlice{}
err := self.db.Find(&videoAssetDtos).Error
if err != nil {
return nil, xerrors.Errorf(`%w`, err)
}
assetIds := videoAssetDtos.SelectInt(func(va mysqldto.VideoAsset) int {
return va.AssetID
})
newCmd := self.db.New()
assetDtos := mysqldto.AssetSlice{}
err = newCmd.Raw(`
SELECT *
FROM asset
WHERE id IN (?)
`, assetIds).Scan(&assetDtos).Error
if err != nil {
log.Error(err)
return nil, xerrors.Errorf(`%w`, err)
}
assetIdToAssetMap := assetDtos.GroupByDistinctInt(func(asset mysqldto.Asset) int {
return asset.ID
})
// mysql構造体からドメイン構造体に詰め替え
videoAssets := video_asset.VideoAssets{}
for _, vaDto := range videoAssetDtos {
assetDto := assetIdToAssetMap[vaDto.AssetID]
assetModel := dtoconvert.AssetDtoToModel(assetDto)
vaModel := dtoconvert.VideoAssetDtoToModel(vaDto, assetModel)
videoAssets = append(videoAssets, vaModel)
}
return videoAssets, nil
}
func (self *implVideoAssetFilter) First() (*video_asset.VideoAsset, error) {
vas, err := self.Find()
if err != nil {
return nil, xerrors.Errorf(`%v`, err)
}
if len(vas) == 0 {
return nil, nil
}
return &vas[0], nil
}
- パフォーマンスが許せば子集約/ValueObjectなテーブルはdelete/insertとかでいいと思います。
- 検索群メソッドは基本的に集約内で使っているテーブルのカラム以外にアクセスするのは禁止です。
コネクションとContextをつなぐ
リポジトリができたのでContextとコネクションの媒介ヘルパーを用意します。
// トランザクション保持用インスタンス
type DB interface {
GetGormDB() *gorm.DB // registry , injection時しか使っちゃダメです
Begin() error
Rollback()
Commit() error
}
type _db struct {
gormDB *gorm.DB
}
...
こういうmutableなトランザクション保持用インスタンスのポインタをContextに詰める方向で実装します。
Contextとコネクションをうまく媒介するやつ用意しときます。
type Database struct {
Name string `yaml:"Name"`
Dialect string `yaml:"Dialect"`
Role string `yaml:"Role"`
Addr string `yaml:"Addr"`
DBName string `yaml:"DBName"`
User string `yaml:"User"`
Password string `yaml:"Password"`
Net string `yaml:"Net"`
Location TIME_LOCATION `yaml:"Location"`
MaxConnections int `yaml:"MaxConnections"`
MaxIdleConnections int `yaml:"MaxIdleConnections"`
ConnectionMaxLifeTime int `yaml:"ConnectionMaxLifeTime"` // seconds
Logging bool `yaml:"Logging"` // true or false
}
func (self *Database) NewConnectionFromPool() DB {
// ... DB をつくるやつ
}
func (self *Database) Close() DB {
// ... 持ってるコネクションを全て閉じるやつ。main関数などでアプリの終了時に打つ
}
// DBContextHelper は context.Contextにdao.DBのコネクションを仕込んでおくような場合に動作します。
type DBContextHelper interface {
GetExistedDB(ctx context.Context) DB
/*
GetNewDBContext は分岐した新しいContextにコネクションプールからあいてるコネクションをつめて返します。
入力した古いContextに入るわけではないので注意.
## CRUD系API Serverなどの場合
Handler/Middleware層の認可チェックなどで使いたいならコネクションを取得 (このへんはアプリごとの要件によります)
_ , ctx := dependency.DBContextHelper.GetNewDBContext(echoContext.Request().Context())
あるいはトランザクションを貼るUsecaseなどでも新規取得するといいかと思います
tx, newContext := dependency.DBContextHelper.GetNewDBContext(oldContext)
tx.Begin()
videoRepo.Save(newContext , video)
...
## バッチ処理など複数トランザクションを扱う場合
逆にいうと複数トランザクションを扱うアプリではContextが別にすることができます。
このへんは要件によって.
conn1, contextWithConn1 := dependency.DBContextHelper.GetNewDBContext(ctx)
video , err := videoRepo.GetById(contextWithConn1 , video)
for _ , message := range messages {
conn2, contextWithConn2 := dependency.DBContextHelper.GetNewDBContext(contextWithConn1)
conn2.Begin()
video , err := videoRepo.GetById(contextWithConn2 , video)
conn2.Commit()
}
*/
GetNewDBContext(oldContext context.Context) (db DB, newContext context.Context)
}
...
func NewDBContextHelper(conf Database, contextItemKey string /* ex. __CONTEXT_TRANSACTION__ */) DBContextHelper {
impl := implMysqlTransactionHelper{
conf: conf,
transactionContextItemKey: contextItemKey,
}
return &impl
}
func (i *implMysqlTransactionHelper) GetNewDBContext(oldContext context.Context) (db DB, newContext context.Context) {
contextItem := TransactionContextItem{
Tx: i.conf.NewConnectionFromPool()
}
newContext = context.WithValue(oldContext, i.transactionContextItemKey, &contextItem)
return contextItem.Tx, newContext
}
type TransactionContextItem struct {
Tx DB
}
func (i *implMysqlTransactionHelper) GetExistedDB(ctx context.Context) DB {
contextItem, ok := ctx.Value(i.transactionContextItemKey).(*TransactionContextItem)
if !ok {
log.Panic(`transaction context is null`)
}
return contextItem.Tx
}
type implMysqlTransactionHelper struct {
conf Database
transactionContextItemKey string
}
トランザクションを貼ろう
mainパッケージ/レジストリなどのコードでは、repositoryに対してDBContextHelperに生えてるGetExistedDBを使用したコードを渡してあげます。
func NewMySqlRepositories(m dao.DBContextHelper) Repositories {
return NewMySqlRepositoriesByContextTx(m.GetExistedDB.GetGormDB())
}
func NewMySqlRepositoriesByContextTx(getTx func(ctx context.Context) *gorm.DB) Repositories {
mysqlDeps := &mysql_repository.MysqlDependency{GetTx: getTx}
r := Repositories{
...
VideoAssetRepository: mysql_repository.NewVideoAssetRepository(mysqlDeps),
}
err := validator.New().Struct(r)
if err != nil {
log.Panic(err)
}
return r
}
一般的なCRUDサーバの場合だと基本的にusecase層
でトランザクションを貼ってます。
(usecaseにたどり着く前にhandler/web層の認可チェックとかでDBに繋ぐ必要がある場合は、そっちはそっちでDBContextHelper/コネクションプールから新規に取得します。)
ここでは、Usecaseでこいつを使います。
type FailDeliveryUsecase interface {
FailDelivery(ctx context.Context /* 上位のHandlerで渡ってきたリクエストコンテキスト */, in IoFailDeliveryInput) (*IoFailDeliveryOutput, error)
}
type FailDeliveryUsecaseDependency struct {
DBContextHelper dao.DBContextHelper `validate:"required"`
VideoAssetRepository video_asset.VideoAssetRepository `validate:"required"`
...
// 必要あればセカンダリーへのアクセスも可能にする
ReadDBContextHelper dao.DBContextHelper `validate:"required"`
}
type implFailDeliveryUsecase struct {
*FailDeliveryUsecaseDependency
}
func (self *implFailDeliveryUsecase) FailDelivery(_ctx context.Context, in IoFailDeliveryInput) (*IoFailDeliveryOutput, error) {
tx, txCtx := self.DBContextHelper.GetNewDBContext(_ctx)
tx.Begin()
....
// txCtx(context.Contextt)を使っていろんなリポジトリへの保存処理とロールバック
tx.Commit()
return &IoFailDeliveryOutput{}, nil
}
...
Usecaseでのトランザクション管理が可能になりました。
もちろん事情によっては複数トランザクション対応や別DB用に別のContextに分岐させることも可能です。
疑問
Q. 排他のパターンはあるか
うちではやらずにリトライとかで済んでるとこもありますが、DDDに詳しい/わりと現実的な戦術の温度感を示してくれる 松岡さんのkotlinのコード実例を参考にするとこういう感じになるかな?と思っています。
type VideoAssetFilter interface {
...
Find(enableLock ...bool) (VideoAssets, error)
First(enableLock ...bool) (*VideoAsset, error)
}
enableLockがtrueなら SELECT FOR UPDATE
で取得する、みたいな感じらしいです。
Q. context.Contextだの関数DIだのわかりづらくない?
バックエンドのデータストアやORM置き換えるかっていうと置き換えなくね?みたいな話もあるかと思うので、ORMのオブジェクトやオレオレトランザクショナルオブジェクトとかを直接引数で渡したほうが素直って考え方もあるかもしれません。
教条的DDDやクリーンアーキテクチャに大きなトランザクションを許したとたん、層ごとの関心の分割ってあるようでない温度感と妥協の世界にはなってしまうので。
Q. 複雑なJOINや集約外の他テーブル使いたいんだけど
Read専用構造体/マテビューやクエリサービス使えってことにはなってます。
Repositoryは本当は検索用でなく保存単位を死守するパターン。
このへんの温度感の答えは正直あんまりでてないけどチーム内で相談、ですかね。
Q. 複数モデルで同じentityのテーブルを更新するのはありか
reviewer,developerみたいなモデルとそれぞれのリポジトリがあったとして、実質は両集約はadminテーブルとadmin_idである、みたいなやつですね。
ちょうど今日この話出たんですが現実に管理可能なんですかね?
答えでてないです。
Q. 詰め替えいる?
今回のスコープとは外れますが一時のめんどくささに対して得られるものが大きいので業務ロジックメインのアプリは割とおすすめです。
一方コードで語るよりデータベースモデリングがほとんど変数置き場でいいって考え方もあるので、それはそれでひとつのわかりやすさかなーとも。
まとめ
わりと実装ぼかしてるところもあるので妙にエクスキューズの多い記事になってしまいました。
うちではこうしてる!ってのもっと知りたいですね。
参考になりそうな記事:
Goとクリーンアーキテクチャとトランザクションと
【Go言語】DBトランザクション共通処理の具体例