Micronaut Kafka
MicronautにApache Kafkaのサポートがあるというので、試してみることにしました。
Micronautを使って、Apache KafkaのProducerおよびConsumerを作ることができるようです。
環境
今回使ったMicronautのバージョン。
$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191
Micronaut 1.0.4では、Apache Kafka 2.0.1に対応しているようです。
Upgrade to Kafka 2.0.1
これに合わせて、今回はApache Kafka 2.0.1をダウンロード。
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1
Apache KafkaのQuick Startに習い、Apache ZooKeeperとApache Kafka Brokerを起動します。
Quick Start / Start the server
## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties
Topicは、my-topic
という名前で作成しました。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".
アプリケーションを作る
それでは、アプリケーションを作ってみます。ProducerとConsumer(ここではListenerとしていますが)用のプロジェクトを作成。
## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer
## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener
以降、この2つのプロジェクトに対してソースコードを追加、編集していきます。
Producerを作る
では、Apache Kafka Brokerにメッセージを送信するProducerを書いてみます。
$ cd hello-kafka-producer
main
クラスは、自動生成されたまま使います。
src/main/java/hello/kafka/producer/Application.java
package hello.kafka.producer;
import io.micronaut.runtime.Micronaut;
public class Application {
public static void main(String[] args) {
Micronaut.run(Application.class);
}
}
Producerは、@KafkaClient
アノテーションを付けたインターフェースを作成するようです。
こんな感じで作成。インターフェースに対する実装クラスの作成は不要です。
src/main/java/hello/kafka/producer/MessageClient.java
package hello.kafka.producer;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
}
設定は、@KafkaClient
アノテーションおよび設定ファイルで行います。
@KafkaClient
アノテーションでは、ACKの設定だけ行いました。
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
設定ファイル、こちら。
src/main/resources/application.yml
---
micronaut:
application:
name: hello-kafka-producer
---
kafka:
bootstrap:
servers: localhost:9092
producers:
default:
retries: 5
kafka.bootstrap.servers
では、Apache Kafka Brokerに対する接続設定を行います。
Producerの振る舞いについては、kafka.producers.[client-id]
で行います。
id
を指定することで、@KafkaClient
単位に設定を行うことができます。
Per @KafkaClient Producer Properties
今回は特にid
を指定していないので、default
という名前になっています。
メッセージの送信は、@Topic
アノテーションにトピック名を指定したメソッドを使って行います。
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
キーを指定する場合は、@KafkaKey
を使用します。キーを使わない場合は、指定不要です(キーの引数自体を省略できます)。
また、RxJavaなど、Reactive Streamsまわりの型も使えるので、今回はこちらを利用しました。
Reactive and Non-Blocking Method Definitions
最後に、@KafkaClient
を使う@Controller
。
src/main/java/hello/kafka/producer/MessageController.java
package hello.kafka.producer;
import javax.inject.Inject;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;
@Controller("/message")
public class MessageController {
@Inject
MessageClient messageClient;
@Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
public Single<String> message(String key, @Body Single<String> value) {
return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
}
}
ビルド。
$ ./mvnw package
これで、Producer側は完了。
Listener(Consumer)
続いて、Consumer側。
$ cd hello-kafka-listener
Kafka Consumers Using @KafkaListener
Consumerは、@KafkaListener
アノテーションを使って作成します。こちらは、class
定義になります。
src/main/java/hello/kafka/listener/MessageListener.java
package hello.kafka.listener;
import java.util.List;
import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
}
}
@KafkaListener
アノテーションでConsumerの設定を行い、メッセージを受け取るメソッドには@Topic
アノテーションを指定します。
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
使う型は、Reactiveなものを。
Receiving and returning Reactive Types
また、ACKを使うようにもしています。
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
今回は設定はBrokerへの接続先のみとしました。あと、同一ホストでProducerも立ち上げることにするので、micronaut.server.port
は8080
以外で設定。
src/main/resources/application.yml
---
micronaut:
application:
name: hello-kafka-listener
server:
port: 9080
---
kafka:
bootstrap:
servers: localhost:9092
こちらもビルド。
$ ./mvnw package
これで、Consumer側も準備完了です。
動かしてみる
では、動作確認してみましょう。
## Producerを起動
$ java -jar target/hello-kafka-producer-0.1.jar
## Consumerを起動
$ java -jar target/hello-kafka-listener-0.1.jar
Producerに適当にメッセージを放り込んでみます。
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended
Consumer側には、こんな感じで受信したメッセージが表示されます。
Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3
動かせたようですね。