Posted at

Akka で再帰処理の増殖を体験してみる


はじめに

この記事に関係する範囲で自己紹介:


  • 最近 Scala に触れている


    • Play Framework

    • Akka

    • Elixir にも触れている

    • Akka のページで語られている lightweight とか let it crash とか、Erlang や Elixir のページでもよく見る




Akka とは


Scala や Java で実装できるメッセージ駆動アプリケーションのライブラリ。


こちらから引用しました。


自分が学習用に使ったページ


The Akka actor hierarchy

actor_top_tree

図はこちらから引用しました。

Akka で処理を担うのは Actor と呼ばれる instance です(上の図では丸で表されています)。

Actor たちは木構造を形成しています。

ここでは root に近い Actor を parent Actor、遠いほうは child Actor と相対的に呼びます。

Parent Actor は child Actor たちを supervise しています。

デフォルトの動作では、child Actor が fail した場合、その child Actor を停止して再起動するようになっています。


再帰処理が増殖してしまう問題

Actor は Scheduler を利用することにより、指定時間 delay 後に自分へのメッセージを送ることができます。

context.system.scheduler.scheduleOnce(delay, self, message)

ここで self というのが自分を指しているのですが、「自分」の意味に注意が必要です。

selfActor のメンバで、型は ActorRef です。

この ActorRef は文字通り Actor への参照ですが、特徴として、supervisor による再起動が起きた場合、再起動された Actor、つまり後継の Actor を参照するようになります。

したがって、「自分」には後継者も含まれていると考えることができます。

このおかげで、自分が死んでしまったあとも後継者がメッセージを処理してくれます。

これは便利なのですが、再帰的な自分宛メッセージの送信処理を起動時に開始するようにしていると、その再帰処理が増えてしまうことになります。。。


問題のあるプログラム

上記の問題が起こるプログラムを Akka Quickstart with Scala で使われているサンプルコードを元につくりました。

主要な実装は下記です1

package com.example

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }

trait HumanActor extends Actor with ActorLogging
{
final protected def speak(words: String): Unit = {
log.info(s"[${toString}] ${words}")
}

final protected def toFutureMe[T](delay: FiniteDuration, message: T): Unit = {
implicit val executionContext = context.system.dispatcher
context.system.scheduler.scheduleOnce(delay, self, message)
}
}

object JapaneseActor {
// messages
case object SpeakFirst
case object SpeakSecond
}

class JapaneseActor(val words1: String, val words2: String) extends HumanActor
{
import JapaneseActor._

val interval1: FiniteDuration = 5.seconds
val interval2: FiniteDuration = 3.seconds
var count: Integer = 0

def receive: Receive = {
case SpeakFirst =>
speak(words1)
toFutureMe(interval1, SpeakFirst)
case SpeakSecond =>
if (count < 2) {
count += 1
} else {
throw new Exception("I failed!")
}
speak(words2)
toFutureMe(interval2, SpeakSecond)
}

override def preStart(): Unit = {
self ! SpeakFirst
self ! SpeakSecond
}
}

object AkkaEndlessRecursion extends App
{
val system = ActorSystem("MyPrettyActors")
val hiroshiAbe = system.actorOf(
Props(classOf[JapaneseActor], "Why don't you do your best?", "Yamada!"))
}

このプログラムで JapaneseActor2はざっくり下記のような挙動をします。


  • 5 秒間隔で Why don't you do your best? を出力する

  • 3 秒間隔で Yamada! を出力する


    • 3 度目の Yamada! を出力しようとすると、その前に fail して再起動される



これらの定期的な出力処理3は、起動時に preStart() の中で SpeakFirstSpeakSecond を自分に送信することで開始され、下記の要領で再帰的な処理が続いていきます。



  • SpeakFirst を受け取ったら Why don't you do your best? を出力し、次の自分宛の送信をスケジュールに登録する


  • SpeakSecond を受け取ったら Yamada! を出力し、次の自分宛の送信をスケジュールに登録する

