Scala
Akka
actor
statemachine
FSM

StateMachine by Akka FSM

More than 3 years have passed since last update.

Akka FSM で ステートマシンをサクッと

Akka FSMの日本語の記事を見かけないので、もっと陽のあたる場所にってことで、こちら から転載しました。

Akkaの FSM(Finit State Machine) を利用することで、内部DSLでステートマシンをサクッと実装できます。

例として、FoolBossがWorker(ステートマシン)に適当に指示するというモデルを実装してみます。

まずはWorkerのステートの定義

sealed trait State
case object Sleeping extends State
case object Ready    extends State
case object Working  extends State

続いて、ステートの内部情報に相当するデータクラスを定義。
Powerの量で WorkingになるかSleepingに遷移するかになります。

sealed trait Power
case class InRest(value: Int) extends Power
case class InWorking(value: Int, boss: ActorRef, request: String) extends Power

Worker(ステートマシン)に対する指示とレスポンスを定義。
これはなくてもよいんだけど、より実践ぽくするために定義。

sealed trait Command
case object WakeUp               extends Command
case class Work(request: String) extends Command
case object Stop                 extends Command
sealed trait Response
case class InBed(power: Int)     extends Response
case object Ok                   extends Response
case object Busy                 extends Response
case class  Done(result: String) extends Response

CommandとStateによって、WorkerのStateが遷移したりしなかったりしつつ、
Responseを返したり、返さなかったりします。

そして、実際のWorker(ステートマシン)を以下の様に実装します。

class Worker extends FSM[State, Power] {
  startWith(Sleeping, InRest(0))
  val cost = 3
  when(Sleeping){
    case Event(_, InRest(v)) if v < cost => stay        using InRest(v+1) replying InBed(v+1)
    case Event(WakeUp, InRest(v))        => goto(Ready) using InRest(v+1) replying Ok
    case Event(_, InRest(v))             => stay        using InRest(v+1) replying InBed(v+1)
  }
  when(Ready){
    case Event(Work(req), InRest(v)) if cost <= v => goto(Working)  using InWorking(v, sender, req)
    case Event(_, InRest(v)) if v < cost          => goto(Sleeping) using InRest(v+1) replying InBed(v+1)
    case _                                        => stay replying Ok
  }
  when(Working, stateTimeout = 3 second) {
    case Event(StateTimeout, InWorking(v, boss, req)) =>
      boss ! Done(req.toUpperCase + "?")
      goto(Ready) using InRest(v - cost)
    case Event(Stop, InWorking(v, _, _))  => goto(Ready) using InRest(v - cost/2)
    case _                                => stay replying Busy
  }
  initialize()
}

クラス定義でFSMを継承し、Type parameterでステートと内部情報を指定します。

startWithで初期ステートと内部情報を渡します。

class Worker extends FSM[State, Power] {
  startWith(Sleeping, InRest(0))

when(ステート) とPartialFunction[Event,State] を定義しています。PartialFunction なので、orElse で合成もできます。
initialize() で初期ステートに遷移させるようです。(タイマーの初期化もこのタイミングらしい)
また、今回記述していませんが、whenUnhandled を定義することで、全てのステートでハンドリングできなかったEventに対する処理が書けます。

  when(Sleeping){
    case Event(_, InRest(v)) if v < cost => stay        using InRest(v+1) replying InBed(v+1)
    case Event(WakeUp, InRest(v))        => goto(Ready) using InRest(v+1) replying Ok
    case Event(_, InRest(v))             => stay        using InRest(v+1) replying InBed(v+1)
  }
  //(中略)
  initialize()

次は、FoolBossです。
自身のタイミングでWorkerにランダムな指示を出します。
WorkerからのResponseがOkであれば、仕事の指示(Work)を出します。

class FoolBoss(val worker :ActorRef)(implicit xc: ExecutionContext = ExecutionContext.global) extends Actor {
  private case object Tick
  self ! Tick

  val messages = List(WakeUp, Work("Well, Your work is ..."), Stop)

  def nextTickIn: FiniteDuration = (1.0 + ThreadLocalRandom.current.nextDouble() * 9.0).seconds

  def receive = {
    case Tick =>
      val msg = util.Random.shuffle(messages).head
      println(s"Tickked! send to Worker: $msg")
      worker ! msg
    case InBed(power) =>
      println(s"Worker In Bed, power $power")
      context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
    case Ok =>
      println(s"Worker is Ready!")
      worker ! Work("OK, Your work is ...")
    case Busy =>
      println(s"Worker is Busy!")
      context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
    case Done(result) =>
      println(s"worker done: $result")
      context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
  }
}

最後にmain関数です。(実際はAppですが)

object FSMApp extends App {
  val system = ActorSystem()
  val worker = system.actorOf(Props(classOf[Worker]))
  val boss = system.actorOf(Props(new FoolBoss(worker)()))
}

実行すると、以下の様になります。

Oh! send to Worker: WakeUp
Worker In Bed, power 1
Oh! send to Worker: Stop
Worker In Bed, power 2
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 3
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 4
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 5
Oh! send to Worker: Stop
Worker In Bed, power 6
Oh! send to Worker: Stop
Worker In Bed, power 7
Oh! send to Worker: WakeUp
Worker is Ready!
worker done: OK! YOUR WORK IS ...?
Oh! send to Worker: Stop
Worker is Ready!
worker done: OK! YOUR WORK IS ...?
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 3
(以下略)

FoolBossとWorkerのやりとりが見えると思います。
ちなみに、ステート定義でハンドルできないEventがあると、ログとして以下のような出力が得られます。

unhandled event WakeUp in state Ready

同様のことは、Actorのbecomeメソッドでもできます。
FSMの方が、DSLを覚えるという新たなコストがありますが、
よりステートと内部データの状態を宣言的に実装でき、見通しがよくなると個人的に感じています。

また、becomeで指定したRecieve型(PartialFunction[Any, Unit])でハンドルできないものがあっても、
FSMのwhenと異なり、上記のようなログは出力されずに、沈黙したままになります。

FSMのソースコード は Actorの効果的な使い方とDSLの勉強にもよいかもしれません。

ソースコードによると、FSMもreceiveで最終的に全てをEventに包んでいるだけのようです。
FSMのreceiveに送られてきたメッセージを渡せばよいので、デバッグなどでデータを覗き見る場合は、以下のようにできます。

  def allLogging : PartialFunction[Any, Any] = {
    case x => log.debug(s"Received : $x")
      x
  }
  override def receive = allLogging andThen super.receive