※Kafkaに関する情報は下記のURLから参照してください。
https://kafka.apache.org/intro
必要ライブラリ
- kafka-clients-3.2.0
- slf4j-api-1.7.36
- slf4j-simple-1.7.36
事前準備
- Kafkaの起動
下記のコマンドで立ち上げを行います。
<kafka-home>/bin/zookeeper-server-start.sh <kafka-home>/config/zookeeper.properties
- Kafka brokcerの起動
下記のコマンドで立ち上げを行います。
<kafka-home>/bin/kafka-server-start.sh <kafka-home>/config/server.properties
Coding
-
Kafkaのプロパティを定義します。
final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaSimpleConsumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
はKafkaのサーバ情報を記載します。 -
ConsumerConfig.GROUP_ID_CONFIG
はコンシュマーのグループIDを記載します。
-
-
コンシューマーを生成します。
final Consumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
-
受け付けるTopicを設定します。
List<String> topics = new ArrayList<>(); topics.add(TOPIC); consumer.subscribe(topics);
-
イベントが発行がトリガーになるので、無限ループでイベントを待ち受けます。その際に上限リトライを設定します。
final int giveUp = 100; int noRecordsCount = 0; while (true) {
-
Kafkaからイベントを取得します。今回の取得間隔は1000ミリ秒に設定します。
final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
-
取得されたイベントをコンソールに出力します。
consumerRecords.forEach(record -> { System.out.printf("Consumer Record:(%s, %s, %d, %s, %s)\n", record.key(), record.value(), record.partition(), record.offset(), record.topic()); });
-
Kafkaサーバと同期を取ります。
consumer.commitAsync();
-
これ以上、取得するイベントがない際には接続をクローズします。
consumer.close();
参考
下記のレポジトリにソースコードを確認いただけます。
https://github.com/dlstjq7685/KafkaSimpleConsumerTuttorial
また、前の記事のKafka Javaチュートリアル -プロデューサー-の続きになるので
合わせて見ていただければと思います。
ソースコードはkafkaのJavaDocの内容に沿って作成しております。
あくまで参考程度で書いたものですので、
その他の設定については別記事を参考にしてください。