0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kafkaで要求・応答?

Posted at

はじめに

Kafkaは要求・応答型のメッセージングには向かないと思っていましたが、Springで要求・応答用のTemplate(ReplyingKafkaTemplate)が提供されていたのでどんなものか動かしてみました。

ReplyingKafkaTemplate を使用する

前提

  • 上記リンクにあるサンプル・コードを基本、そのまま使っています。
  • Java 17
  • Apache Kafka 3.4.0

POM

Kafka周りで以下を設定しています。(他は省略)

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.0</version>
  </parent>


  <dependencies>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
	  <groupId>org.springframework.kafka</groupId>
	  <artifactId>spring-kafka</artifactId>
      <version>3.1.0</version>
	</dependency>

  </dependencies>

Requester

要求メッセージを送信して、その応答メッセージを受信するプログラムは以下になります。

package com.example;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;

@SpringBootApplication
public class App 
{
    public static void main( String[] args )
    {
        System.out.println( "Requester!" );

        SpringApplication.run(App.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "test request");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(30, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }
}

このコードでは、kRequestトピックに要求メッセージ(valueは"test request")を送信して、kRepliesトピックから応答メッセージを受信するようになっています。

Responder

要求メッセージを受信して、応答メッセージを返信するプログラムは以下になります。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
public class App 
{
    public static void main( String[] args )
    {
        System.out.println( "Responder!" );

        SpringApplication.run(App.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }
}

このコードでは、kRequestsトピックをListenし、メッセージを受信したら、そのデータを大文字に変換して、応答メッセージとして返信しています。

動作確認

では、動かしてみます。
リンク先のサンプル・コードではプログラム内でトピックを作成していましたが、今回その部分は省いたので、事前にkRequestsトピックとkRepliesトピックは作成しておきます。

最初に、Responderを起動して、Requesterを起動します。

Requesterのコンソールには以下が出力されました。

Sent ok: kRequests-0@0
Return value: TEST REQUEST

要求メッセージの送信が成功し、応答メッセージが返ってきたことがわかります。
要求メッセージのvalue"test request"が、応答メッセージでは大文字に変換されています。

Responderのコンソールには以下が出力されています。

Server received: test request

別のプログラム(自作)を使って、要求メッセージと応答メッセージの中身を見てみます。

要求メッセージ(3件分)

Ts: 1701650039532, FmdTs: 2023-12-04 09:33:59.532, Topic: kRequests, Partition: 0, Offset: 0, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=�s�^ǡI���$"�� value(byte)=8673b95ec7a149f18ac4fb24221994f2
Ts: 1701650844371, FmdTs: 2023-12-04 09:47:24.371, Topic: kRequests, Partition: 2, Offset: 0, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=s�u�RAE��qvOO��U value(byte)=73a775f0524145f49071764f4fedd855
Ts: 1701650914858, FmdTs: 2023-12-04 09:48:34.858, Topic: kRequests, Partition: 1, Offset: 1, Key: null, Value: test request, Headers: key=kafka_replyTopic value(str)=kReplies value(byte)=746f7069633032 key=kafka_correlationId value(str)=�BʞH�Mӭo�gc2M� value(byte)=b842ca9e48de4dd3ad6fb86763324da7

要求メッセージには以下の2つのヘッダー(key=value)がセットされています。

  • kafka_replyTopic=kReplies
  • kafka_correlationId=8673b95ec7a149f18ac4fb24221994f2(バイナリデータのHex表記)

応答メッセージ(3件分)

Ts: 1701650039646, FmdTs: 2023-12-04 09:33:59.646, Topic: kReplies, Partition: 2, Offset: 0, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=�s�^ǡI���$"�� value(byte)=8673b95ec7a149f18ac4fb24221994f2
Ts: 1701650844407, FmdTs: 2023-12-04 09:47:24.407, Topic: kReplies, Partition: 2, Offset: 1, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=s�u�RAE��qvOO��U value(byte)=73a775f0524145f49071764f4fedd855
Ts: 1701650914886, FmdTs: 2023-12-04 09:48:34.886, Topic: kReplies, Partition: 2, Offset: 2, Key: null, Value: TEST REQUEST, Headers: key=kafka_correlationId value(str)=�BʞH�Mӭo�gc2M� value(byte)=b842ca9e48de4dd3ad6fb86763324da7

応答メッセージには以下のヘッダーがセットされています。

  • kafka_correlationId=8673b95ec7a149f18ac4fb24221994f2(バイナリデータのHex表記)

仕組み

ReplyingKafkaTemplateは、kafka_correlationIdヘッダーを利用して要求メッセージと応答メッセージの紐付けを行なっています。

Requester側では、ReplyingKafkaTemplateのsendAndReceiveメソッドで要求メッセージを送信すると、内部的に要求メッセージのkafka_correlationIdヘッダーにユニークな値がセットされます。応答メッセージの受信処理ではkafka_correlationIdヘッダーに同じ値を持つ応答メッセージを受信します。
setCorrelationHeaderName(String replyTopicHeaderName)やsetBinaryCorrelation(boolean binaryCorrelation)を利用して、ヘッダー名やヘッダーに付与する値(デフォルトはバイナリ・データ)を変更することができます。

image.png

Responder側では、@SendTo アノテーション(プロパティなし)を使うことで受信した要求メッセージのkafka_correlationIdヘッダーをそのまま応答メッセージにも付加し、kafka_replyTopicヘッダーに指定されているトピックに応答メッセージを返信します。
Forwarding Listener Results using @SendTo

まとめ

SpringのReplyingKafkaTemplateを使うことでKafkaでも簡単に要求・応答型のメッセージングができることがわかりました。
Kafkaで要求・応答を実装する意味は?ってのはありますが、とりあえず今回は動かしてみた報告でした。他にもいろいろできることがありそうなので、時間があれば試してみたいと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?