Scala
Akka
actor

akka.actor.FSMを永続化出来るPersistentFSM

More than 1 year has passed since last update.

状態の保持/状態遷移に長けたFSMと、その状態/状態遷移を永続化するAkka-Persistentの組み合わせとしてPersistentFSMがある。

状態を持つActorでイベントを永続化したいケース(EventSourcingなど)には強力な武器となる。
ただ、FSMと同様に普通のActor(PersistentActor)でもbecomeを使えば状態は表現できるので使いどころは難しいかもしれない。

ドキュメントにもあるようにexperimentalなものなので注意(v2.4.9時点)

実装の仕方

akka.actor.Actorの代わりにakka.persistence.fsm.PersistentFSMをextendsしてやればよい。

通常のFSMは型パラメータとしてStateとDataにあたる型を2つ要求したが、PersistentFSMでは型パラメータを3つ、State/Data/Eventに当たる型を要求する。
また、StateはPersistentFSM.FSMStateを継承した型でなければならない。

trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging {
  ...
}

State, Data, Eventを用意する

ドキュメントのサンプルにあわせてショッピングカートっぽいのを実装してみる。
サンプルコード全体はGithubにあげてある。

まず、ショッピングの対象となる商品にあたるデータ型を用意する。

case class Item(name: String, price: Int)

State

StateはFSMの状態を表現する型にあたり、PersistentFSM.FSMStateをextendsしておく。
identifier: Stringをoverrideする必要があるので適当に実装しておく。

sealed trait ShoppingState extends PersistentFSM.FSMState {
  override def identifier: String = s"my-event: ${getClass.getSimpleName}"
}
case object Looking extends ShoppingState
case object Shopping extends ShoppingState
case object Purchased extends ShoppingState

Data

FSMの内部に保持するデータとなる。
今回はSet[Item]とした。

case class ShoppingData private[ShoppingData] (items: Set[Item]) {
  // 合計price
  def price: Int = items.map(_.price).sum
}
object ShoppingData {
  // 初期状態となるもの
  val empty = ShoppingData(Set.empty)
}

Event

FSMの状態を変化させるイベントとなる。
状態だけでなく内部データにも影響を及ぼすイベントも用意する。

sealed trait ShoppingEvent
// 内部データ(ShoppingData)に影響する
case class AddItem(item: Item) extends ShoppingEvent
case class RemoveItem(item: Item) extends ShoppingEvent
// 状態遷移するだけ
case object Purchase extends ShoppingEvent
case object Leave extends ShoppingEvent

PersistentFSMを実装する

State/Data/Eventにあたる型が用意出来たので、ようやくPersistentFSMを実装できる。
実装全体をのせる。

class Customer(implicit val domainEventClassTag: ClassTag[ShoppingEvent])
  extends PersistentFSM[ShoppingState, ShoppingData, ShoppingEvent] {

  // PersistentActor#persistenceIdと同じもの
  override def persistenceId: String = "example-persistence-FSM"

  // 状態変化する際に適用されるメソッドで、内部データを変化させる
  // `[stay|goty] applying ???`とした時に呼ばれる
  override def applyEvent(domainEvent: ShoppingEvent, currentData: ShoppingData): ShoppingData = {
    log.info(s"\n***applyEvent: $domainEvent, $currentData, state: $stateName")
    domainEvent match {
      case AddItem(item) =>
        // カートにItemを追加
        currentData.copy(items = currentData.items + item)
      case RemoveItem(item) =>
        // カートからItemを除去
        currentData.copy(items = currentData.items - item)
      case Purchase =>
        // 購入する。カートは空になる。
        println(s"price => ${currentData.price}")
        ShoppingData.empty
      case Leave =>
        // ショッピング終了。
        ShoppingData.empty
    }
  }

  // 初期Stateと初期Data
  startWith(Looking, ShoppingData.empty)

  // FSMと同様。`when(現在の状態) { イベントハンドラ }`という構造
  when(Looking) {
    case Event(addItem: AddItem, _) =>
      // Looking状態からAddItemでShopping状態に移行
      gotoLogging(Shopping) applying addItem andThen {
        // andThenでapplyEvent後のイベントハンドラを設定できる
        case afterAddItem => log.info(s"after => $afterAddItem")
      }
  }

  when(Shopping) {
    case Event(addItem: AddItem, _) =>
      // Shopping状態でAddItemしても状態は変化しないが、AddItemは適用する
      stay applying addItem

    case Event(removeItem: RemoveItem, _) =>
      // Shopping状態でRemoveItemしても状態は変化しないが、RemoveItemは適用する
        stay applying removeItem

    case Event(Purchase, _) =>
      // Shopping状態でPurchaseするとPurchased状態に移行
      goto(Purchased) applying Purchase
  }

  when(Purchased) {
    case Event(Leave, _) =>
      // Purchased状態からLeaveすることでLooking状態に戻る
      goto(Looking) applying Leave
  }

  // 状態遷移ハンドラ
  onTransition {
    case Purchased -> Looking =>
      // ショッピング終了時にsnapshotをとっておく
      saveStateSnapshot()
  }

}

