この記事について
Money Forward Engineering 2 Advent Calendar 2022 - Qiita の 14日目 の記事です。
はじめに
Kafka Producerのトランザクションを使用したメッセージング処理を作るときに、
ローカル環境では単一ブローカーで最小で動かしたかったのですが、
はじめ全然動かったのを動くようにしたので、その経験を放出します。
これ、みんな通らないの?と思う程度には情報が見つからなかったので、だいぶ困りましたw
※kafkaのコード以外はkotlinです
環境
- kafka(docker image): confluentinc/cp-kafka:7.2.2
- kotlin: 1.7系
- OS: M1 Mac
コード
実装コード
今回は簡単にやりたいので、文字列を送るだけのシンプルなコードを用意しました。
ロールバックの時に意図的にエラーを発生させたいので、
mockをセットできるようにコンストラクタ引数でKafkaProducerを受け付けるようにしています。
class TransactionalKafkaProducer(
producer: KafkaProducer<Int, String>? = null
) {
private var producer: KafkaProducer<Int, String>
init {
this.producer = producer ?: ProducerFactory.createInstance()
}
@Suppress("TooGenericExceptionCaught")
fun messageSend(topic: String, messages: List<String>) {
producer.use {
try {
it.initTransactions()
it.beginTransaction()
messages.forEach { message ->
it.send(ProducerRecord(
topic,
Math.random().toInt(),
message,
))
}
it.commitTransaction()
} catch (e: Exception) {
it.abortTransaction()
println(e.stackTraceToString())
throw e
}
}
}
}
テストコード
実際にレコードは取れないという確認をするために、
トピックを消費するようにしてます。
ちゃんと動くとメッセージは取れないです。
class TransactionalKafkaProducerTests: AnnotationSpec() {
@BeforeClass
fun setup() {
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:19092"
val adminClient = Admin.create(props)
adminClient.createTopics(listOf(
NewTopic(TEST_TOPIC, 50, 1)
))
}
@Test
fun confirmRollback() {
val producer = spyk(ProducerFactory.createInstance())
val messages = listOf("message1", "message2")
val testTarget = TransactionalKafkaProducer(producer)
every {
producer.commitTransaction()
} throws (TimeoutException("thrown by mock"))
shouldThrow<TimeoutException> {
testTarget.messageSend(TEST_TOPIC, messages)
}
val consumer = KafkaConsumerExecutor(listOf(TEST_TOPIC))
val records = consumer.consumerRecord()
records.count() shouldBe 0
}
}
設定
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,LOCALHOST_LISTENER:PLAINTEXT
kafka cluster
KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://kafka:9092,LOCALHOST_LISTENER://localhost:19092
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
# When kafka cluster is running with single broker, broker id becomes 0 by default. So setting it explicitly.
KAFKA_BROKER_ID: 1
# In local env, save consumer offset to only 1 replica
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
テストの実行〜解決まで
簡単に組み込めてとても楽だったなーと思ってテストを動かすと、全く動いている気配がない・・・
何が起きているのか、ログを確認します。
アプリ側:
[Producer clientId=producer-tx-0, transactionalId=tx-0] Sending transactional request FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[tx-0]) to node localhost:19092 (id: 1 rack: null) with correlation ID 205
[Producer clientId=producer-tx-0, transactionalId=tx-0] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-tx-0, correlationId=205) and timeout 30000 to node 1: FindCoordinatorRequestData(key='', keyType=1, coordinatorKeys=[tx-0])
[Producer clientId=producer-tx-0, transactionalId=tx-0] Received FIND_COORDINATOR response from node 1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=producer-tx-0, correlationId=205): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='tx-0', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')])
errorCode: 15は、COORDINATOR_NOT_AVAILABLEで、コーディネータが利用不可らしいですけど、これだけしか書いてないと意味不明です。
なので、kafka側のログも見てみます。
kafka側:
INFO [Admin Manager on Broker 1]: Error processing create topic request CreatableTopic(name='__transaction_state', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='uncompressed'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='min.insync.replicas', value='2'), CreateableTopicConfig(name='segment.bytes', value='104857600'), CreateableTopicConfig(name='unclean.leader.election.enable', value='false')]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
kafkaトランザクションで使用する、内部的なトピックである「__transaction_state」の生成に失敗していることがわかります。
(わかります。と書いているけど、内部的なトピックを作ってるんだーへーって実際は思いましたw)
また、一番下に「Replication factor: 3 larger than available brokers: 1.」と書いてあり、
ブローカのノード数は3以上用意する必要がある、みたいなことが書いてあることがわかります。
そして、ネットを検索すると、トランザクションを使いたいならブローカーを3以上用意するのが必須という記述がチラホラ・・・。
でもCIでも動かしたいのに、ブローカー3つも立てたくないし、ローカル環境でブローカー1個で動かせないとか、開発者ライクじゃなさ過ぎるから、そんなことないでしょ。
と思ってkafka本体のコードを読むことに。
(Documentのどこかに書いてあるのかなぁと思いながら調べてはみたけど、見つけ出せなかった)
エラーメッセージを足がかりに設定を突き止める
「larger than available brokers」でコードを検索すると、以下のコードが見つかります。
3つ目の条件に引っかかってることがわかるので、
replicationFactorとbrokerMetadatasに、どこから何が渡されているのかを探します。
def assignReplicasToBrokers(brokerMetadatas: Iterable[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
if (replicationFactor <= 0)
throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
if (replicationFactor > brokerMetadatas.size)
throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
if (brokerMetadatas.forall(_.rack.isEmpty))
assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
startPartitionId)
else {
if (brokerMetadatas.exists(_.rack.isEmpty))
throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId)
}
}
IntelliJの[Find Usage]を使って、呼び出し階層を遡っていくと、(遡り過程は省略)
以下の設定値が上記エラーが起きた各パラメータに渡されていることがわかります。
なので、このパラメータを書き換えればOKです。
- transaction.state.log.replication.factor
- transaction.state.log.min.isr
val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms"
val TransactionsMaxTimeoutMsProp = "transaction.max.timeout.ms"
val TransactionsTopicMinISRProp = "transaction.state.log.min.isr"
val TransactionsLoadBufferSizeProp = "transaction.state.log.load.buffer.size"
val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor"
val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"
docker-compose.ymlに設定を追加する
docker-compose上に設定を書く場合は、
KAFKA_
に続けて、スネークケース & 大文字で設定値のキーを書いて、その値を設定すれば良いので以下のようになります。(下2行を追加)
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,LOCALHOST_LISTENER:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://kafka:9092,LOCALHOST_LISTENER://localhost:19092
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
KAFKA_BROKER_ID: 1
# In local env, save consumer offset to only 1 replica
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# In local env, transaction's inner topic settings in kafka set to 1 only.
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # <- add
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 # <- add
テストを実行
終わりに
kafka Producerのトランザクションを使用した開発を行いたい場合に、
単一ブローカーで簡単に動かせるようになりました。
ローカル環境でテストが簡単に行えるようになったので、自信を持って実行環境にデプロイできるようになりますね。
ドキュメントのこの辺を見ると、もっと簡単に辿り着けたよ、などあれば教えていただけると嬉しいです。
また、このページのデモコードは以下にあります。
https://github.com/sh-ogawa/kafka-demo