0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kafka Javaチュートリアル -コンシューマー-

Last updated at Posted at 2022-06-05

※Kafkaに関する情報は下記のURLから参照してください。
https://kafka.apache.org/intro

必要ライブラリ

  • kafka-clients-3.2.0
  • slf4j-api-1.7.36
  • slf4j-simple-1.7.36

事前準備

  • Kafkaの起動
    下記のコマンドで立ち上げを行います。
    <kafka-home>/bin/zookeeper-server-start.sh <kafka-home>/config/zookeeper.properties
  • Kafka brokcerの起動
    下記のコマンドで立ち上げを行います。
    <kafka-home>/bin/kafka-server-start.sh <kafka-home>/config/server.properties

Coding

  1. Kafkaのプロパティを定義します。

     final Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
     		BOOTSTRAP_SERVERS);
     props.put(ConsumerConfig.GROUP_ID_CONFIG,
     		"KafkaSimpleConsumer");
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
     		LongDeserializer.class.getName());
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
     		StringDeserializer.class.getName());
    
    1. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIGはKafkaのサーバ情報を記載します。
    2. ConsumerConfig.GROUP_ID_CONFIGはコンシュマーのグループIDを記載します。
  2. コンシューマーを生成します。

     final Consumer<String, String> consumer =
     		new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    
  3. 受け付けるTopicを設定します。

     List<String> topics = new ArrayList<>();
    
     topics.add(TOPIC);
     consumer.subscribe(topics);
    
  4. イベントが発行がトリガーになるので、無限ループでイベントを待ち受けます。その際に上限リトライを設定します。

     final int giveUp = 100;   
     int noRecordsCount = 0;
    
     while (true) {
    
  5. Kafkaからイベントを取得します。今回の取得間隔は1000ミリ秒に設定します。

     final ConsumerRecords<String, String> consumerRecords =
     		consumer.poll(Duration.ofMillis(1000));
    
  6. 取得されたイベントをコンソールに出力します。

     consumerRecords.forEach(record -> {
     	System.out.printf("Consumer Record:(%s, %s, %d, %s, %s)\n",
     			record.key(), record.value(),
     			record.partition(), record.offset(), record.topic());
     });
    
  7. Kafkaサーバと同期を取ります。

     consumer.commitAsync();
    
  8. これ以上、取得するイベントがない際には接続をクローズします。

     consumer.close();
    

参考

下記のレポジトリにソースコードを確認いただけます。
https://github.com/dlstjq7685/KafkaSimpleConsumerTuttorial

また、前の記事のKafka Javaチュートリアル -プロデューサー-の続きになるので
合わせて見ていただければと思います。

ソースコードはkafkaのJavaDocの内容に沿って作成しております。
あくまで参考程度で書いたものですので、
その他の設定については別記事を参考にしてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?