domainEventClassTagを以下のように内部で定義しようとするとStackOverFlowが発生したので外から渡す形とした。

  ...
  // StackOverFlowが発生してしまう
  implicit val domainEventClassTag: ClassTag[ShoppingEvent] = classTag[ShoppingEvent]
  ...

実装自体を見ると、akka.actor.FSMがベースになっていて、when,onTransitionで状態遷移とそのハンドラを定義すれば良い。
ただし、FSMとはusingの代わりにapplyingを使用するという点が異なる
また、PersistentActorのようにpersistを明示的に呼ばなくてもapplyEventに渡ってくるdomainEventは自動的にpersistされている模様。

サンプルではinitializeを実行しているが、ver2.4.5からは内部APIとなったため不要。

andThenの注意点

カート内のItemが1つの時にRemoveItemされたらLookingに戻したい。

イベント適用後のイベントハンドラを設定するためのandThenというAPIがあるためそれが使えそうなので実装してみる。

case Event(removeItem: RemoveItem, _) =>
  stay applying removeItem andThen {
    // currentDataに対するイベントハンドラ(`afterTransitionDo`)を設定
    case ShoppingData.empty => goto(Looking)
  }

しかし、実際にはandThen内のgotostayでは状態遷移させることが出来なかった...。
解決するにはwhenに対するハンドラとしてそれを実装しなければならない。

when(Shopping) {
  ...
  case Event(removeItem: RemoveItem, ShoppingData.empty) =>
    // currentDataがemptyなら許可されていない処理
    throw new IllegalStateException("ShoppingCart is Empty")

  case Event(removeItem @ RemoveItem(item), ShoppingData(items)) if items.size == 1 && (items contains item) =>
    // 1つしかないitemがremoveされる場合
    gotoLogging(Looking) applying removeItem

  case Event(removeItem: RemoveItem, _) =>
    // 通常の動作
    stayLogging applying removeItem
  ...
}

何のためのandThenなのか...と言いたくなるが、
恐らく状態遷移を促すようなドメインイベントをメッセージとして受け取らずに内部的に状態遷移すると、永続化してあるメッセージをreplayしても状態の復元が出来なくなってしまうからではないかと思われる。

今回のようなケースではwhenのハンドラで事前に検査するしかない(はず)。
ShoppingからLookingに戻すようなイベントを定義してself!(tell)するのも考えられるが、メッセージが直後に処理されるとは限らないため上手くいかないケースも多い。

Recovery

Akka-Persistenceで大事な状態の復元。
PersistentFSMの場合、Actor起動時にSnapshotとjournalから自動で状態をrecoveryしてくれる。
普通のPersistentActorと同様に、最新のSnapshotを適用した後に、そのSnapshot採取後のイベントをjournalから読みだしてそれも適用してくれる。
recoveryが完了したらコールバックとしてonRecoveryCompletedが実行される。