概要
システムをDDDで組み上げる時に、各レイヤー(RepositoryやEntityなど)をscalazやcatsのような関数型ライブラリを用いてどのように組むと良さそうかを試してみました。
あくまで個人的に、Domainから如何にApplication側の都合を隠すか、各レイヤーの役割を型でなるべく表現・制約できないか、などを考えたものになります。
なお、ドメインの詳細に立ち入ると長くなる&自信のないものもあるので、ここでは各レイヤー・パーツ毎にどう考えたか、という実装パターンを記すにとどめます。
全体像
DDD本を読んで、今回システムを組む中で設計したのは以下です。
- Domain層
- Entity
- Value Object(VO)
- Service
- Repository(interface)
- Application層(Web Server)
- Repository(implementation)
- Application Service(Use Case)
- Controller
全体像としてはこちらの図を参考にしています。
Domain層
Value Object(VO)
役割
・あるオブジェクトの属性として振る舞う。
・同じ値であれば区別する必要がない。
設計方針
・equalsでは同値かどうかを判定する
・オブジェクトを作る際に不整合なものを作らないようにする
実装例
場合分けするような属性の場合、不正な値(例えば「山々県」とか)のオブジェクトを作りたくないのでJavaでいうEnumの要領で作ります。
sealed abstract class Prefecture(val name: String) extends Product with Serializable
object Prefecture {
case object `北海道` extends Prefecture("北海道")
case object `青森県` extends Prefecture("青森県")
case object `岩手県` extends Prefecture("岩手県")
...
val values = Seq(`北海道`, `青森県`, `岩手県`, ...)
def apply(name: String): Option[Prefecture] = values.find(_.name == name)
}
特にIDは持たないオブジェクトについては、状態の整合性に気をつけつつ作ります。
import cats.data.ValidatedNel
import cats.syntax.contravariantSemigroupal._
import cats.syntax.validated._
// 出る目の確率が細工されているサイコロオブジェクト
sealed abstract case class CheatDice(
ratioOf1: BigDecimal,
ratioOf2: BigDecimal,
ratioOf3: BigDecimal,
ratioOf4: BigDecimal,
ratioOf5: BigDecimal,
ratioOf6: BigDecimal
)
object CheatDice {
def create(
ratioOf1: BigDecimal,
ratioOf2: BigDecimal,
ratioOf3: BigDecimal,
ratioOf4: BigDecimal,
ratioOf5: BigDecimal,
ratioOf6: BigDecimal
): ValidatedNel[IllegalArgumentException, CheatDice] = {
type ValidationResult[A] = ValidatedNel[IllegalArgumentException, A]
val validateRatios: ValidationResult[Seq[BigDecimal]] = {
val ratioSeq = Seq(ratioOf1, ratioOf2, ratioOf3, ratioOf4, ratioOf5, ratioOf6)
if (ratioSeq.exists(ratio => ratio < BigDecimal(0) || ratio > BigDecimal(100))) {
new IllegalArgumentException("確率はいずれも0~100%にしてください").invalidNel
} else if (ratioSeq.sum != BigDecimal(100)) {
new IllegalArgumentException("確率の合計が100%になっていません").invalidNel
} else ratioSeq.validNel
}
validateRatios.map(ratioSeq => {
new CheatDice(ratioSeq(0),
ratioSeq(1),
ratioSeq(2),
ratioSeq(3),
ratioSeq(4),
ratioSeq(5)) {}
})
}
}
例えばイカサマ用に、出る目の確率を変えられるサイコロオブジェクトがあった場合に、各目が出る確率は100%以下でなくてはいけません。また同時に、全ての確率の合計は100%でなくてはいけません。
そこで、このようにValidationを掛けることで、不正な入力の時には作成が失敗し、かつどんな理由でダメだったのかをアプリケーション側に知らせることが出来ます。
備考
・createの返り値に ValidatedNel[E, A]
などを置いていますが、
・Either
にすべきかValidatedNel
にすべきか
・他のVOのエラーも合成したいので組み合わせられるようにValidatedNel
にしている
・Error時の型を、具象クラスを書くか、DomainError
など抽象化したものにするか
・具象にした方が便利かなと思いつつ、関数型ライブラリがinvariantな型のAPIを持っていることが多いため合成しにくくなる。。
Entity
役割
・集約の中心(Domainの関心事)
・属性ではなくIDでもって識別される(属性は変わりうる)
設計方針
・生成方法はVOと同じ。
・生成処理が複雑になるならFactoryに処理を移譲するようだが、個人的には後述のServiceで十分でFactoryを使うケースがよくわからない。。
・属性の更新処理もあるので、生成・更新含めて不整合な状態にならないようにする。
実装例
例えばTag Entityであれば、テキストを更新する必要があるかもしれません。その場合にもバリデーションをかけたいのでこのように実装します。
sealed abstract case class Tag(
id: TagId,
title: String,
) {
def updateText(
text: String,
): Either[NonEmptyList[IllegalArgumentException], Tag] = {
type ValidationResult[A] = ValidatedNel[IllegalArgumentException, A]
val validateText: ValidationResult[String] = {
if (text.isEmpty) {
new IllegalArgumentException("内容が空になっています").invalidNel
} else text.validNel
}
val tag = validateText.map(text => new Tag(id, text){})
tag.toEither
}
}
object Tag {
def create(
id: TagId,
text: String,
): Either[NonEmptyList[IllegalArgumentException], Tag] = {...}
}
備考
・Entityの生成エラーはVOと違いエラーを追加したりしないのでEIther
で返している
・equals
やhashcode
をどう定義するかについてはIDのみでも良いのかなと思うものの、Collectionの操作などを気にして普通のcase classにしています。
Repository(interface)
役割
・ドメイン層(Serviceなど)からRepositoryを呼び出すためのAPI定義
・実装と定義を分けることで、Repositoryの実装(RDB接続など)にドメインが依存しなくなる(DIP)
設計方針
・Repositoryは集約ごとに実装する。
・集約がトランザクション境界になっているべき
・基本的にはEntityが集約単位なので、Entityを操作(保存・更新・削除)することになる
実装例
例えばTag Entityを扱うRepositoryでCRUDをしたい場合はこのようになるでしょうか。
catsのIO[_]
は、ExecutionContextを使わずに合成できるFuture[_]
と思ってもらえればいいです。
import cats.data.NonEmptyList
import cats.effect.IO
trait TagRepository {
def fetchNextId(): IO[TagId]
def delete(tag: Tag): IO[Unit]
def create(tag: Tag): IO[Unit]
def update(tag: Tag): IO[Unit]
def findById(id: TagId): IO[Option[Tag]]
def findAll(): IO[List[Tag]]
def findByText(text: String): IO[Option[Tag]]
}
なお、EntityのIDについては一般にSequentialな数字のものと、衝突しないように設計されたランダム文字列などが使われると思います。
このRepositoryでは前者を想定しており、Repositoryから次のIDを採番してくることにしています。ここで取得されたIDを使ってEntityを生成し、Repository#createを呼びます。
備考
・メソッド名をcreateとupdateで分けている。
・更新のつもりで新規作成するなどを防ぎたい
・updateText
などのメソッドを用意すると、不要な更新を抑えることで実装側で最適化をかけやすい
・Context(ConnectionやSessionなど)を定義すべきか
・トランザクションは主にRDBという実装都合なので入れないつもりでしたが、ドメイン側として一貫性を確保したいなら明示したほうが良いかも
・例えば上記の例だと、「findByText
して同名なタグがなければcreate
」をしたい場合はトランザクションの概念を持った実装を強制するのが自然かもしれない。
・「create
の中で採番してTagオブジェクトを作れば良いのでは?」
・Entityの生成をRepositoryでやるとvalidationエラーも返すかもしれません。
・実装側でrollback処理を掛けたい時にIOのエラーだけでなくほかも考慮する必要があり、難解になるので割けています。
Service
役割
・ドメインの一連の操作を行う処理ロジック
・主にUse Case
から使われる。
・Repositoryを受け取って処理をする。この際、Use Case
側でトランザクションを掛けても良い。
・使う側がドメイン理解が浅くても使えるAPIにしたい
設計方針
・合成しやすいようにする
・実装に依存しない
・トランザクションはServiceの各API内に閉じる
・外までトランザクションが広がると、呼び出し側の方で不整合が起きないように理解して呼ばないといけない
実装例
例えばTagの生成を担うメソッドではこのような形に。
import cats.data.{EitherT, NonEmptyList, OptionT, Reader}
import cats.effect.IO
object TagService {
def create(text: String): Reader[TagRepository, IO[Either[NonEmptyList[Exception], Tag]]] =
Reader { repository =>
val result = for {
_ <- EitherT(repository.findByText(text).map {
case None => Left(NonEmptyList(TagAlreadyExistsException(text), Nil))
case Some(_) => Right()
})
tagId <- EitherT.liftF(repository.fetchNextId())
tag <- EitherT.fromEither[IO](Tag.create(tagId, text))
_ <- EitherT.liftF(repository.create(tag))
} yield tag
result.value
}
}
createメソッドにtextだけ渡すと、
- 既に同名のタグがあるかチェックして
- IDを採番して
- Tag Entityを作成して
- それを新規保存する
といった一連の流れを行います。
このときトランザクションが必要になりますがrepository
がSessionを持つことで、create
メソッドをトランザクション境界にできます。(詳細は後述のRepository(implementation)
へ。)
また、「既に同名のタグがある」 / 「Tagのバリデーションエラー」などドメイン側で問題が置きた場合にはそれを返せるようになっています。
さらに、Repositoryは実行時に渡してもらう形にすることで、実装に依存せずテストもしやすくなります。
以上の要件を踏まえて、返り値の型が定義されています。
とても長いのですが順番に、
-
Reader[TagRepository, A]
- Repositoryを実行時に渡してもらう
-
IO[A]
- Repositoryの操作は非同期
-
Either[NonEmptyList[Exception], Tag]]
- ドメインロジックが全て成功したらTagが返り、失敗したらエラー内容が返る
という形になっています。
備考
・トランザクション境界を型で表現したほうが良いか
・実装例ではSessionはRepositoryが持つことにしていますが、そもそもドメインロジックとして「同名のタグがあったらエラーを返す」と書いているのでトランザクションはロジック的に必須かもしれない
・その場合、Context(transactionを扱う抽象クラス)を引数に与えて、repository側はdef findByText(text: String)(implicit context: Context)
などの定義になる
・Readerモナドを使いましたが、Serviceを複数組み合わせたいニーズは今の所ない。
・複雑なドメインだと、ServiceのAPI同士を合わせて一つのトランザクションで処理したいなどあるかもしれない。
・引数にrepositoryを取るほうがわかりやすいかもしれない。
・Serviceに渡したいパラメータが多い場合にどうするか
・ただのパラメータの入れ物としての~~Request
オブジェクトを定義しています。
・引数が多くなるのが良いのか、変換処理があってもまとめたほうが良いのかはよく分からず
Repository(implementation)
ここからApplication(非ドメイン)層です。
役割
・具体的なDBを扱うための実装都合なコードをカプセル化する。
・パフォーマンスを最適化する
・各APIで一貫性を保つ必要がある
設計方針
・MySQLやS3など具体的なDBを扱うことを明示する
・interfaceには無かったが必要なもの(DBSessionなど)はコンストラクタで受け取る
・どのExecutionContextで実行するかなども決める
実装例
例としてTagRepositoryのRDB実装を取り上げます。
この場合、interfaceには無かったが必要なものとして、DBSession
やExecutionContext
があります。
これらを含んだrepository instanceを作りたいので、Factoryを用意します。
import cats.effect.IO
import scalikejdbc._
import scala.concurrent.{ExecutionContext, Future}
trait TagRepositoryImplFactory {
def newInstance(dbSession: DBSession, rdbPool: ExecutionContext, processPool: ExecutionContext): TagRepository =
new TagRepositoryImpl(dbSession, rdbPool, processPool)
}
class TagRepositoryImpl(_dbSession: DBSession, _rdbPool: ExecutionContext, _defaultPool: ExecutionContext) extends TagRepository {
implicit val dbSession: DBSession = _dbSession
val rdbPool: ExecutionContext = _rdbPool
val defaultPool: ExecutionContext = _defaultPool
override def findAll(): IO[List[Tag]] = {
Future {
TagDao.list()
.map(t => Tag.create(t.id, t.text).right.get)
}(rdbPool).toIO(defaultPool)
}
override def create(tag: Tag): IO[Unit] = {
Future {
TagDao.create(tag)
}(rdbPool).toIO(defaultPool)
}
...
}
ここではTagDao(RDBのTagテーブルを操作する)を呼び出しているだけのシンプルなものですが、集約によっては複数のテーブルを同時に編集するでしょう。その場合は複数のDaoを操作することになります。
このとき、DBSessionを引き回すことでトランザクションを扱っています。
また、チューニングをしようとするとexecutionContextを扱う必要も出てきます。
ドメイン層ではcats.effect.IO
型を使っていたため気にしませんでしたが、コネクション数なども考慮してDB操作用のContextとドメインロジック用のContextを分けたくなることもあるでしょう。
上記実装例では、DBの操作においてはrdbPool
を、その後(ドメインロジックで使われる)はdefaultPool
を使うようにしています。
備考
・これまでも何度か書いていますが、トランザクション境界を定義するのはDomain側かもしれません。
・その場合、Repository(interface)
やServiceでDBSessionのようなものを扱う必要があるかもしれません。
・複数のDBを扱いたい場合(特にファイル操作。例えばS3にファイルを保存し、その参照をRDBで持つ)にRepositoryを分けるべき?
・DBをまたいでトランザクションを敷くのは難しいので結果整合性を受け入れざるを得ない
・DB側で一時的に不整合になるかもしれない
・不整合にならないとしても、ゴミファイルやレコードは残るので何らか対応する必要があるかも
・そもそもファイルを扱うことがドメインロジックなのか実装都合なのかよくわからない。
・「エクセルファイルをアップロード出来る。ダウンロードも出来る。」などの要件はよくあるが、ドメインなのかUIの都合なのか。。
・個人的には、Repositoryを分けたほうが良いかもと思ってきています。
・とある集約を扱うときに毎回バイナリを扱いたいわけではないことが多い
・(実装都合ですが)パフォーマンスにも影響がある
・RDB側でrollbackなどした場合にリカバリ処理を混ぜやすい。(不整合を解消しやすい)
Application Service(Use Case)
役割
- Repositoryに(実装都合の)必要な情報を含める
- ドメインの
Service
を呼び出す - ドメイン特有のオブジェクトと、クライアントとやりとりするオブジェクトとの変換を行う
設計方針
- 設定情報やExecutionContextを使ってドメインロジックを動かす
- なるべくロジックを含めない
実装例
ここでは、Repository実装と同じくScalikeJDBC(futureLocalTx
)を使っています。
また、Guiceを用いて設定情報やJDBCのConnectionPool、actorSystemなどをDIしています。
それら実装都合の情報を使ってRepositoryを生成し、ドメインのService
に食わせてます。
@Singleton
class TagAppServiceImpl @Inject()(
appDBConnection: AppDBConnection,
applicationConf: ApplicationConf,
tagRepositoryJDBCFactory: TagRepositoryJDBCFactory,
actorSystem: ActorSystem) extends TagAppService {
private[this] val ioContext = actorSystem.dispatchers.lookup("akka.actor.io-dispatcher")
private[this] val defaultContext = actorSystem.dispatchers.lookup("akka.actor.default-dispatcher")
override def delete(id: TagId): Future[Unit] = {
appDBConnection.db.futureLocalTx { implicit session =>
val repo = tagRepositoryJDBCFactory.newInstance(session, ioContext, defaultContext)
repo.delete(id).unsafeToFuture()
}(defaultContext)
}
override def create(text: String): Future[Either[NonEmptyList[DomainException], Tag]] = {
appDBConnection.db.futureLocalTx { implicit session =>
val repo = tagRepositoryJDBCFactory.newInstance(session, ioContext, defaultContext)
TagService.create(text)
.run(repo).unsafeToFuture()
}(defaultContext)
}
}
特にロジックが無いように作れると思います。場合によっては複数のドメインサービスを呼ぶことになるかもしれませんが、それぞれ別のDBSessionを使うことでトランザクション境界を明確にできそうです。
また、DB操作でエラーが置きた場合、ScalikeJDBCのfutureLocalTx
がFutureのfailを検知してrollbackしてくれます。
備考
・DBSessionの生成をここで行わないほうが良いかもしれない?
・本記事でも何度か述べましたが、トランザクション境界はドメインロジックで管理すべきかもしれません。
・その場合、Repository
の方で、sessionStart()
やrollback
などを定義し、Service
でそれを呼ぶことで対応出来るかも
Controller
ここではWebサーバーのような、Clientとやりとりするものを想定します。
役割
- Clientからのリクエストをdeserializeし
Use Case
に渡す -
Use Case
からの返り値をSerializeしてClientに返す
設計方針
- Futureのエラーなどは5XXで返す
- Either Leftなどドメイン層のエラーであれば4XXで返す
実装例
設計方針そのままです。処理に失敗するとフレームワーク(Akka HTTP)側で500を返します。
@Singleton
class TagController @Inject()(service: TagAppService, actorSystem: ActorSystem) extends CustomValidationDirectives with CustomJsonFormat {
implicit val context: ExecutionContext = actorSystem.dispatchers.lookup("akka.actor.default-dispatcher")
private[controller] def create(): Route = {
validate(as[TagCreateRequest]) { validatedReq =>
onSuccess(service.create(validatedReq.text)) {
case Left(nel) => {
val error = nel.toList.map(e => e.getMessage)
complete(StatusCodes.BadRequest, ErrorResponse("入力が正しくありません", error))
}
case Right(tag) => complete(TagDto(tag.id, tag.text))
}
}
}
private[controller] def list(): Route = {
onSuccess(service.list()) { seq =>
val v = seq.map(t => TagDto(t.id, t.text))
complete(v)
}
}
}
備考
・テスト時はApplication Service(Use Case)
をモックする
まとめ
色々手探りで設計していますが、ドメイン層はかなり型安全で純粋に書けました。
ただ、繰り返しですがトランザクションの概念をどうドメインに組み込んだものかまだ悩んでいます。
(tagless finalを使うことで、interfaceにトランザクション管理のためのContextを持たせることが出来そうな気がします。)
https://www.slideshare.net/AoiroAoino/purelyfunctionalplayframeworkapplication