1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【初心者向け】Kafka入門編

Last updated at Posted at 2021-04-03

皆さん、こんにちは!
最近、プロジェクトにおいて、Kafkaという技術を活用したため、今日は本記事でサンプルを作りながら解説してみましょう。

一、Kafkaとは

Kafkaは大規模なストリームデータを扱うことができるオープンソースの分散メッセージングシステムです。レコードのストリームをリアルタイムで公開、サブスクライブ、保存、処理できます。

二、Kafkaの仕組み

Kafkaには4つのコアAPIが存在します。

API 概要
Producer API 送信側のアプリケーションやDBから1つ又は、複数のTopicとしてのストリームデータの配信を許可します。
Consumer API 受信側に1つ又は、複数のTopicとしてのストリームデータの受信と処理を許可します。
Steams API ストリーム プロセッサとして機能します
Connector API Kafka トピックを既存のアプリケーションにリンクする再利用可能なプロデューサーまたはコンシューマー接続を構築することもできます。

KafkaはPublish-Subscribeメッセージモデルを利用して動作します。基本的に下記の通りです。

要素  説明
Producer メッセージの送信元
Consumer  メッセージの配信先
Broker メッセージの収集・配信役

全体的なモデルは下記のイメージを見て理解できると思います。
image.png

三、環境構築

1.インストールJava

ここは略です。

2.インストールKafka

このリンクから、Kafkaをダウンロードしてローカルにおいてください。
例:
image.png

3.インストールZookeeper

KafkaはZookeeperを依頼するので、必ず予めzookeeperをインストールし、起動します。
ただ、上記の「 2.インストールKafka」でインストールされたKafkaはzookeeperを付いたので、本記事は別途でzookeeperをインストールしない。

四、KafkaとZookeeperの起動

Kafkaを起動するように、必ずzookeeperを予め起動してください。
先ずはCMDを開いて\bin\windowsのしたに入ってください。

  • zookeeperは下記のコマンドで起動してください。(zookeeper.propertiesのパスに合わせてください。)

zookeeper-server-start.bat C:\software\kafka_2.12-2.6.0\config\zookeeper.properties

  • Kafkaは下記のコマンドで起動してください。(server.propertiesのパスに合わせてください。)

kafka-server-start.bat C:\software\kafka_2.12-2.6.0\config\server.properties

KafkaとZookeeper両方とも、無事に起動したら、下記のコマンドでKafkaのTopicを作成します。
本記事はTopicがtest-topic01となります。

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic01

五、サンプルを実装

1.先ず、Mavenプロジェクトを作って下記の依頼をpom.xmlに追加します。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2.Producter クラスを作る

public class TestProducer {
    public static void main(String[] args) throws  Exception{
        Properties properties = new Properties();
        //Kafkaのホストとポートを設定する
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        //key Serialize
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        //value Serialize
        properties.setProperty("value.serializer",StringSerializer.class.getName());
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // Messageを作る。
        // messageのkey:testKey,messageのvalue:hello, this is Kafka producer!
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("test-topic01",0,"testKey","hello, this is Kafka producer!");

        // ログ出力
        System.out.println("kafka producer start....");
        
        Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);

        // ログ出力
        System.out.println("producer send data:" + stringStringProducerRecord.value());
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

}

3.Consumerクラスを作る

public class TestConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Kafkaのホストとポートを設定する
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer",StringDeserializer.class.getName());
        properties.setProperty("group.id","1111");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        // topicを設定する
        consumer.subscribe(Collections.singletonList("test-topic01"));

       // ログ出力
        System.out.println("kafka consumer start...");
        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(500);
            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                // Producerから取得されたデータを出力する
                System.out.println("Consumer get data from kafka producer:" + stringStringConsumerRecord.value());
            }
        }
    }

}

ここまで、Kafkaサンプルを作り終わりました。

六、動作確認

先ず、TestConsumerクラスをjava applicationの形で起動しておいてください。
下記のログ出力したら、OKです。

kafka consumer start...

後で、TestProducerクラスをjava applicationの形で起動してください。
正常に起動したら、TestProducerはメッセージを作ってKafkaまで渡して、
TestConsumerはKafkaからTestProducerが作ったメッセージを取得して出力します。
流れは基本的にこんな感じです。
じゃあ、TestConsumerの出力ログを確認しましょう。
image.png

上記のイメージを見ると、サンプルは正常に動けることがわかりました。

七、最後に

最後まで読んでいただき、ありがとうございます。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?