メッセージの遅延の伝播
Actor ModelのActorはmailboxに届けられたメッセージを自分のスレッドで一つずつ処理するので、何らかの障害が起こって処理の遅延が発生した時に処理の遅延が後続のメッセージの処理にも伝播します。
例えば、ある処理が平均50msの処理時間がかかるときに、メッセージの送り手(Producer)が50ms以上の間隔でメッセージを送り続けたりすると、レイテンシはどんどん大きくなってしまいます。このとき、producerが40msの間隔でメッセージを送り続けると、1秒後には250msの遅延が発生します。
Circuit Breaker
Circuit Breakerはメッセージの送信側の方で想定よりも応答に時間がかかっている処理を失敗とみなして、メッセージの送信を遮断します。Back Pressureを行う場合にはメッセージの受け手で許容量を送り手に伝える仕組みが必要ですが、Circuit Breakerを使う場合にはメッセージの受け手の方でそうした制御をする必要はありません。
ただし、負荷を分散するためにスレッドの数やノードの数を調整したり、遮断したメッセージの回復処理を行う「場合には別途そのような仕組みを作りこむ必要があります。
Circuit Breakerの状態
Circuit Breakerは3つの状態を遷移し、メッセージの送信を制御します。
- Close
メッセージを送信可能な状態 - Open
メッセージの送信を遮断している状態。Open時にメッセージを送信しようとするとCircuitBreakerOpenExceptionがスローされる。 - Half-Open
メッセージの送信を一時的に再開している状態。Half-Open状態のときにメッセージの送信に成功すると(所定時間内に処理が終わると)Close状態に戻る。失敗した場合はOpen状態になる。
サンプルプログラム
一定時間ごとにProducerがメッセージを送り続け、Workerがそのメッセージを処理します。
Workerは単一ではProducerがメッセージを送り続ける頻度よりも処理能力が低いので、処理遅延が発生します。遅延が一定のしきいを超えるとスレッド数を増やして処理能力を増やすようにします。
Circuit Breakerは以下のように初期化します。
// workerのrouter。
private val worker = context.actorOf(Worker.routerProps, "worker")
// workerの数を増減させるActor
private val resizer = context.actorOf(PoolResizer.props(worker.path))
val breaker = new CircuitBreaker(
context.system.scheduler,
maxFailures = 5, // この回数を超えた失敗があった場合にはOpen状態になる。
callTimeout = 150.milliseconds, // 失敗とみなす処理時間のしきい値。レイテンシを150 milli sec以下に抑えたい
resetTimeout = 50.milliseconds // Open状態になってからHalf-Openになるまでの時間。
)(context.dispatcher)
.onOpen({
log.info("circuit breaker opened.")
resizer ! PoolResizer.Up // workerの数を増やす
})
.onClose({
log.info("circuit breaker closed.")
resizer ! PoolResizer.Down // workerの数を減らす
})
.onHalfOpen(log.info("circuit breaker half-opened."))
Circuit Breakerから処理を呼び出すときは以下のようにask patternと組み合わせてメッセージを呼び出します。
Circuit Breakerによって遮断されたメッセージを回復する必要がある場合はFutureのエラーハンドラを使って回復処理を書きます。
def receive = {
case m: Worker.Work =>
receiveCount += 1
log.info("receive count=" + receiveCount)
implicit val dispatcher = context.system.dispatchers.lookup("master-future-dispatcher")
// Circuit Breakerを使ってメッセージを
val future = breaker.withCircuitBreaker {
val future = worker ? m
pipe(future) to counter// 結果を数える。
future
}
future.onFailure {
case ce: CircuitBreakerOpenException =>
producer ! Producer.Retry(m) // 失敗したメッセージをリカバリ処理に乗せる。
case t: Throwable =>
log.warning(t.toString())
}
}
動作するプログラムはこちらにおいています。
サンプルのプログラムではworkerの数をリサイズする仕組みを自作していますが、Akkaの標準APIにResizerというものが用意されており、workerを増減させるくらいの調整だったらこちらを使った方がよさそうです。