4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache CamelフレームワークのKafkaコンポーネントを使用して簡単なメッセージの送受信

Last updated at Posted at 2018-09-02

はじめに

Apache CamelのKafkaコンポーネントを使用して、簡単にkafkaとのメッセージの送受信が実現できます。
これを使用して、次のプログラムを作成してみます。

・1秒間に1回、Topic(test_queue)に対してメッセージを送信する。
・Topic(test_queue)のメッセージを受信し、ログに出力する。

※ 本記事ではrouteの定義にXML DSLを使用していますが、次の記事でJava DSL
版を書いています。それ以外の内容も本記事より情報を多くしています。ご参考まで。

pom.xml修正

まずはじめに、pom.xmlにkafka用のライブラリを追加します。

pom.xml
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
			<version>2.21.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.0.0</version>
		</dependency>

Producer作成

1秒間に1回、Topic(test_queue)に対してメッセージを送信します。
送信するメッセージは「${date:now}: test message」というように送信時間を含めた文字列にしました。

XML DSLを書いたrouteは以下になります。

        <route id="producer_test">
            <from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
            <setBody>
                <simple>${date:now}: test message</simple>
            </setBody>
            <to
                uri="kafka:test_queue?brokers=192.168.10.122:9092" />
            <log message="producer_test end" />
        </route>

「<to uri="kafka:test_queue?brokers=192.168.10.122:9092" />」がProducerの設定で、
「kafka:Topic名?brokers=brokerのホスト:ポート」のように指定します。

Topicは自動で作成されますが、Topicのレプリケーション数等の設定を変えたい場合は事前に作成しておきます。

Consumer作成

Topic(test_queue)のメッセージを受信し、ログに出力します。
XML DSLを書いたrouteは以下になります。

        <route id="consumer_test">
            <from
                uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
            <log message="body = ${body}" />
        </route>

「<from uri="kafka:test_queue?brokers=192.168.10.122:9092&groupId=TESTGROUP" />」がConsumerの設定で、「kafka:Topic名?brokers=brokerのホスト:ポート」のように指定します。
「groupId=TESTGROUP」はConsumerが今回は1つなので、プロパティの設定をしなくても構いません。

作成したCamelアプリケーションを実行する

作成したCamelアプリケーションを実行すると、以下のように「body = Sun Sep 02 10:15:01 JST 2018: test message」のログが毎秒1回出力されていることを確認できます。

[2018-09-02 10:15:01.041], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:01 JST 2018: test message
[2018-09-02 10:15:02.011], [INFO ], producer_test, Camel (camel-1) thread #3 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:02.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:02 JST 2018: test message
[2018-09-02 10:15:03.011], [INFO ], producer_test, Camel (camel-1) thread #4 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:03.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:03 JST 2018: test message
[2018-09-02 10:15:04.011], [INFO ], producer_test, Camel (camel-1) thread #5 - KafkaProducer[test_queue2], producer_test, producer_test end
[2018-09-02 10:15:04.011], [INFO ], consumer_test, Camel (camel-1) thread #1 - KafkaConsumer[test_queue2], consumer_test, body = Sun Sep 02 10:15:04 JST 2018: test message
[2018-09-02 10:15:05.005], [INFO ], producer_test, Camel (camel-1) thread #6 - KafkaProducer[test_queue2], producer_test, producer_test end

Kafkaコンポーネントには今回利用したプロパティ以外にもたくさんのプロパティがあります。
詳細は以下のページを参照してください。

全ソース

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>sample</groupId>
	<artifactId>kafka-camel_simple</artifactId>
	<version>1.0.0</version>
	<name>kafka-camel_simple</name>
	<description>kafka-camel_simple</description>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<camel.version>2.22.0</camel.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-spring</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-quartz2</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-kafka</artifactId>
			<version>${camel.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>
	</dependencies>

</project>
camel-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
	xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
          http://camel.apache.org/schema/spring
          http://camel.apache.org/schema/spring/camel-spring.xsd
          http://www.springframework.org/schema/util
          http://www.springframework.org/schema/util/spring-util-4.2.xsd">

	<camelContext xmlns="http://camel.apache.org/schema/spring">
		<route id="consumer_test">
			<from
				uri="kafka:test_queue?brokers=192.168.10.122:9092&amp;groupId=TESTGROUP" />
			<log message="body = ${body}" />
		</route>

		<route id="producer_test">
			<from uri="quartz2://test/testTimer?cron=0/1+*+*+*+*+?" />
			<setBody>
				<simple>${date:now}: test message</simple>
			</setBody>
			<to
				uri="kafka:test_queue?brokers=192.168.10.122:9092" />
			<log message="producer_test end" />
		</route>
	</camelContext>
</beans>
TestMain.java
package sample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TestMain {

	Logger logger = LoggerFactory.getLogger("sample.TestMain");

	public static void main(String[] args) throws Exception {
		TestMain main = new TestMain();
		main.start();
	}

	private void start() throws Exception {
		logger.info("start ");
		try (ClassPathXmlApplicationContext applicationContext =
				  new ClassPathXmlApplicationContext("camel-context.xml")) {
			applicationContext.start();
			Thread.sleep(60000);
		}
	}
}
4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?