JapaneseActor が fail するタイミングは後者の処理の前なので、後者の再帰処理は断ち切られます。

一方、前者の再帰処理は、既に scheduler に登録されていて JapaneseActor が fail しても消えないため、増加の一途を辿ります。

結果として、下記のような出力になります。

akka-quickstart-scala [INFO] [08/04/2019 12:44:27.765] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?

akka-quickstart-scala [INFO] [08/04/2019 12:44:27.766] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Yamada!
akka-quickstart-scala [INFO] [08/04/2019 12:44:28.546] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:28.625] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:28.707] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:29.567] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:29.645] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:29.727] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.505] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.587] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.666] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.747] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:30.786] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Yamada!
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.527] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.605] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.686] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:31.767] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.547] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.625] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.707] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:32.785] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:33.567] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:33.645] [MyPrettyActors-akka.actor.default-dispatcher-6] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [INFO] [08/04/2019 12:44:33.727] [MyPrettyActors-akka.actor.default-dispatcher-5] [akka://MyPrettyActors/user/$a] [com.example.JapaneseActor@7e7bdcce] Why don't you do your best?
akka-quickstart-scala [ERROR] [08/04/2019 12:44:33.806] [MyPrettyActors-akka.actor.internal-dispatcher-2] [akka://MyPrettyActors/user/$a] I failed!


修正案

HumanActor.toFutureMe(...)akka.actor.Cancellable を返すように変更します。

akka.actor.Cancellablecancel()4 を呼ぶことによりスケジュールした処理を無かったことにできます。

trait HumanActor extends Actor with ActorLogging

{
final protected def speak(words: String): Unit = {
log.info(s"[${toString}] ${words}")
}

final protected def toFutureMe[T](delay: FiniteDuration, message: T): Cancellable = {
implicit val executionContext = context.system.dispatcher
context.system.scheduler.scheduleOnce(delay, self, message)
}
}

JapaneseActorschedule1schedule2 を用意し、HumanActor.toFutureMe(...) から返された Cancellable を保持します。

Supervisor による停止時に実行される postStop() の中でスケジュールした処理をキャンセルします。

class JapaneseActor(val words1: String, val words2: String) extends HumanActor

{
import JapaneseActor._

val interval1: FiniteDuration = 5.seconds
val interval2: FiniteDuration = 3.seconds
var schedule1: Option[Cancellable] = None
var schedule2: Option[Cancellable] = None
var count: Integer = 0

def receive: Receive = {
case SpeakFirst =>
speak(words1)
schedule1 = Option(toFutureMe(interval1, SpeakFirst))
case SpeakSecond =>
if (count < 2) {
count += 1
} else {
throw new Exception("I failed!")
}
speak(words2)
schedule2 = Option(toFutureMe(interval2, SpeakSecond))
}

override def preStart(): Unit = {
self ! SpeakFirst
self ! SpeakSecond
}

override def postStop(): Unit = {
schedule1.foreach { s => s.cancel() }
schedule2.foreach { s => s.cancel() }
}
}

以上で再帰処理の増殖を防ぐことができます。


補足


What actually happens though is that the preRestart() and postRestart() methods are called which, if not overridden, by default delegate to postStop() and preStart() respectively.


こちらからの引用です。


おわりに

便利な機能が一転して牙を剝くという状況は興奮?してしまいますね!





  1. JapaneseActorHumanActor から拡張している意味は特にありません。今回の問題に気づいた元のプログラムが似たような構成になっていたためです。 



  2. TRICK に出演されていた方です。 



  3. 定期的な処理なら scheduleOnce ではなく schedule を使えば良いのですが、処理間隔が変化し得る場合には scheduleOnce を再帰的に使う方法が有効だと思います。この記事では、簡単のために処理間隔を一定にしています。 



  4. 既に実行された処理に対して呼び出してしまっても問題ありません。戻り値が true ではなく false になるだけです。