CircuitBreakerとは
原典にあたるのはMartin Fowler氏のpostになるはず。
リモートへのリクエストが失敗した時にうまいこと障害が伝搬しないようにするためのデザイン。
昨今のMicroserviceアーキテクチャな文脈だと考えやすい。
特定のサービスがダウンしてしまった時に、
それに依存するサービスが引きずられてダウンしてしまわないようにするもの。
Reactiveなシステムを作る上で欠かせない存在になりそう。
CircuitBreakerの状態
監視対象とするサービスの状態に応じてClose, Open, Half-Openの3つをとる。
それぞれの状態の意味はおおざっぱに以下。
- Close
- 正常に動作していて、対象サービスにリクエストを送る状態
- Open
- 対象サービスに異常があり、リクエストを送らない状態
- Half-Open
- そろそろ直ったかな、とちょっとリクエストを送ろうとしてみる状態
Closeが正常でOpenが異常というのは何となく直感に反するが、
CircuitBreaker
の語源が電気回路にあることを考慮すれば自然。
Close状態でサービスの失敗が重なるとOpenに移行、一定時間後にHalf-Openとなり、
サービスに一度リクエストを送信して成功したらClose、失敗したらOpenに戻る、という状態遷移。
AkkaのCircuitBreaker
AkkaもCircuitBreakerを提供している。
Circuit Breaker — Akka Documentation
ちなみに実装はここにある。
この記事ではakka.pattern.CircuitBreaker
を使ってCircuitBreakerの動作を追って理解する。
動作を確認するためにサンプルを実装してみる
コードはgithubに置いた。
サービスにあたるものを実装
まず、障害が起こりうるサービスをActorで用意する。
case class Message(value: String)
case object PanicMessage
case object HeavyMessage
class UnstableActor extends Actor {
override def receive: Receive = {
case Message(value) =>
println(value)
sender ! s"receive: $value"
case PanicMessage =>
// just fail
sys.error("Oops...")
case HeavyMessage =>
// sleep over callTimeout
Thread.sleep(2000)
sender ! s"finish $HeavyMessage"
}
}
-
Message(value)
は正常処理出来るメッセージ -
PanicMessage
は例外をthrowしてしまうメッセージ -
HeavyMessage
は時間のかかる処理をするメッセージ
となっている。
CircuitBreakerを用意
implicit val system = ActorSystem(s"AkkaCircuitBreakerPrac")
val circuitBreaker = CircuitBreaker(
system.scheduler,
maxFailures = 2,
callTimeout = 1.seconds,
resetTimeout = 3.seconds)
.onOpen(println(s"OPEN"))
.onClose(println(s"CLOSE"))
.onHalfOpen(println(s"HALF-OPEN"))
状態遷移時のコールバックを登録できるので標準出力に表示するようにしている。
CircuitBreaker.apply
の引数それぞれの意味は以下。
- maxFailures
- Close状態で
maxFailures
回、サービスの実行に失敗したらOpen状態に移行する
- Close状態で
- callTimeout
- サービスを実行しようとして
callTimeout
時間経過しても結果が返ってこなければ失敗とみなす
- サービスを実行しようとして
- resetTimeout
- Open状態で
resetTimeout
時間経過したらHalf-Openに移行する
- Open状態で
CircuitBreakerの動作を見る
まずはActorRef
とかの準備
val actorRef = system.actorOf(Props[UnstableActor])
implicit val dispatcher: ExecutionContext = ExecutionContext.Implicits.global
implicit val timeout = Timeout(5.seconds)
CircuitBreaker#withCircuitBreaker
で監視しつつActorRef
への?
を実行。
初期状態ではCloseになっているので正常に処理される。
// 状態はClose
circuitBreaker.withCircuitBreaker(actorRef ? Message("1"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("2"))
次に一度Failure
になる処理を実行する。
maxFailure
が2なのでCloseなまま。
// 1回失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? Message("3"))
// Closeのまま
2回Failure
させてOpenにする。
// `maxFailure`を超える2回の失敗
circuitBreaker.withCircuitBreaker(actorRef ? PanicMessage)
circuitBreaker.withCircuitBreaker(actorRef ? HeavyMessage)
// `HeavyMesage`の処理が終わるまで待つ
Thread.sleep(3000)
// Openになっているためメッセージは処理されない
circuitBreaker.withCircuitBreaker(actorRef ? Message("4"))
resetTimeout
以上の時間が経つとHalf-Openに移行する。
// `resetTimeout`以上待つ
Thread.sleep(3500)
Half-Openの状態で処理に成功するとCloseに戻る。
circuitBreaker.withCircuitBreaker(actorRef ? Message("5"))
// Closeに戻る
// 以下は成功する
circuitBreaker.withCircuitBreaker(actorRef ? Message("6"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("7"))
circuitBreaker.withCircuitBreaker(actorRef ? Message("8"))
感想
CircuitBreakerが持つClose,Half-Open,Openの3つの状態と、
失敗する可能性のある外部サービスへのアクセスなどをマッピングがうまく出来ていれば、
いわゆるレジリエントなシステムになる気がする。