はじめに
この記事に関係する範囲で自己紹介:
- 最近 Scala に触れている
- Play Framework
- Akka
- Elixir にも触れている
- Akka のページで語られている lightweight とか let it crash とか、Erlang や Elixir のページでもよく見る
Akka とは
Scala や Java で実装できるメッセージ駆動アプリケーションのライブラリ。
こちらから引用しました。
自分が学習用に使ったページ
The Akka actor hierarchy
図はこちらから引用しました。
Akka で処理を担うのは Actor と呼ばれる instance です(上の図では丸で表されています)。
Actor たちは木構造を形成しています。
ここでは root に近い Actor を parent Actor、遠いほうは child Actor と相対的に呼びます。
Parent Actor は child Actor たちを supervise しています。
デフォルトの動作では、child Actor が fail した場合、その child Actor を停止して再起動するようになっています。
再帰処理が増殖してしまう問題
Actor は Scheduler を利用することにより、指定時間 delay
後に自分へのメッセージを送ることができます。
context.system.scheduler.scheduleOnce(delay, self, message)
ここで self
というのが自分を指しているのですが、「自分」の意味に注意が必要です。
self
は Actor
のメンバで、型は ActorRef
です。
この ActorRef
は文字通り Actor への参照ですが、特徴として、supervisor による再起動が起きた場合、再起動された Actor、つまり後継の Actor を参照するようになります。
したがって、「自分」には後継者も含まれていると考えることができます。
このおかげで、自分が死んでしまったあとも後継者がメッセージを処理してくれます。
これは便利なのですが、再帰的な自分宛メッセージの送信処理を起動時に開始するようにしていると、その再帰処理が増えてしまうことになります。。。
問題のあるプログラム
上記の問題が起こるプログラムを Akka Quickstart with Scala で使われているサンプルコードを元につくりました。
主要な実装は下記です1。
package com.example
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
trait HumanActor extends Actor with ActorLogging
{
final protected def speak(words: String): Unit = {
log.info(s"[${toString}] ${words}")
}
final protected def toFutureMe[T](delay: FiniteDuration, message: T): Unit = {
implicit val executionContext = context.system.dispatcher
context.system.scheduler.scheduleOnce(delay, self, message)
}
}
object JapaneseActor {
// messages
case object SpeakFirst
case object SpeakSecond
}
class JapaneseActor(val words1: String, val words2: String) extends HumanActor
{
import JapaneseActor._
val interval1: FiniteDuration = 5.seconds
val interval2: FiniteDuration = 3.seconds
var count: Integer = 0
def receive: Receive = {
case SpeakFirst =>
speak(words1)
toFutureMe(interval1, SpeakFirst)
case SpeakSecond =>
if (count < 2) {
count += 1
} else {
throw new Exception("I failed!")
}
speak(words2)
toFutureMe(interval2, SpeakSecond)
}
override def preStart(): Unit = {
self ! SpeakFirst
self ! SpeakSecond
}
}
object AkkaEndlessRecursion extends App
{
val system = ActorSystem("MyPrettyActors")
val hiroshiAbe = system.actorOf(
Props(classOf[JapaneseActor], "Why don't you do your best?", "Yamada!"))
}
このプログラムで JapaneseActor
2はざっくり下記のような挙動をします。
- 5 秒間隔で
Why don't you do your best?
を出力する - 3 秒間隔で
Yamada!
を出力する- 3 度目の
Yamada!
を出力しようとすると、その前に fail して再起動される
- 3 度目の
これらの定期的な出力処理3は、起動時に preStart()
の中で SpeakFirst
と SpeakSecond
を自分に送信することで開始され、下記の要領で再帰的な処理が続いていきます。
-
SpeakFirst
を受け取ったらWhy don't you do your best?
を出力し、次の自分宛の送信をスケジュールに登録する -
SpeakSecond
を受け取ったらYamada!
を出力し、次の自分宛の送信をスケジュールに登録する
JapaneseActor
が fail するタイミングは後者の処理の前なので、後者の再帰処理は断ち切られます。
一方、前者の再帰処理は、既に scheduler に登録されていて JapaneseActor
が fail しても消えないため、増加の一途を辿ります。
結果として、下記のような出力になります。
akka-quickstart-scala [INFO] [08/04/2019 12:44:27.765] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:27.766] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Yamada!
akka-quickstart-scala [INFO] [08/04/2019 12:44:28.546] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:28.625] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:28.707] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:29.567] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:29.645] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:29.727] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.505] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.587] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.666] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.747] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.786] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Yamada!
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.527] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.605] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.686] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.767] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.547] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.625] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.707] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.785] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:33.567] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:33.645] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:33.727] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [ERROR] [08/04/2019 12:44:33.806] [MyPrettyActors-akka.actor.internal-dispatcher-2] [akka://MyPrettyActors/user/$a] I failed!
修正案
HumanActor.toFutureMe(...)
が akka.actor.Cancellable
を返すように変更します。
akka.actor.Cancellable
の cancel()
4 を呼ぶことによりスケジュールした処理を無かったことにできます。
trait HumanActor extends Actor with ActorLogging
{
final protected def speak(words: String): Unit = {
log.info(s"[${toString}] ${words}")
}
final protected def toFutureMe[T](delay: FiniteDuration, message: T): Cancellable = {
implicit val executionContext = context.system.dispatcher
context.system.scheduler.scheduleOnce(delay, self, message)
}
}
JapaneseActor
に schedule1
と schedule2
を用意し、HumanActor.toFutureMe(...)
から返された Cancellable
を保持します。
Supervisor による停止時に実行される postStop()
の中でスケジュールした処理をキャンセルします。
class JapaneseActor(val words1: String, val words2: String) extends HumanActor
{
import JapaneseActor._
val interval1: FiniteDuration = 5.seconds
val interval2: FiniteDuration = 3.seconds
var schedule1: Option[Cancellable] = None
var schedule2: Option[Cancellable] = None
var count: Integer = 0
def receive: Receive = {
case SpeakFirst =>
speak(words1)
schedule1 = Option(toFutureMe(interval1, SpeakFirst))
case SpeakSecond =>
if (count < 2) {
count += 1
} else {
throw new Exception("I failed!")
}
speak(words2)
schedule2 = Option(toFutureMe(interval2, SpeakSecond))
}
override def preStart(): Unit = {
self ! SpeakFirst
self ! SpeakSecond
}
override def postStop(): Unit = {
schedule1.foreach { s => s.cancel() }
schedule2.foreach { s => s.cancel() }
}
}
以上で再帰処理の増殖を防ぐことができます。
補足
What actually happens though is that the
preRestart()
andpostRestart()
methods are called which, if not overridden, by default delegate topostStop()
andpreStart()
respectively.
こちらからの引用です。
おわりに
便利な機能が一転して牙を剝くという状況は興奮?してしまいますね!