はじめに
Apache CamelのKafkaコンポーネントを使用して、簡単にkafkaとのメッセージの送受信が実現できます。
これを使用して、次のプログラムを作成してみます。
・1秒間に1回、Topic(test_queue)に対してメッセージを送信する。
・Topic(test_queue)のメッセージを受信し、ログに出力する。
※ 本記事ではrouteの定義にXML DSLを使用していますが、次の記事でJava DSL
版を書いています。それ以外の内容も本記事より情報を多くしています。ご参考まで。
pom.xml修正
まずはじめに、pom.xmlにkafka用のライブラリを追加します。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>2.21.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
Producer作成
1秒間に1回、Topic(test_queue)に対してメッセージを送信します。
送信するメッセージは「${date:now}: test message」というように送信時間を含めた文字列にしました。
XML DSLを書いたrouteは以下になります。
<route id="producer_test">
<from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
<setBody>
<simple>${date:now}: test message</simple>
</setBody>
<to
uri="kafka:test_queue?brokers=192.168.10.122:9092" />
<log message="producer_test end" />
</route>
「<to uri="kafka:test_queue?brokers=192.168.10.122:9092" />」がProducerの設定で、
「kafka:Topic名?brokers=brokerのホスト:ポート」のように指定します。
Topicは自動で作成されますが、Topicのレプリケーション数等の設定を変えたい場合は事前に作成しておきます。
Consumer作成
Topic(test_queue)のメッセージを受信し、ログに出力します。
XML DSLを書いたrouteは以下になります。
<route id="consumer_test">
<from
uri="kafka:test_queue?brokers=192.168.10.122:9092&groupId=TESTGROUP" />
<log message="body = ${body}" />
</route>
「<from uri="kafka:test_queue?brokers=192.168.10.122:9092&groupId=TESTGROUP" />」がConsumerの設定で、「kafka:Topic名?brokers=brokerのホスト:ポート」のように指定します。
「groupId=TESTGROUP」はConsumerが今回は1つなので、プロパティの設定をしなくても構いません。
作成したCamelアプリケーションを実行する
作成したCamelアプリケーションを実行すると、以下のように「body = Sun Sep 02 10:15:01 JST 2018: test message」のログが毎秒1回出力されていることを確認できます。
[2018-09-02 10:15:01.041], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:01 JST 2018: test message
[2018-09-02 10:15:02.011], [INFO ], producer_test, Camel (camel-1) thread #3 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:02.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:02 JST 2018: test message
[2018-09-02 10:15:03.011], [INFO ], producer_test, Camel (camel-1) thread #4 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:03.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:03 JST 2018: test message
[2018-09-02 10:15:04.011], [INFO ], producer_test, Camel (camel-1) thread #5 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:04.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:04 JST 2018: test message
[2018-09-02 10:15:05.005], [INFO ], producer_test, Camel (camel-1) thread #6 - KafkaProducer[test_queue2], producer_test, producer_test end
Kafkaコンポーネントには今回利用したプロパティ以外にもたくさんのプロパティがあります。
詳細は以下のページを参照してください。
全ソース
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sample</groupId>
<artifactId>kafka-camel_simple</artifactId>
<version>1.0.0</version>
<name>kafka-camel_simple</name>
<description>kafka-camel_simple</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<camel.version>2.22.0</camel.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-quartz2</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route id="consumer_test">
<from
uri="kafka:test_queue?brokers=192.168.10.122:9092&groupId=TESTGROUP" />
<log message="body = ${body}" />
</route>
<route id="producer_test">
<from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
<setBody>
<simple>${date:now}: test message</simple>
</setBody>
<to
uri="kafka:test_queue?brokers=192.168.10.122:9092" />
<log message="producer_test end" />
</route>
</camelContext>
</beans>
package sample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class TestMain {
Logger logger = LoggerFactory.getLogger("sample.TestMain");
public static void main(String[] args) throws Exception {
TestMain main = new TestMain();
main.start();
}
private void start() throws Exception {
logger.info("start ");
try (ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext("camel-context.xml")) {
applicationContext.start();
Thread.sleep(60000);
}
}
}