概要
この記事では、Actorのエラーハンドリングの設定を独自実装して、検証した内容を記載したいと思います。
Akkaでの耐障害性について
Akkaでは「let it crash」という思想のもと、耐障害性を確保しています。SupervisorがActorを管理し、監視対象のActorに障害が発生した際、プロセスをRestartさせたり、Stopさせたりする事で、エラー時のハンドリングも自分の好きなようにカスタマイズする事が出来ます。
SuperVisorStrategyについて
Akkaでは、子アクターが例外処理出来ないケースでは、親アクターが例外のハンドリングを行います。今回は下記の2つを取り上げます。
- OneForOneStrategy
- AllForOneStrategy
OneForOneStrategyとは
子アクターの例外を検知すると、親が対象の子アクターのみを再起動します。デフォルトはこちらの設定を使用しています。
AllForOneStrategyとは
一方でこちらは、子アクターの例外を検知すると、親が管理しているの子アクター全てを再起動します。相互の依存関係にある場合はこちらを使用します。
Akkaのソースを見ながら実装してみる
- 子アクターの監視設定するメソッド。デフォルトはOneForOneStrategyが設定されている。
trait Actor {
/**
* User overridable definition the strategy to use for supervising
* child actors.
*/
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy //デフォルトはOneForOneStrategy
- supervisorStrategyをoverrideして、AllForOneStrategyの設定をしてみる
import akka.actor.SupervisorStrategy._
override def supervisorStrategy = AllForOneStrategy(maxNrOfRetries = 10, withinTimeRange = Duration(1, "second")) {
case _:Exception => Restart
}
- AllForOneStrategyに渡せるパラメータはこちら
/**
* @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit,
* if the limit is exceeded the child actor is stopped
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
* @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
* [[scala.collection.immutable.Seq]] of Throwables which maps the given Throwables to restarts, otherwise escalates.
* @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled
*/
実際にコードを書いて確認してみた
登場人物
- 親アクター [SupervisorExceptionActor] → 子アクターを起動する
- 子アクター1 [ExceptionActor] → 例外を吐く
- 子アクター2 [NoExceptionActor] → printlnするだけ
ソースコード
import akka.actor.{Actor, ActorSystem, AllForOneStrategy, Props}
import scala.concurrent.duration.Duration
object ActorSample {
def main(args: Array[String]): Unit = {
val system = ActorSystem("helloSystem")
val actor = system.actorOf(Props[SupervisorExceptionActor])
for(i <- 0 to 10) {
actor ! i
Thread.sleep(3 * 1000)
}
system.shutdown
}
}
class SupervisorExceptionActor extends Actor {
val eXActor = context.actorOf(Props[ExceptionActor])
val noExActor = context.actorOf(Props[NoExceptionActor])
override def preStart() = println("preStart() in SupervisorExceptionActor")
import akka.actor.SupervisorStrategy._
override def supervisorStrategy = AllForOneStrategy(maxNrOfRetries = 10, withinTimeRange = Duration(1, "second")) {
case _:Exception => Restart
}
def receive = {
case x => {
eXActor ! x
noExActor ! x
}
}
}
class ExceptionActor extends Actor {
override def preStart() = println("preStart() in ExceptionActor")
def receive = {
case 4 => {
println("---------- ERROR! ----------")
throw new Exception("Dead Exception")
}
case x => println("message:" + x + " : " + this)
}
}
実行結果
- 例外が発生した後に、prestartされ、子アクターを両方とも起動し直している。
preStart() in ExceptionActor
preStart() in SupervisorExceptionActor
preStart() in NoExceptionActor
message:0 : ExceptionActor@6073f10d
message:0 : NoExceptionActor@566f78e6
message:1 : NoExceptionActor@566f78e6
message:1 : ExceptionActor@6073f10d
message:2 : NoExceptionActor@566f78e6
message:2 : ExceptionActor@6073f10d
message:3 : ExceptionActor@6073f10d
message:3 : NoExceptionActor@566f78e6
message:4 : NoExceptionActor@566f78e6
---------- ERROR! ----------
[ERROR] [07/06/2017 16:26:06.598] [helloSystem-akka.actor.default-dispatcher-4] [akka://helloSystem/user/$a/$a] Dead Exception
java.lang.Exception: Dead Exception
at ExceptionActor$$anonfun$receive$2.applyOrElse(ActorSample.scala:45)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at ExceptionActor.aroundReceive(ActorSample.scala:39)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
preStart() in NoExceptionActor
preStart() in ExceptionActor
message:5 : ExceptionActor@4ff9afc8
message:5 : NoExceptionActor@155e3918
message:6 : ExceptionActor@4ff9afc8
message:6 : NoExceptionActor@155e3918
message:7 : NoExceptionActor@155e3918
message:7 : ExceptionActor@4ff9afc8
message:8 : ExceptionActor@4ff9afc8
message:8 : NoExceptionActor@155e3918
message:9 : NoExceptionActor@155e3918
message:9 : ExceptionActor@4ff9afc8
message:10 : ExceptionActor@4ff9afc8
message:10 : NoExceptionActor@155e3918
Process finished with exit code 0
BackoffSupervisor による運用負荷軽減
BackoffSupervisorって何ができるの?
子アクターが失敗したときに、親アクターが子アクターを再起動する間隔を伸ばして起動する事ができる。
val supervisor = BackoffSupervisor.props(
Backoff.onStop(
childProps,
childName = "myEcho",
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
))
maxBackoff、minBackoffは、バックオフする時間の最大最小の時間。
randamFactorは、再起動する時間にある程度ランダムな時間を付与する事が出来る。今回は検証なので誤差1%で試した。
実際にコードを書いて確認してみた
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import org.joda.time.DateTime
import scala.concurrent.duration._
object ActorSample2 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("EchoActor")
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
Props(classOf[EchoActor]),
childName = "EchoActor",
minBackoff = 10.seconds,
maxBackoff = 400.seconds,
randomFactor = 0.01
))
val actor = system.actorOf(supervisor)
for(i <- 0 to 1000) {
actor ! i
Thread.sleep(5 * 1000)
}
}
}
class EchoActor extends Actor {
val now = new DateTime
override def preStart() = {
println("preStart() in SupervisorExceptionActor" + now.toString("yyyy/MM/dd HH:mm:ss"))
}
def receive = {
case x => {
throw new RuntimeException("Dead Exception")
}
}
}
想定結果
- minBackoff = 10、 maxBackoff = 400 と設定しているので、Actorのエラー後に、10、20、40、80、160、320秒と再起動がかかる
実際の結果
[ERROR] [07/24/2017 19:18:20.068] [EchoActor-akka.actor.default-dispatcher-4] [akka://EchoActor/user/$a/EchoActor] Dead Exception
preStart() in SupervisorExceptionActor2017/07/24 19:18:30
→ ERRORから10秒後に再起動が走る
[ERROR] [07/24/2017 19:18:34.994] [EchoActor-akka.actor.default-dispatcher-3] [akka://EchoActor/user/$a/EchoActor] Dead Exception
preStart() in SupervisorExceptionActor2017/07/24 19:18:55
→ ERRORから21秒後に再起動が走る
[ERROR] [07/24/2017 19:19:00.009] [EchoActor-akka.actor.default-dispatcher-2] [akka://EchoActor/user/$a/EchoActor] Dead Exception
preStart() in SupervisorExceptionActor2017/07/24 19:19:40
→ ERRORから40秒後に再起動が走る
[ERROR] [07/24/2017 19:19:45.040] [EchoActor-akka.actor.default-dispatcher-4] [akka://EchoActor/user/$a/EchoActor] Dead Exception
preStart() in SupervisorExceptionActor2017/07/24 19:21:05
→ ERRORから80秒後に再起動が走る
[ERROR] [07/24/2017 19:21:10.102] [EchoActor-akka.actor.default-dispatcher-4] [akka://EchoActor/user/$a/EchoActor] Dead Exception
preStart() in SupervisorExceptionActor2017/07/24 19:23:50
→ ERRORから160秒後に再起動が走る
[ERROR] [07/24/2017 19:23:55.215] [EchoActor-akka.actor.default-dispatcher-4] [akka://EchoActor/user/$a/EchoActor] Dead Exception
preStart() in SupervisorExceptionActor2017/07/24 19:29:18
→ ERRORから323秒後に再起動が走る
[ERROR] [07/24/2017 19:29:20.446] [EchoActor-akka.actor.default-dispatcher-2] [akka://EchoActor/user/$a/EchoActor] Dead Exception
- 想定した結果通りの挙動をしている。
結論
一時的にDBに接続出来ない場合や、API Callしても一時的にレスポンスが返ってこない場合などで、一定の間隔を置いて起動したい場合にこの設定を入れておけば、DB接続の負荷を軽減したり、リクエストの流量を制御する事が出来るので、是非設定してみて欲しい。