6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ActorでCircuitBreakerを実装してみた話

Posted at

実装してみる理由

ReactiveSystemやMicroservices等の文脈で登場するCircuitBreakerについて以前調べた。

CircuitBreakerは内部にClose, Open, Half-Openの3つの状態を持ち、さらに非同期処理の実行とその監視という2つの責務を持つ。
状態の管理と責務の分割にはActorが適しているんじゃないかと思ったのがきっかけ。

リポジトリはここ。
petitviolet/supervisor
Actor同士のsuperviseしている状態に似ている(と感じた)のでsupervisorという名前にしてある。
気分が乗ったのでMavenCentralにも公開している。

使い方

まずはCircuitBreakerにあたるものを作成する。
Supervisorというakka.actor.Actorがあり、それを普通にActorSystem#actorOfActorRefを作成する。

// 事前準備
implicit val system = ActorSystem(s"SupervisorPrac")
implicit val dispatcher: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool(1))

// `Supervisor`なactorRefを作成
import scala.concurrent.duration._
val supervisorActor = system.actorOf(Supervisor.props(
  maxFailCount = 2,
  runTimeout = 1000.milliseconds,
  resetWait = 3000.milliseconds
))

外部サービス呼び出しを模倣するscala.util.FutureをCircuitBreakerで監視しつつ実行して結果を取り出す。

// 何かしらの非同期処理
val future = Future.apply { val i = Random.nextInt(2000); Thread.sleep(i); i }

// `Execute`にいれて`supervisorActor`に送りつける
supervisorActor ? Execute(future) onComplete {
  case Success(x) => println(s"success => $x")
  case Failure(t) => println(s"fail => $t")
}

実行してみると

  • 成功時はsuccess => 348
  • 失敗時はfail => java.util.concurrent.TimeoutException: Futures timed out after [1000 milliseconds]

といった感じのログが出力される。

akka.pattern.CircuitBreaker#withCircuitBreakerと同じようにscala.util.Futureを監視対象として、
専用のメッセージ(Execute)に入れてSupervisorActorRefに対して送るとCircuitBreakerらしく動く。

どうやって作るか

大きく、状態の管理とFutureの監視に分けられる。

状態の管理

Actorが持つbecomeを使う。
akka.actor.FSMを使うのも良さそうだが、まずは普通のActorでやってみる。
実装の全体はこのへん
抜き出すには量が多いのでリンク先を参照してもらいたい。

基本的には状態としてCloseOpenにあたるreceiveがあれば良くて、ざっくりと以下の2つを用意すれば良い。

  • 受け取ったFutureから値を取り出すreceive
  • 例外を投げ続けるreceive

Futureの失敗回数を数えておき、Supervisorの初期化時に設定したmaxFailCountrunTimeoutに応じて状態を変更する。

OpenからHalf-Openへの遷移にはakka.actor.Schedulerが使える。
Schedulerで指定されたresetWait時間後に自分自身にHalf-Openへ戻るためのメッセージを送信するように設定しておけば、処理したタイミングでHalf-Openになれる。
Half-OpenClose状態と同じreceiveでまかなえるが、次にFutureが失敗したら即座にOpenに戻るように仕込んでおく。

無駄な処理をさせない {#lazy-evaluation}

Supervisorの状態がOpenの時に外部サービス呼び出しなどを行っても無駄になってしまう。
そこで、Executeメッセージとして渡されるFutureを実行してしまわないように遅延評価する。
コンストラクタはcall-by-nameな=> Future[T]にしておいて内部的には() => Future[T]な関数として扱うようにした。
object側でapplyunapplyを用意して便利に扱えるようにしておく。実装

Futureの監視

FuturerunTimeoutで指定した時間内に成功するかどうかはAwait.resultでブロックしてFutureから値を取り出す。
そのブロックするのはSupervisor自身ではなくて、その子アクターに委譲している。
子アクターのreceiveを抜粋するとこんな感じ。

override def receive: Actor.Receive = {
  case Run =>
    log.debug(s"ExecutorActor: $message")
    Try { Await.result(message.run.apply, timeout) } match {
      case Success(result) =>
        respondSuccessToParent(originalSender, result)
      case Failure(t) =>
        message match {
          case ExecuteWithFallback(_, fallback) =>
            respondSuccessToParent(originalSender, fallback.apply)
          case _ =>
            respondFailureToParent(originalSender, t)
        }
    }

成功と失敗を区別しつつ親アクターとなるSupervisorにメッセージを送り返す。
失敗していたらSupervisor側でその回数を記録しておき、状態遷移判定を行えば良い。

テストを書く

受け取ったメッセージによって状態が変わり、タイムアウトなど時間経過を含むActorのテストを書くにはTestkitが非常に便利だった。
Testing Actor Systems — Akka Documentation
実際に使ってSupervisorテストを書いた

テストしたい対象をTestActorRefでwrapすれば自由に内部状態にアクセス出来るようになるため、以下のように書ける。

val supervisor = TestActorRef.apply[Supervisor[_]](Supervisor.props(maxFailCount, runTimeout, resetWait))
supervisor.underlyingActor.becomeOpen()
supervisor.underlyingActor.state shouldBe Open

普段はActorそのものではなくてActorRefとなることで内部状態がカプセル化されているが、
Testkitのおかげでテストは非常に楽に書けた。

問題点

Supervisorが出来ることはFutureを監視するだけなのでシンプルだが、使い方には注意点がある。
上で説明したように、監視対象とするためのExecuteのコンストラクタ(apply)はcall-by-nameなパラメータとしてあるが、
Execute#applyの前にFuture.applyを呼んでいると外部サービス呼び出し等の非同期処理が走り出してしまう。
従って、Supervisorの状態がOpenなら完全に無駄になってしまう処理を実行しようとすることになる。

つまり、これと

val future = Future.apply { ??? }
supervisor ? Execute(future)

これが違う。

supervisor ? Execute(Future.apply { ??? })

上のように使用すると無駄な処理が走るが、下なら何も起きない。
テストとして書くとこんな感じ?

解決策は、Futureそのものを拡張してOpenなら実行しないようにしなければならない。
AkkaのCircuitBreakerも同じような問題点がある(はず)。
そうなるとHystrixのようにCommandという形で外部サービス呼び出しと監視をまとめて実装する方が安全になりそう。

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?