2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MicronautでApache Kafkaにアクセスする

Posted at

Micronaut Kafka

MicronautにApache Kafkaのサポートがあるというので、試してみることにしました。

Kafka Support

Micronautを使って、Apache KafkaのProducerおよびConsumerを作ることができるようです。

Micronaut Kafka

環境

今回使ったMicronautのバージョン。

$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191

Micronaut 1.0.4では、Apache Kafka 2.0.1に対応しているようです。

Upgrade to Kafka 2.0.1

Micronaut Kafka

これに合わせて、今回はApache Kafka 2.0.1をダウンロード。

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1

Apache KafkaのQuick Startに習い、Apache ZooKeeperとApache Kafka Brokerを起動します。

Quick Start / Start the server

## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties

Topicは、my-topicという名前で作成しました。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".

アプリケーションを作る

それでは、アプリケーションを作ってみます。ProducerとConsumer(ここではListenerとしていますが)用のプロジェクトを作成。

## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer

## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener

以降、この2つのプロジェクトに対してソースコードを追加、編集していきます。

Producerを作る

では、Apache Kafka Brokerにメッセージを送信するProducerを書いてみます。

$ cd hello-kafka-producer

mainクラスは、自動生成されたまま使います。

src/main/java/hello/kafka/producer/Application.java

package hello.kafka.producer;

import io.micronaut.runtime.Micronaut;

public class Application {

    public static void main(String[] args) {
        Micronaut.run(Application.class);
    }
}

Producerは、@KafkaClientアノテーションを付けたインターフェースを作成するようです。

Defining @KafkaClient Methods

こんな感じで作成。インターフェースに対する実装クラスの作成は不要です。

src/main/java/hello/kafka/producer/MessageClient.java

package hello.kafka.producer;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);
}

設定は、@KafkaClientアノテーションおよび設定ファイルで行います。

@KafkaClientアノテーションでは、ACKの設定だけ行いました。

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {

設定ファイル、こちら。

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-producer

---
kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 5

kafka.bootstrap.serversでは、Apache Kafka Brokerに対する接続設定を行います。

Producerの振る舞いについては、kafka.producers.[client-id]で行います。

idを指定することで、@KafkaClient単位に設定を行うことができます。

Per @KafkaClient Producer Properties

今回は特にidを指定していないので、defaultという名前になっています。

メッセージの送信は、@Topicアノテーションにトピック名を指定したメソッドを使って行います。

    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);

キーを指定する場合は、@KafkaKeyを使用します。キーを使わない場合は、指定不要です(キーの引数自体を省略できます)。

また、RxJavaなど、Reactive Streamsまわりの型も使えるので、今回はこちらを利用しました。

Reactive and Non-Blocking Method Definitions

最後に、@KafkaClientを使う@Controller

src/main/java/hello/kafka/producer/MessageController.java

package hello.kafka.producer;

import javax.inject.Inject;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

@Controller("/message")
public class MessageController {
    @Inject
    MessageClient messageClient;

    @Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
    public Single<String> message(String key, @Body Single<String> value) {
        return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
    }
}

ビルド。

$ ./mvnw package

これで、Producer側は完了。

Listener(Consumer)

続いて、Consumer側。

$ cd hello-kafka-listener

Kafka Consumers Using @KafkaListener

Consumerは、@KafkaListenerアノテーションを使って作成します。こちらは、class定義になります。

src/main/java/hello/kafka/listener/MessageListener.java

package hello.kafka.listener;

import java.util.List;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
    }
}

@KafkaListenerアノテーションでConsumerの設定を行い、メッセージを受け取るメソッドには@Topicアノテーションを指定します。

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {

使う型は、Reactiveなものを。

Receiving and returning Reactive Types

また、ACKを使うようにもしています。

        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });

今回は設定はBrokerへの接続先のみとしました。あと、同一ホストでProducerも立ち上げることにするので、micronaut.server.port8080以外で設定。

src/main/resources/application.yml

---
micronaut:
    application:
        name: hello-kafka-listener
    server:
        port: 9080

---
kafka:
    bootstrap:
        servers: localhost:9092

こちらもビルド。

$ ./mvnw package

これで、Consumer側も準備完了です。

動かしてみる

では、動作確認してみましょう。

## Producerを起動
$ java -jar target/hello-kafka-producer-0.1.jar


## Consumerを起動
$ java -jar target/hello-kafka-listener-0.1.jar

Producerに適当にメッセージを放り込んでみます。

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended

Consumer側には、こんな感じで受信したメッセージが表示されます。

Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3

動かせたようですね。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?