6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Money Forward Engineering 2Advent Calendar 2022

Day 14

ローカル開発環境で、Kafka Producerのトランザクションを単一ブローカーで動くようにする

Last updated at Posted at 2022-12-14

この記事について

Money Forward Engineering 2 Advent Calendar 2022 - Qiita の 14日目 の記事です。

はじめに

Kafka Producerのトランザクションを使用したメッセージング処理を作るときに、
ローカル環境では単一ブローカーで最小で動かしたかったのですが、
はじめ全然動かったのを動くようにしたので、その経験を放出します。

これ、みんな通らないの?と思う程度には情報が見つからなかったので、だいぶ困りましたw
※kafkaのコード以外はkotlinです

環境

  • kafka(docker image): confluentinc/cp-kafka:7.2.2
  • kotlin: 1.7系
  • OS: M1 Mac

コード

実装コード

今回は簡単にやりたいので、文字列を送るだけのシンプルなコードを用意しました。
ロールバックの時に意図的にエラーを発生させたいので、
mockをセットできるようにコンストラクタ引数でKafkaProducerを受け付けるようにしています。

jp.sh4.ooga.TransactionalKafkaProducer
class TransactionalKafkaProducer(
    producer: KafkaProducer<Int, String>? = null
) {
    private var producer: KafkaProducer<Int, String>
    init {
        this.producer = producer ?: ProducerFactory.createInstance()
    }

    @Suppress("TooGenericExceptionCaught")
    fun messageSend(topic: String, messages: List<String>) {
        producer.use {
            try {
                it.initTransactions()
                it.beginTransaction()
                messages.forEach { message ->
                    it.send(ProducerRecord(
                        topic,
                        Math.random().toInt(),
                        message,
                    ))
                }
                it.commitTransaction()
            } catch (e: Exception) {
                it.abortTransaction()
                println(e.stackTraceToString())
                throw e
            }
        }
    }
}

テストコード

実際にレコードは取れないという確認をするために、
トピックを消費するようにしてます。
ちゃんと動くとメッセージは取れないです。

class TransactionalKafkaProducerTests: AnnotationSpec() {

    @BeforeClass
    fun setup() {
        val props = Properties()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:19092"
        val adminClient = Admin.create(props)
        adminClient.createTopics(listOf(
            NewTopic(TEST_TOPIC, 50, 1)
        ))
    }

    @Test
    fun confirmRollback() {

        val producer = spyk(ProducerFactory.createInstance())
        val messages = listOf("message1", "message2")
        val testTarget = TransactionalKafkaProducer(producer)
        every {
            producer.commitTransaction()
        } throws (TimeoutException("thrown by mock"))
        shouldThrow<TimeoutException> {
            testTarget.messageSend(TEST_TOPIC, messages)
        }

        val consumer = KafkaConsumerExecutor(listOf(TEST_TOPIC))
        val records = consumer.consumerRecord()
        records.count() shouldBe 0
    }
}

設定

docker-compose.yml
    environment:
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,LOCALHOST_LISTENER:PLAINTEXT
kafka cluster
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://kafka:9092,LOCALHOST_LISTENER://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      # When kafka cluster is running with single broker, broker id becomes 0 by default. So setting it explicitly.
      KAFKA_BROKER_ID: 1
      # In local env, save consumer offset to only 1 replica
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

テストの実行〜解決まで

簡単に組み込めてとても楽だったなーと思ってテストを動かすと、全く動いている気配がない・・・
何が起きているのか、ログを確認します。

image.png

アプリ側:

[Producer clientId=producer-tx-0, transactionalId=tx-0] Sending transactional request FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[tx-0]) to node localhost:19092 (id: 1 rack: null) with correlation ID 205
[Producer clientId=producer-tx-0, transactionalId=tx-0] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-tx-0, correlationId=205) and timeout 30000 to node 1: FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[tx-0])
[Producer clientId=producer-tx-0, transactionalId=tx-0] Received FIND_COORDINATOR response from node 1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-tx-0, correlationId=205): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='tx-0', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')])

errorCode: 15は、COORDINATOR_NOT_AVAILABLEで、コーディネータが利用不可らしいですけど、これだけしか書いてないと意味不明です。
なので、kafka側のログも見てみます。

kafka側:

INFO [Admin Manager on Broker 1]: Error processing create topic request CreatableTopic(name='__transaction_state', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='uncompressed'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='min.insync.replicas', value='2'), CreateableTopicConfig(name='segment.bytes', value='104857600'), CreateableTopicConfig(name='unclean.leader.election.enable', value='false')]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

kafkaトランザクションで使用する、内部的なトピックである「__transaction_state」の生成に失敗していることがわかります。
(わかります。と書いているけど、内部的なトピックを作ってるんだーへーって実際は思いましたw)
また、一番下に「Replication factor: 3 larger than available brokers: 1.」と書いてあり、
ブローカのノード数は3以上用意する必要がある、みたいなことが書いてあることがわかります。

そして、ネットを検索すると、トランザクションを使いたいならブローカーを3以上用意するのが必須という記述がチラホラ・・・。
でもCIでも動かしたいのに、ブローカー3つも立てたくないし、ローカル環境でブローカー1個で動かせないとか、開発者ライクじゃなさ過ぎるから、そんなことないでしょ。
と思ってkafka本体のコードを読むことに。
(Documentのどこかに書いてあるのかなぁと思いながら調べてはみたけど、見つけ出せなかった)

エラーメッセージを足がかりに設定を突き止める

「larger than available brokers」でコードを検索すると、以下のコードが見つかります。
3つ目の条件に引っかかってることがわかるので、
replicationFactorとbrokerMetadatasに、どこから何が渡されているのかを探します。

kafka.admin.AdminUtils
def assignReplicasToBrokers(brokerMetadatas: Iterable[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}

IntelliJの[Find Usage]を使って、呼び出し階層を遡っていくと、(遡り過程は省略)
以下の設定値が上記エラーが起きた各パラメータに渡されていることがわかります。
なので、このパラメータを書き換えればOKです。

  • transaction.state.log.replication.factor
  • transaction.state.log.min.isr
kafka.server.KafkaConfig
val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms"
val TransactionsMaxTimeoutMsProp = "transaction.max.timeout.ms"
val TransactionsTopicMinISRProp = "transaction.state.log.min.isr"
val TransactionsLoadBufferSizeProp = "transaction.state.log.load.buffer.size"
val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor"
val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"

docker-compose.ymlに設定を追加する

docker-compose上に設定を書く場合は、

KAFKA_

に続けて、スネークケース & 大文字で設定値のキーを書いて、その値を設定すれば良いので以下のようになります。(下2行を追加)

docker-compose.yml
    environment:
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,LOCALHOST_LISTENER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://kafka:9092,LOCALHOST_LISTENER://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
      KAFKA_BROKER_ID: 1
      # In local env, save consumer offset to only 1 replica
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # In local env, transaction's inner topic settings in kafka set to 1 only.
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # <- add
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 # <- add

テストを実行

今度はちゃんと動きました。
めでたい!
image.png

終わりに

kafka Producerのトランザクションを使用した開発を行いたい場合に、
単一ブローカーで簡単に動かせるようになりました。
ローカル環境でテストが簡単に行えるようになったので、自信を持って実行環境にデプロイできるようになりますね。

ドキュメントのこの辺を見ると、もっと簡単に辿り着けたよ、などあれば教えていただけると嬉しいです。

また、このページのデモコードは以下にあります。
https://github.com/sh-ogawa/kafka-demo

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?