alpakka-kafkaを使った際のメモ書き、得られた知見を貯めとく場所
DrainingControlの設定あれこれ
こっそりドキュメントにノートされてるけど、DrainingControlを用いることでシャットダウン時のDelayの設定は不要になる。なのできっちり設定しておいた方が幸せ。
val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
DrainingControlとRestartSourceの組み合わせについて
公式の例はCommittableSourceでのサンプルが記述してあるんですが、RestartSourcemについての言及はありません
テストコードを覗くと下記のようなコードが見つかります
it should "work with committable source" in assertAllStagesStopped {
val consumerSettings = consumerDefaults.withGroupId(createGroupId())
val committerSettings = committerDefaults
val topic = createTopic()
val partitionNumber = 0
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
val result = RestartSource
.onFailuresWithBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
) { () =>
val subscription = Subscriptions.assignment(new TopicPartition(topic, partitionNumber))
Consumer
.committableSource(consumerSettings, subscription)
.mapMaterializedValue(c => control.set(c))
.via(businessFlow)
.map(_.committableOffset)
.via(Committer.flow(committerSettings))
}
.runWith(Sink.ignore)
awaitProduce(produce(topic, 1 to 10))
// when shut down is desired
val drainingControl = DrainingControl(Tuple2(control.get(), result))
drainingControl.drainAndShutdown().futureValue shouldBe Done
}
ここでは RestartSource.onFailuresWithBackoff
を使っていますが、実は RestartSource.withBackoff
を使うと drainAndShutdown
をコールしてもいつまでもConsumeが止まらない状態になってしまいます。(2019/12/08現在)
そのため、サンプルに合わせて RestartSource.onFailuresWithBackoff
を用いる必要があります。元々、RestartSource.withBackoff
を使っていた人向けの対策は分からず…