状態の保持/状態遷移に長けたFSMと、その状態/状態遷移を永続化するAkka-Persistentの組み合わせとしてPersistentFSMがある。
状態を持つActorでイベントを永続化したいケース(EventSourcingなど)には強力な武器となる。
ただ、FSM
と同様に普通のActor(PersistentActor)でもbecome
を使えば状態は表現できるので使いどころは難しいかもしれない。
ドキュメントにもあるようにexperimentalなものなので注意(v2.4.9時点)
実装の仕方
akka.actor.Actor
の代わりにakka.persistence.fsm.PersistentFSM
をextendsしてやればよい。
通常のFSMは型パラメータとしてStateとDataにあたる型を2つ要求したが、PersistentFSM
では型パラメータを3つ、State/Data/Eventに当たる型を要求する。
また、StateはPersistentFSM.FSMState
を継承した型でなければならない。
trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging {
...
}
State, Data, Eventを用意する
ドキュメントのサンプルにあわせてショッピングカートっぽいのを実装してみる。
サンプルコード全体はGithubにあげてある。
まず、ショッピングの対象となる商品にあたるデータ型を用意する。
case class Item(name: String, price: Int)
State
StateはFSMの状態を表現する型にあたり、PersistentFSM.FSMState
をextendsしておく。
identifier: String
をoverrideする必要があるので適当に実装しておく。
sealed trait ShoppingState extends PersistentFSM.FSMState {
override def identifier: String = s"my-event: ${getClass.getSimpleName}"
}
case object Looking extends ShoppingState
case object Shopping extends ShoppingState
case object Purchased extends ShoppingState
Data
FSMの内部に保持するデータとなる。
今回はSet[Item]
とした。
case class ShoppingData private[ShoppingData] (items: Set[Item]) {
// 合計price
def price: Int = items.map(_.price).sum
}
object ShoppingData {
// 初期状態となるもの
val empty = ShoppingData(Set.empty)
}
Event
FSMの状態を変化させるイベントとなる。
状態だけでなく内部データにも影響を及ぼすイベントも用意する。
sealed trait ShoppingEvent
// 内部データ(ShoppingData)に影響する
case class AddItem(item: Item) extends ShoppingEvent
case class RemoveItem(item: Item) extends ShoppingEvent
// 状態遷移するだけ
case object Purchase extends ShoppingEvent
case object Leave extends ShoppingEvent
PersistentFSMを実装する
State/Data/Eventにあたる型が用意出来たので、ようやくPersistentFSM
を実装できる。
実装全体をのせる。
class Customer(implicit val domainEventClassTag: ClassTag[ShoppingEvent])
extends PersistentFSM[ShoppingState, ShoppingData, ShoppingEvent] {
// PersistentActor#persistenceIdと同じもの
override def persistenceId: String = "example-persistence-FSM"
// 状態変化する際に適用されるメソッドで、内部データを変化させる
// `[stay|goty] applying ???`とした時に呼ばれる
override def applyEvent(domainEvent: ShoppingEvent, currentData: ShoppingData): ShoppingData = {
log.info(s"\n***applyEvent: $domainEvent, $currentData, state: $stateName")
domainEvent match {
case AddItem(item) =>
// カートにItemを追加
currentData.copy(items = currentData.items + item)
case RemoveItem(item) =>
// カートからItemを除去
currentData.copy(items = currentData.items - item)
case Purchase =>
// 購入する。カートは空になる。
println(s"price => ${currentData.price}")
ShoppingData.empty
case Leave =>
// ショッピング終了。
ShoppingData.empty
}
}
// 初期Stateと初期Data
startWith(Looking, ShoppingData.empty)
// FSMと同様。`when(現在の状態) { イベントハンドラ }`という構造
when(Looking) {
case Event(addItem: AddItem, _) =>
// Looking状態からAddItemでShopping状態に移行
gotoLogging(Shopping) applying addItem andThen {
// andThenでapplyEvent後のイベントハンドラを設定できる
case afterAddItem => log.info(s"after => $afterAddItem")
}
}
when(Shopping) {
case Event(addItem: AddItem, _) =>
// Shopping状態でAddItemしても状態は変化しないが、AddItemは適用する
stay applying addItem
case Event(removeItem: RemoveItem, _) =>
// Shopping状態でRemoveItemしても状態は変化しないが、RemoveItemは適用する
stay applying removeItem
case Event(Purchase, _) =>
// Shopping状態でPurchaseするとPurchased状態に移行
goto(Purchased) applying Purchase
}
when(Purchased) {
case Event(Leave, _) =>
// Purchased状態からLeaveすることでLooking状態に戻る
goto(Looking) applying Leave
}
// 状態遷移ハンドラ
onTransition {
case Purchased -> Looking =>
// ショッピング終了時にsnapshotをとっておく
saveStateSnapshot()
}
}
domainEventClassTag
を以下のように内部で定義しようとするとStackOverFlowが発生したので外から渡す形とした。
...
// StackOverFlowが発生してしまう
implicit val domainEventClassTag: ClassTag[ShoppingEvent] = classTag[ShoppingEvent]
...
実装自体を見ると、akka.actor.FSM
がベースになっていて、when
,onTransition
で状態遷移とそのハンドラを定義すれば良い。
ただし、FSM
とはusing
の代わりにapplying
を使用するという点が異なる
また、PersistentActor
のようにpersist
を明示的に呼ばなくてもapplyEvent
に渡ってくるdomainEvent
は自動的にpersist
されている模様。
サンプルではinitialize
を実行しているが、ver2.4.5からは内部APIとなったため不要。
andThenの注意点
カート内のItem
が1つの時にRemoveItem
されたらLooking
に戻したい。
イベント適用後のイベントハンドラを設定するためのandThen
というAPIがあるためそれが使えそうなので実装してみる。
case Event(removeItem: RemoveItem, _) =>
stay applying removeItem andThen {
// currentDataに対するイベントハンドラ(`afterTransitionDo`)を設定
case ShoppingData.empty => goto(Looking)
}
しかし、実際にはandThen
内のgoto
やstay
では状態遷移させることが出来なかった...。
解決するにはwhen
に対するハンドラとしてそれを実装しなければならない。
when(Shopping) {
...
case Event(removeItem: RemoveItem, ShoppingData.empty) =>
// currentDataがemptyなら許可されていない処理
throw new IllegalStateException("ShoppingCart is Empty")
case Event(removeItem @ RemoveItem(item), ShoppingData(items)) if items.size == 1 && (items contains item) =>
// 1つしかないitemがremoveされる場合
gotoLogging(Looking) applying removeItem
case Event(removeItem: RemoveItem, _) =>
// 通常の動作
stayLogging applying removeItem
...
}
何のためのandThen
なのか...と言いたくなるが、
恐らく状態遷移を促すようなドメインイベントをメッセージとして受け取らずに内部的に状態遷移すると、永続化してあるメッセージをreplayしても状態の復元が出来なくなってしまうからではないかと思われる。
今回のようなケースではwhen
のハンドラで事前に検査するしかない(はず)。
Shopping
からLooking
に戻すようなイベントを定義してself
に!
(tell
)するのも考えられるが、メッセージが直後に処理されるとは限らないため上手くいかないケースも多い。
Recovery
Akka-Persistenceで大事な状態の復元。
PersistentFSM
の場合、Actor起動時にSnapshotとjournalから自動で状態をrecoveryしてくれる。
普通のPersistentActor
と同様に、最新のSnapshotを適用した後に、そのSnapshot採取後のイベントをjournalから読みだしてそれも適用してくれる。
recoveryが完了したらコールバックとしてonRecoveryCompleted
が実行される。