Akka FSM で ステートマシンをサクッと
Akka FSMの日本語の記事を見かけないので、もっと陽のあたる場所にってことで、こちら から転載しました。
Akkaの FSM(Finit State Machine) を利用することで、内部DSLでステートマシンをサクッと実装できます。
例として、FoolBossがWorker(ステートマシン)に適当に指示するというモデルを実装してみます。
まずはWorkerのステートの定義
sealed trait State
case object Sleeping extends State
case object Ready extends State
case object Working extends State
続いて、ステートの内部情報に相当するデータクラスを定義。
Powerの量で WorkingになるかSleepingに遷移するかになります。
sealed trait Power
case class InRest(value: Int) extends Power
case class InWorking(value: Int, boss: ActorRef, request: String) extends Power
Worker(ステートマシン)に対する指示とレスポンスを定義。
これはなくてもよいんだけど、より実践ぽくするために定義。
sealed trait Command
case object WakeUp extends Command
case class Work(request: String) extends Command
case object Stop extends Command
sealed trait Response
case class InBed(power: Int) extends Response
case object Ok extends Response
case object Busy extends Response
case class Done(result: String) extends Response
CommandとStateによって、WorkerのStateが遷移したりしなかったりしつつ、
Responseを返したり、返さなかったりします。
そして、実際のWorker(ステートマシン)を以下の様に実装します。
class Worker extends FSM[State, Power] {
startWith(Sleeping, InRest(0))
val cost = 3
when(Sleeping){
case Event(_, InRest(v)) if v < cost => stay using InRest(v+1) replying InBed(v+1)
case Event(WakeUp, InRest(v)) => goto(Ready) using InRest(v+1) replying Ok
case Event(_, InRest(v)) => stay using InRest(v+1) replying InBed(v+1)
}
when(Ready){
case Event(Work(req), InRest(v)) if cost <= v => goto(Working) using InWorking(v, sender, req)
case Event(_, InRest(v)) if v < cost => goto(Sleeping) using InRest(v+1) replying InBed(v+1)
case _ => stay replying Ok
}
when(Working, stateTimeout = 3 second) {
case Event(StateTimeout, InWorking(v, boss, req)) =>
boss ! Done(req.toUpperCase + "?")
goto(Ready) using InRest(v - cost)
case Event(Stop, InWorking(v, _, _)) => goto(Ready) using InRest(v - cost/2)
case _ => stay replying Busy
}
initialize()
}
クラス定義でFSMを継承し、Type parameterでステートと内部情報を指定します。
startWithで初期ステートと内部情報を渡します。
class Worker extends FSM[State, Power] {
startWith(Sleeping, InRest(0))
when(ステート) とPartialFunction[Event,State] を定義しています。PartialFunction なので、orElse で合成もできます。
initialize() で初期ステートに遷移させるようです。(タイマーの初期化もこのタイミングらしい)
また、今回記述していませんが、whenUnhandled を定義することで、全てのステートでハンドリングできなかったEventに対する処理が書けます。
when(Sleeping){
case Event(_, InRest(v)) if v < cost => stay using InRest(v+1) replying InBed(v+1)
case Event(WakeUp, InRest(v)) => goto(Ready) using InRest(v+1) replying Ok
case Event(_, InRest(v)) => stay using InRest(v+1) replying InBed(v+1)
}
//(中略)
initialize()
次は、FoolBossです。
自身のタイミングでWorkerにランダムな指示を出します。
WorkerからのResponseがOkであれば、仕事の指示(Work)を出します。
class FoolBoss(val worker :ActorRef)(implicit xc: ExecutionContext = ExecutionContext.global) extends Actor {
private case object Tick
self ! Tick
val messages = List(WakeUp, Work("Well, Your work is ..."), Stop)
def nextTickIn: FiniteDuration = (1.0 + ThreadLocalRandom.current.nextDouble() * 9.0).seconds
def receive = {
case Tick =>
val msg = util.Random.shuffle(messages).head
println(s"Tickked! send to Worker: $msg")
worker ! msg
case InBed(power) =>
println(s"Worker In Bed, power $power")
context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
case Ok =>
println(s"Worker is Ready!")
worker ! Work("OK, Your work is ...")
case Busy =>
println(s"Worker is Busy!")
context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
case Done(result) =>
println(s"worker done: $result")
context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)
}
}
最後にmain関数です。(実際はAppですが)
object FSMApp extends App {
val system = ActorSystem()
val worker = system.actorOf(Props(classOf[Worker]))
val boss = system.actorOf(Props(new FoolBoss(worker)()))
}
実行すると、以下の様になります。
Oh! send to Worker: WakeUp
Worker In Bed, power 1
Oh! send to Worker: Stop
Worker In Bed, power 2
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 3
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 4
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 5
Oh! send to Worker: Stop
Worker In Bed, power 6
Oh! send to Worker: Stop
Worker In Bed, power 7
Oh! send to Worker: WakeUp
Worker is Ready!
worker done: OK! YOUR WORK IS ...?
Oh! send to Worker: Stop
Worker is Ready!
worker done: OK! YOUR WORK IS ...?
Oh! send to Worker: Work(Well, Your work is ...)
Worker In Bed, power 3
(以下略)
FoolBossとWorkerのやりとりが見えると思います。
ちなみに、ステート定義でハンドルできないEventがあると、ログとして以下のような出力が得られます。
unhandled event WakeUp in state Ready
同様のことは、Actorのbecomeメソッドでもできます。
FSMの方が、DSLを覚えるという新たなコストがありますが、
よりステートと内部データの状態を宣言的に実装でき、見通しがよくなると個人的に感じています。
また、becomeで指定したRecieve型(PartialFunction[Any, Unit])でハンドルできないものがあっても、
FSMのwhenと異なり、上記のようなログは出力されずに、沈黙したままになります。
FSMのソースコード は Actorの効果的な使い方とDSLの勉強にもよいかもしれません。
ソースコードによると、FSMもreceiveで最終的に全てをEventに包んでいるだけのようです。
FSMのreceiveに送られてきたメッセージを渡せばよいので、デバッグなどでデータを覗き見る場合は、以下のようにできます。
def allLogging : PartialFunction[Any, Any] = {
case x => log.debug(s"Received : $x")
x
}
override def receive = allLogging andThen super.receive