1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

alpakka-kafkaのTipsメモ

Posted at

alpakka-kafkaを使った際のメモ書き、得られた知見を貯めとく場所

DrainingControlの設定あれこれ

こっそりドキュメントにノートされてるけど、DrainingControlを用いることでシャットダウン時のDelayの設定は不要になる。なのできっちり設定しておいた方が幸せ。

val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))

DrainingControlとRestartSourceの組み合わせについて

公式の例はCommittableSourceでのサンプルが記述してあるんですが、RestartSourcemについての言及はありません
テストコードを覗くと下記のようなコードが見つかります

Refs: https://github.com/akka/alpakka-kafka/blob/ed97467ab1be70db10584ae57f35811429f7f76a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala#L509-L534

  it should "work with committable source" in assertAllStagesStopped {
    val consumerSettings = consumerDefaults.withGroupId(createGroupId())
    val committerSettings = committerDefaults
    val topic = createTopic()
    val partitionNumber = 0
    val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

    val result = RestartSource
      .onFailuresWithBackoff(
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ) { () =>
        val subscription = Subscriptions.assignment(new TopicPartition(topic, partitionNumber))
        Consumer
          .committableSource(consumerSettings, subscription)
          .mapMaterializedValue(c => control.set(c))
          .via(businessFlow)
          .map(_.committableOffset)
          .via(Committer.flow(committerSettings))
      }
      .runWith(Sink.ignore)
    awaitProduce(produce(topic, 1 to 10))
    // when shut down is desired
    val drainingControl = DrainingControl(Tuple2(control.get(), result))
    drainingControl.drainAndShutdown().futureValue shouldBe Done
  }

ここでは RestartSource.onFailuresWithBackoff を使っていますが、実は RestartSource.withBackoff を使うと drainAndShutdown をコールしてもいつまでもConsumeが止まらない状態になってしまいます。(2019/12/08現在)
そのため、サンプルに合わせて RestartSource.onFailuresWithBackoff を用いる必要があります。元々、RestartSource.withBackoffを使っていた人向けの対策は分からず…

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?