実装してみる理由
ReactiveSystemやMicroservices等の文脈で登場するCircuitBreakerについて以前調べた。
CircuitBreakerは内部にClose
, Open
, Half-Open
の3つの状態を持ち、さらに非同期処理の実行とその監視という2つの責務を持つ。
状態の管理と責務の分割にはActorが適しているんじゃないかと思ったのがきっかけ。
リポジトリはここ。
petitviolet/supervisor
Actor同士のsuperviseしている状態に似ている(と感じた)のでsupervisor
という名前にしてある。
気分が乗ったのでMavenCentralにも公開している。
使い方
まずはCircuitBreakerにあたるものを作成する。
Supervisor
というakka.actor.Actor
があり、それを普通にActorSystem#actorOf
でActorRef
を作成する。
// 事前準備
implicit val system = ActorSystem(s"SupervisorPrac")
implicit val dispatcher: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool(1))
// `Supervisor`なactorRefを作成
import scala.concurrent.duration._
val supervisorActor = system.actorOf(Supervisor.props(
maxFailCount = 2,
runTimeout = 1000.milliseconds,
resetWait = 3000.milliseconds
))
外部サービス呼び出しを模倣するscala.util.Future
をCircuitBreakerで監視しつつ実行して結果を取り出す。
// 何かしらの非同期処理
val future = Future.apply { val i = Random.nextInt(2000); Thread.sleep(i); i }
// `Execute`にいれて`supervisorActor`に送りつける
supervisorActor ? Execute(future) onComplete {
case Success(x) => println(s"success => $x")
case Failure(t) => println(s"fail => $t")
}
実行してみると
- 成功時は
success => 348
- 失敗時は
fail => java.util.concurrent.TimeoutException: Futures timed out after [1000 milliseconds]
といった感じのログが出力される。
akka.pattern.CircuitBreaker#withCircuitBreaker
と同じようにscala.util.Future
を監視対象として、
専用のメッセージ(Execute
)に入れてSupervisor
なActorRef
に対して送るとCircuitBreakerらしく動く。
どうやって作るか
大きく、状態の管理とFuture
の監視に分けられる。
状態の管理
Actor
が持つbecome
を使う。
akka.actor.FSM
を使うのも良さそうだが、まずは普通のActor
でやってみる。
実装の全体はこのへん。
抜き出すには量が多いのでリンク先を参照してもらいたい。
基本的には状態としてClose
とOpen
にあたるreceive
があれば良くて、ざっくりと以下の2つを用意すれば良い。
- 受け取った
Future
から値を取り出すreceive
- 例外を投げ続ける
receive
Future
の失敗回数を数えておき、Supervisor
の初期化時に設定したmaxFailCount
やrunTimeout
に応じて状態を変更する。
Open
からHalf-Open
への遷移にはakka.actor.Scheduler
が使える。
Scheduler
で指定されたresetWait
時間後に自分自身にHalf-Open
へ戻るためのメッセージを送信するように設定しておけば、処理したタイミングでHalf-Open
になれる。
Half-Open
はClose
状態と同じreceive
でまかなえるが、次にFuture
が失敗したら即座にOpen
に戻るように仕込んでおく。
無駄な処理をさせない {#lazy-evaluation}
Supervisor
の状態がOpen
の時に外部サービス呼び出しなどを行っても無駄になってしまう。
そこで、Execute
メッセージとして渡されるFuture
を実行してしまわないように遅延評価する。
コンストラクタはcall-by-nameな=> Future[T]
にしておいて内部的には() => Future[T]
な関数として扱うようにした。
object側でapply
とunapply
を用意して便利に扱えるようにしておく。実装
Futureの監視
Future
がrunTimeout
で指定した時間内に成功するかどうかはAwait.result
でブロックしてFuture
から値を取り出す。
そのブロックするのはSupervisor
自身ではなくて、その子アクターに委譲している。
子アクターのreceive
を抜粋するとこんな感じ。
override def receive: Actor.Receive = {
case Run =>
log.debug(s"ExecutorActor: $message")
Try { Await.result(message.run.apply, timeout) } match {
case Success(result) =>
respondSuccessToParent(originalSender, result)
case Failure(t) =>
message match {
case ExecuteWithFallback(_, fallback) =>
respondSuccessToParent(originalSender, fallback.apply)
case _ =>
respondFailureToParent(originalSender, t)
}
}
成功と失敗を区別しつつ親アクターとなるSupervisor
にメッセージを送り返す。
失敗していたらSupervisor
側でその回数を記録しておき、状態遷移判定を行えば良い。
テストを書く
受け取ったメッセージによって状態が変わり、タイムアウトなど時間経過を含むActor
のテストを書くにはTestkitが非常に便利だった。
Testing Actor Systems — Akka Documentation
実際に使ってSupervisor
のテストを書いた。
テストしたい対象をTestActorRef
でwrapすれば自由に内部状態にアクセス出来るようになるため、以下のように書ける。
val supervisor = TestActorRef.apply[Supervisor[_]](Supervisor.props(maxFailCount, runTimeout, resetWait))
supervisor.underlyingActor.becomeOpen()
supervisor.underlyingActor.state shouldBe Open
普段はActorそのものではなくてActorRef
となることで内部状態がカプセル化されているが、
Testkitのおかげでテストは非常に楽に書けた。
問題点
Supervisor
が出来ることはFuture
を監視するだけなのでシンプルだが、使い方には注意点がある。
上で説明したように、監視対象とするためのExecute
のコンストラクタ(apply
)はcall-by-nameなパラメータとしてあるが、
Execute#apply
の前にFuture.apply
を呼んでいると外部サービス呼び出し等の非同期処理が走り出してしまう。
従って、Supervisor
の状態がOpen
なら完全に無駄になってしまう処理を実行しようとすることになる。
つまり、これと
val future = Future.apply { ??? }
supervisor ? Execute(future)
これが違う。
supervisor ? Execute(Future.apply { ??? })
上のように使用すると無駄な処理が走るが、下なら何も起きない。
テストとして書くとこんな感じ?
解決策は、Future
そのものを拡張してOpen
なら実行しないようにしなければならない。
AkkaのCircuitBreakerも同じような問題点がある(はず)。
そうなるとHystrixのようにCommandという形で外部サービス呼び出しと監視をまとめて実装する方が安全になりそう。