はじめに
Kafkaは要求・応答型のメッセージングには向かないと思っていましたが、Springで要求・応答用のTemplate(ReplyingKafkaTemplate)が提供されていたのでどんなものか動かしてみました。
前提
- 上記リンクにあるサンプル・コードを基本、そのまま使っています。
- Java 17
- Apache Kafka 3.4.0
POM
Kafka周りで以下を設定しています。(他は省略)
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
Requester
要求メッセージを送信して、その応答メッセージを受信するプログラムは以下になります。
package com.example;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
System.out.println( "Requester!" );
SpringApplication.run(App.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "test request");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(30, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
}
このコードでは、kRequestトピックに要求メッセージ(valueは"test request")を送信して、kRepliesトピックから応答メッセージを受信するようになっています。
Responder
要求メッセージを受信して、応答メッセージを返信するプログラムは以下になります。
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
System.out.println( "Responder!" );
SpringApplication.run(App.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
}
このコードでは、kRequestsトピックをListenし、メッセージを受信したら、そのデータを大文字に変換して、応答メッセージとして返信しています。
動作確認
では、動かしてみます。
リンク先のサンプル・コードではプログラム内でトピックを作成していましたが、今回その部分は省いたので、事前にkRequestsトピックとkRepliesトピックは作成しておきます。
最初に、Responderを起動して、Requesterを起動します。
Requesterのコンソールには以下が出力されました。
Sent ok: kRequests-0@0
Return value: TEST REQUEST
要求メッセージの送信が成功し、応答メッセージが返ってきたことがわかります。
要求メッセージのvalue"test request"が、応答メッセージでは大文字に変換されています。
Responderのコンソールには以下が出力されています。
Server received: test request
別のプログラム(自作)を使って、要求メッセージと応答メッセージの中身を見てみます。
要求メッセージ(3件分)
Ts: 1701650039532, FmdTs: 2023-12-04 09:33:59.532, Topic: kRequests, Partition: 0, Offset: 0, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=�s�^ǡI���$"�� value(byte)=8673b95ec7a149f18ac4fb24221994f2
Ts: 1701650844371, FmdTs: 2023-12-04 09:47:24.371, Topic: kRequests, Partition: 2, Offset: 0, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=s�u�RAE��qvOO��U value(byte)=73a775f0524145f49071764f4fedd855
Ts: 1701650914858, FmdTs: 2023-12-04 09:48:34.858, Topic: kRequests, Partition: 1, Offset: 1, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=�BʞH�Mӭo�gc2M� value(byte)=b842ca9e48de4dd3ad6fb86763324da7
要求メッセージには以下の2つのヘッダー(key=value)がセットされています。
- kafka_replyTopic=kReplies
- kafka_correlationId=8673b95ec7a149f18ac4fb24221994f2(バイナリデータのHex表記)
応答メッセージ(3件分)
Ts: 1701650039646, FmdTs: 2023-12-04 09:33:59.646, Topic: kReplies, Partition: 2, Offset: 0, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=�s�^ǡI���$"�� value(byte)=8673b95ec7a149f18ac4fb24221994f2
Ts: 1701650844407, FmdTs: 2023-12-04 09:47:24.407, Topic: kReplies, Partition: 2, Offset: 1, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=s�u�RAE��qvOO��U value(byte)=73a775f0524145f49071764f4fedd855
Ts: 1701650914886, FmdTs: 2023-12-04 09:48:34.886, Topic: kReplies, Partition: 2, Offset: 2, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=�BʞH�Mӭo�gc2M� value(byte)=b842ca9e48de4dd3ad6fb86763324da7
応答メッセージには以下のヘッダーがセットされています。
- kafka_correlationId=8673b95ec7a149f18ac4fb24221994f2(バイナリデータのHex表記)
仕組み
ReplyingKafkaTemplateは、kafka_correlationIdヘッダーを利用して要求メッセージと応答メッセージの紐付けを行なっています。
Requester側では、ReplyingKafkaTemplateのsendAndReceiveメソッドで要求メッセージを送信すると、内部的に要求メッセージのkafka_correlationIdヘッダーにユニークな値がセットされます。応答メッセージの受信処理ではkafka_correlationIdヘッダーに同じ値を持つ応答メッセージを受信します。
setCorrelationHeaderName(String replyTopicHeaderName)やsetBinaryCorrelation(boolean binaryCorrelation)を利用して、ヘッダー名やヘッダーに付与する値(デフォルトはバイナリ・データ)を変更することができます。
Responder側では、@SendTo アノテーション(プロパティなし)を使うことで受信した要求メッセージのkafka_correlationIdヘッダーをそのまま応答メッセージにも付加し、kafka_replyTopicヘッダーに指定されているトピックに応答メッセージを返信します。
Forwarding Listener Results using @SendTo
まとめ
SpringのReplyingKafkaTemplateを使うことでKafkaでも簡単に要求・応答型のメッセージングができることがわかりました。
Kafkaで要求・応答を実装する意味は?ってのはありますが、とりあえず今回は動かしてみた報告でした。他にもいろいろできることがありそうなので、時間があれば試してみたいと思います。