13
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Akka]CircuitBreakerはどう動くのか

Posted at

CircuitBreakerとは

原典にあたるのはMartin Fowler氏のpostになるはず。
リモートへのリクエストが失敗した時にうまいこと障害が伝搬しないようにするためのデザイン。

昨今のMicroserviceアーキテクチャな文脈だと考えやすい。
特定のサービスがダウンしてしまった時に、
それに依存するサービスが引きずられてダウンしてしまわないようにするもの。
Reactiveなシステムを作る上で欠かせない存在になりそう。

CircuitBreakerの状態

監視対象とするサービスの状態に応じてClose, Open, Half-Openの3つをとる。
それぞれの状態の意味はおおざっぱに以下。

  • Close
    • 正常に動作していて、対象サービスにリクエストを送る状態
  • Open
    • 対象サービスに異常があり、リクエストを送らない状態
  • Half-Open
    • そろそろ直ったかな、とちょっとリクエストを送ろうとしてみる状態

Closeが正常でOpenが異常というのは何となく直感に反するが、
CircuitBreakerの語源が電気回路にあることを考慮すれば自然。

Close状態でサービスの失敗が重なるとOpenに移行、一定時間後にHalf-Openとなり、
サービスに一度リクエストを送信して成功したらClose、失敗したらOpenに戻る、という状態遷移。

AkkaのCircuitBreaker

AkkaもCircuitBreakerを提供している。
Circuit Breaker — Akka Documentation
ちなみに実装はここにある。
この記事ではakka.pattern.CircuitBreakerを使ってCircuitBreakerの動作を追って理解する。

動作を確認するためにサンプルを実装してみる

コードはgithubに置いた。

サービスにあたるものを実装

まず、障害が起こりうるサービスをActorで用意する。

case class Message(value: String)
case object PanicMessage
case object HeavyMessage

class UnstableActor extends Actor {
  override def receive: Receive = {
    case Message(value) =>
      println(value)
      sender ! s"receive: $value"
    case PanicMessage =>
      // just fail
      sys.error("Oops...")
    case HeavyMessage =>
      // sleep over callTimeout
      Thread.sleep(2000)
      sender ! s"finish $HeavyMessage"
  }
}
  • Message(value)は正常処理出来るメッセージ
  • PanicMessageは例外をthrowしてしまうメッセージ
  • HeavyMessageは時間のかかる処理をするメッセージ

となっている。

CircuitBreakerを用意

implicit val system = ActorSystem(s"AkkaCircuitBreakerPrac")
val circuitBreaker = CircuitBreaker(
  system.scheduler,
  maxFailures = 2,
  callTimeout =  1.seconds,
  resetTimeout =  3.seconds)
  .onOpen(println(s"OPEN"))
  .onClose(println(s"CLOSE"))
  .onHalfOpen(println(s"HALF-OPEN"))

状態遷移時のコールバックを登録できるので標準出力に表示するようにしている。
CircuitBreaker.applyの引数それぞれの意味は以下。

  • maxFailures
    • Close状態でmaxFailures回、サービスの実行に失敗したらOpen状態に移行する
  • callTimeout
    • サービスを実行しようとしてcallTimeout時間経過しても結果が返ってこなければ失敗とみなす
  • resetTimeout
    • Open状態でresetTimeout時間経過したらHalf-Openに移行する

CircuitBreakerの動作を見る

まずはActorRefとかの準備

val actorRef = system.actorOf(Props[UnstableActor])
implicit val dispatcher: ExecutionContext = ExecutionContext.Implicits.global
implicit val timeout = Timeout(5.seconds)

CircuitBreaker#withCircuitBreakerで監視しつつActorRefへの?を実行。
初期状態ではCloseになっているので正常に処理される。

// 状態はClose
circuitBreaker.withCircuitBreaker(actorRef ? Message("1"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("2"))

次に一度Failureになる処理を実行する。
maxFailureが2なのでCloseなまま。

// 1回失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? Message("3"))
// Closeのまま

2回FailureさせてOpenにする。

// `maxFailure`を超える2回の失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? HeavyMessage)
// `HeavyMesage`の処理が終わるまで待つ
Thread.sleep(3000)

// Openになっているためメッセージは処理されない
circuitBreaker.withCircuitBreaker(actorRef ? Message("4"))

resetTimeout以上の時間が経つとHalf-Openに移行する。

// `resetTimeout`以上待つ
Thread.sleep(3500)

Half-Openの状態で処理に成功するとCloseに戻る。

circuitBreaker.withCircuitBreaker(actorRef ? Message("5"))
// Closeに戻る

// 以下は成功する
circuitBreaker.withCircuitBreaker(actorRef ? Message("6"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("7"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("8"))

感想

CircuitBreakerが持つClose,Half-Open,Openの3つの状態と、
失敗する可能性のある外部サービスへのアクセスなどをマッピングがうまく出来ていれば、
いわゆるレジリエントなシステムになる気がする。

13
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?