Posted at

StateMachine by Akka FSM

More than 3 years have passed since last update.


Akka FSM で ステートマシンをサクッと

Akka FSMの日本語の記事を見かけないので、もっと陽のあたる場所にってことで、こちら から転載しました。

Akkaの FSM(Finit State Machine) を利用することで、内部DSLでステートマシンをサクッと実装できます。

例として、FoolBossがWorker(ステートマシン)に適当に指示するというモデルを実装してみます。

まずはWorkerのステートの定義

sealed trait State

case object Sleeping extends State
case object Ready extends State
case object Working extends State

続いて、ステートの内部情報に相当するデータクラスを定義。

Powerの量で WorkingになるかSleepingに遷移するかになります。

sealed trait Power

case class InRest(value: Int) extends Power
case class InWorking(value: Int, boss: ActorRef, request: String) extends Power

Worker(ステートマシン)に対する指示とレスポンスを定義。

これはなくてもよいんだけど、より実践ぽくするために定義。

sealed trait Command

case object WakeUp extends Command
case class Work(request: String) extends Command
case object Stop extends Command
sealed trait Response
case class InBed(power: Int) extends Response
case object Ok extends Response
case object Busy extends Response
case class Done(result: String) extends Response

CommandとStateによって、WorkerのStateが遷移したりしなかったりしつつ、

Responseを返したり、返さなかったりします。

そして、実際のWorker(ステートマシン)を以下の様に実装します。

class Worker extends FSM[State, Power] {

startWith(Sleeping, InRest(0))
val cost = 3
when(Sleeping){
case Event(_, InRest(v)) if v < cost => stay using InRest(v+1) replying InBed(v+1)
case Event(WakeUp, InRest(v)) => goto(Ready) using InRest(v+1) replying Ok
case Event(_, InRest(v)) => stay using InRest(v+1) replying InBed(v+1)
}
when(Ready){
case Event(Work(req), InRest(v)) if cost <= v => goto(Working) using InWorking(v, sender, req)
case Event(_, InRest(v)) if v < cost => goto(Sleeping) using InRest(v+1) replying InBed(v+1)
case _ => stay replying Ok
}
when(Working, stateTimeout = 3 second) {
case Event(StateTimeout, InWorking(v, boss, req)) =>
boss ! Done(req.toUpperCase + "?")
goto(Ready) using InRest(v - cost)
case Event(Stop, InWorking(v, _, _)) => goto(Ready) using InRest(v - cost/2)
case _ => stay replying Busy
}
initialize()
}

クラス定義でFSMを継承し、Type parameterでステートと内部情報を指定します。

startWithで初期ステートと内部情報を渡します。

class Worker extends FSM[State, Power] {

startWith(Sleeping, InRest(0))

when(ステート) とPartialFunction[Event,State] を定義しています。PartialFunction なので、orElse で合成もできます。

initialize() で初期ステートに遷移させるようです。(タイマーの初期化もこのタイミングらしい)

また、今回記述していませんが、whenUnhandled を定義することで、全てのステートでハンドリングできなかったEventに対する処理が書けます。

  when(Sleeping){

case Event(_, InRest(v)) if v < cost => stay using InRest(v+1) replying InBed(v+1)
case Event(WakeUp, InRest(v)) => goto(Ready) using InRest(v+1) replying Ok
case Event(_, InRest(v)) => stay using InRest(v+1) replying InBed(v+1)
}
//(中略)
initialize()

次は、FoolBossです。

自身のタイミングでWorkerにランダムな指示を出します。

WorkerからのResponseがOkであれば、仕事の指示(Work)を出します。

class FoolBoss(val worker :ActorRef)(implicit xc: ExecutionContext = ExecutionContext.global) extends Actor {

private case object Tick
self ! Tick

val messages = List(WakeUp, Work("Well, Your work is ..."), Stop)

def nextTickIn: FiniteDuration = (1.0 + ThreadLocalRandom.current.nextDouble() * 9.0).seconds

def receive = {
case Tick =>
val msg = util.Random.shuffle(messages).head
println(s"Tickked! send to Worker: $msg")
worker ! msg
case InBed(power) =>
println(s"Worker In Bed, power $power")
context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
case Ok =>
println(s"Worker is Ready!")
worker ! Work("OK, Your work is ...")
case Busy =>
println(s"Worker is Busy!")
context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
case Done(result) =>
println(s"worker done: $result")
context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
}
}

最後にmain関数です。(実際はAppですが)

object FSMApp extends App {

val system = ActorSystem()
val worker = system.actorOf(Props(classOf[Worker]))
val boss = system.actorOf(Props(new FoolBoss(worker)()))
}

実行すると、以下の様になります。

Oh! send to Worker: WakeUp

Worker In Bed, power 1
Oh! send to Worker: Stop
Worker In Bed, power 2
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 3
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 4
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 5
Oh! send to Worker: Stop
Worker In Bed, power 6
Oh! send to Worker: Stop
Worker In Bed, power 7
Oh! send to Worker: WakeUp
Worker is Ready!
worker done: OK! YOUR WORK IS ...?
Oh! send to Worker: Stop
Worker is Ready!
worker done: OK! YOUR WORK IS ...?
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 3
(以下略)

FoolBossとWorkerのやりとりが見えると思います。

ちなみに、ステート定義でハンドルできないEventがあると、ログとして以下のような出力が得られます。

unhandled event WakeUp in state Ready

同様のことは、Actorのbecomeメソッドでもできます。

FSMの方が、DSLを覚えるという新たなコストがありますが、

よりステートと内部データの状態を宣言的に実装でき、見通しがよくなると個人的に感じています。

また、becomeで指定したRecieve型(PartialFunction[Any, Unit])でハンドルできないものがあっても、

FSMのwhenと異なり、上記のようなログは出力されずに、沈黙したままになります。

FSMのソースコード は Actorの効果的な使い方とDSLの勉強にもよいかもしれません。

ソースコードによると、FSMもreceiveで最終的に全てをEventに包んでいるだけのようです。

FSMのreceiveに送られてきたメッセージを渡せばよいので、デバッグなどでデータを覗き見る場合は、以下のようにできます。

  def allLogging : PartialFunction[Any, Any] = {

case x => log.debug(s"Received : $x")
x
}
override def receive = allLogging andThen super.receive