はじめに
Apache Kafkaは、非常に人気のあるストリーミングのプラットフォームです。私は2019年にApache Kafkaの勉強を始めました。Apache Kafkaのサイトには、詳しいチュートリアルなどがありますが、実際にアプリケーションを動作させようとすると、細かい問題がいくつかありました。ものすごく難しい問題があったわけではないのですが、全ての問題を解決するには、それなりの時間がかかりました。そこで、私の経験をもとに、IDEとしてEclipseを用い、簡単なKafka StreamsアプリケーションをJavaで作るチュートリアルを2020年にIBM Developerに公開しました。現在は、こちらに、2023年に改訂したものを公開しています。IBM Developerは、以前は日本語版もあったのですが、現在は日本語版は閉鎖されています。そこで、IBM Developerに書いたチュートリアルをもとに、Qiitaに日本語でチュートリアルを書くことにしました。このチュートリアルでは、私が遭遇した問題を回避する方法を中心に述べています。Apache KafkaやKafka Streamsの概念などは、Apache Kafkaのサイトを参照して下さい。
開発環境の準備
Kafka
Kafkaのダウンロードページからサポートされている最新のバイナリー(このチュートリアルの執筆時点では3.9.0)をダウンロードし、適当なディレクトリーにアンパックして下さい。
Java
KafkaのドキュメントによるとKafka 3.9.0では、Java 8、Java 11、及びJava 17をサポートしています。このチュートリアルでは、Java 17を使っています。
もし、これらのバージョンのJavaを導入していなければ、IBM Semeru Runtimesのダウンロードページなどから導入して下さい。
Eclipse
Eclipseのダウンロードページから最新のEclipse Installer(このチュートリアルの執筆時点では2024-12 R)をダウンロードし、ダウンロードしたEclipse Installerを実行し、パッケージとしてEclipse IDE for Java Developersを指定して導入して下さい。Eclipse Installerの最後の画面でLAUNCHボタンが表示されたら、そのボタンを押し、さらに必要に応じてワークスペースとして使うディレクトリーを変更し、Launchボタンを押して、Eclipseを起動して下さい。右上のHideボタンをクリックして下さい。
streams-quickstart-javaを使ったMavenプロジェクトの作成
Apache KafkaのサイトのKafka Streamsのチュートリアルでは、Mavenアーキタイプとしてstreams-quickstartを使ってプロジェクトを作成しています。同じことをEclipseで行ってみます。
(1) File > New > Projectと選択して下さい。
(2) New Projectの画面でMaven > Maven Projectを選択し、Nextをクリックして下さい。
(3) New Maven Projectの画面でNextをクリックして下さい。
(4) Filterにorg.apache.kafkaと入力し、streams-quickstart-javaを選択し、Nextをクリックして下さい。(初めて実行するときは、直ぐにはstreams-quickstart-javaが表示されないので、数分待って下さい。)
(5) 次のように入力し、Finishをクリックして下さい。
Group ID: streams.examples
Artifact ID: streams-quickstart
Version: 0.1
Package: myapps
run archetype generation interactively: チェック外す
(6) Package Exploreで、次のファイルが生成されていることを確認して下さい。
maven-archetype-quickstartを使ったMavenプロジェクトの作成
次にMavenアーキタイプとしてmaven-archetype-quickstartを使ってプロジェクトを作成します。streams-quickstartを使ったプロジェクトとmaven-archetype-quickstartを使ったプロジェクトの大きな違いは、プロジェクトネイチャーです。前者はMavenネイチャーのみを持ちますが、後者はMavenネイチャーとJavaネイチャーを持ちます。Mavenは多くの機能を持っていますが、このチュートリアルではJavaのライブラリーの依存関係を解決することのみにMavenを使います。それ以外のJavaの開発に関するEclipseの機能はJavaネイチャーを用いています。
(1) 同様に別のMavenプロジェクトを作成します。まず、次のMavenアーキタイプを指定し、Nextをクリックして下さい。
Group ID: org.apache.maven.archetypes
Artifact ID: maven-archetype-quickstart
(2) 次のように入力し、Finishをクリックして下さい。
Group ID: com.ibm.code
Artifact ID: streams.tutorial
Version: 0.1
Package: com.ibm.code.streams.tutorial
run archetype generation interactively: チェック外す
(3) Package Exploreにstreams.tutorialプロジェクトが表示されます。
(4) このプロジェクトではJava 17が使われることが分かります。もし別のバージョンが表示されていたら、JRE System Libraryを右クリックし、ポップアップメニューからPropertiesを選び、Execution environmentをJavaSE-17に設定して下さい。
Kafka Streams APIを使うためのMavenプロジェクトの設定
(1) Kafka Streams APIを使うためにはpom.xmlを編集します。Package Exploreでstreams.tutorialプロジェクトのpom.xmlをダブルクリックするとPOMエディターが開きます。Dependenciesタブをクリックして下さい。
(2) Dependenciesの下のAddボタンをクリックして下さい。Select Dependencyの画面が現れます。次の値を入力し、OKをクリックして下さい。
Group Id: org.apache.kafka
Artifact Id: kafka-streams
Version: 3.9.0
Scope: compile
(3) ライブラリーの依存関係を確認するには、ツールバーのSaveアイコンをクリックし、Dependency Hierarchyタブをクリックして下さい。
(4) この依存関係から、Apache KafkaはSimple Logging Facade for Java (SLF4J)を使っていることが分かります。ログを出力するにはSLF4Jがサポートするロギングのフレームワークを使う必要があります。ここでは、reload4j を使うことにします。Dependenciesタブをクリックし、Dependenciesの下のAddボタンをクリックします。次の値を入力し、OKをクリックして下さい。
Group Id: org.slf4j
Artifact Id: slf4j-reload4j
Version: 1.7.36
Scope: runtime
(5) ライブラリーの依存関係を確認するには、ツールバーのSaveアイコンをクリックし、Dependency Hierarchyタブをクリックして下さい。
ここに絵を入れる。
(6) Log4jを使うにはlog4j.propertiesファイルを用意する必要があります。streams-quickstartプロジェクトは、このファイルを含みますので、今回は、こちらから複写することにします。Package Exploreでstream-quickstartプロジェクトのsrc及びmainを展開し、resourcesフォルダーを選択し、Edit > Copyを選択して下さい。
(7) streams.tutorialプロジェクトのsrcを展開し、mainフォルダーを選択し、Edit > Pasteを選択して下さい。
(8) src/main/resourcesの中にlog4.propertiesファイルがあることが確認できます。
(9) 不要なファイルを削除します。src/main/java及びcom.ibm.code.streams.tutorialを展開し、App.javaを選択し、Edit > Deleteを選択して下さい。Deleteの画面が開くので、OKボタンをクリックして下さい。
(10) 同様に、src/test/java及びcom.ibm.code.streams.tutorialを展開し、AppTest.javaを削除して下さい。
単純なKafka Streamsアプリケーションの開発
ここでは、integer、even、oddという三つのトピックを処理する単純なKafka Streamsアプリケーションを作ります。全てのトピックは、整数のキーと文字列の値を持ちます。このアプリケーションは、integerトピックからイベントを読み、キーが偶数のときはevenトピックに、キーが奇数のときはoddトピックに書きます。evenトピックへのイベントでは、値の文字列を大文字に変換します。また、oddトピックへのイベントでは、値の文字列を小文字に変換します。
(1) com.ibm.code.streams.tutorialパッケージを選択し、File > New > Classを選択して下さい。New Java Classの画面が開きます。
(2) 次の値を入力し、Finishボタンをクリックして下さい。
Name: EvenOddBranchApp
(3) EvenOddBranchApp.javaを編集するJavaエディターが開きます。トピック名を定義するために、Javaエディターで作成されたクラスの中に次のコードを追加して下さい。
public static final String INTEGER_TOPIC_NAME = "integer";
public static final String EVEN_TOPIC_NAME = "even";
public static final String ODD_TOPIC_NAME = "odd";
(4) createPropertiesメソッドを定義するために、クラスの中に次のコードを追加して下さい。
public static Properties createProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "even-odd-branch");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
このメソッドは、Kafka Streamsを設定するためのjava.util.Propertiesのインスタンスを返します。StreamsConfig.APPLICATION_ID_CONFIGは、アプリケーションの識別子です。StreamsConfig.BOOTSTRAP_SERVERS_CONFIGは、Kafkaクラスターに接続するためのホストとポートです。StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIGは、デフォルトのキーのシリアライザー/デシリアライザーのクラスです。 StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIGは、デフォルトの値のシリアライザー/デシリアライザーのクラスです。詳しくは、Apache Kafkaのドキュメントを参照して下さい。
(5) createTopologyメソッドを定義するために、クラスの中に次のコードを追加します。
public static Topology createTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder.<Integer, String>stream(INTEGER_TOPIC_NAME)
.split()
.branch(
(key, value) -> key % 2 == 0,
Branched.withConsumer(ks->ks
.peek((key, value) -> System.out.printf("even: %s, %s%n", key, value))
.mapValues(v -> v.toUpperCase())
.to(EVEN_TOPIC_NAME)))
.branch(
(key, value) -> true,
Branched.withConsumer(ks->ks
.peek((key, value) -> System.out.printf("odd: %s, %s%n", key, value))
.mapValues(v -> v.toLowerCase())
.to(ODD_TOPIC_NAME)));
return builder.build();
}
このメソッドは、ストリームプロセッサーのトポロジーを返します。
(6) mainメソッドを定義するために、クラスの中に次のコードを追加して下さい。
public static void main(String[] args) {
Properties props = createProperties();
final Topology topology = createTopology();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("kafka-streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
org.apache.kafka.streams.KafkaStreamsのインスタンスを作成します。startメソッドで処理を開始し、closeメソッドで終了します。
(7) Javaエディターは、パッケージをインポートしていないため、沢山のエラーを表示しています。パッケージをインポートするには、メニューからSource > Organize importsを選んで下さい。するとエラーが消えます。
(8) 編集したEvenOddBranchApp.javaを保管するには、File > Saveを選択して下さい。
Kafkaサーバーを用いたテスト
ここでは、ローカルホストでZooKeeperサーバーとKafkaサーバーを起動してKafka Streamsアプリケーションをテストします。
(1) ZooKeeperサーバーを起動するために、ターミナルウィンドウを開き、Kafkaのバイナリーをアンパックしてできたディレクトリーに移動し、次のコマンドを実行して下さい。
bin/zookeeper-server-start.sh config/zookeeper.properties
(2) Kafkaサーバーを起動するために、ターミナルウィンドウで新しいタブを開き、同じディレクトリーに移動し、次のコマンドを実行して下さい。
bin/kafka-server-start.sh config/server.properties
(3) 三つのトピックを作成するために、ターミナルウィンドウで新しいタブを開き、同じディレクトリーに移動し、次のコマンドを実行して下さい。
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic integer
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic even
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic odd
(4) 作成したトピックをリストするには、次のコマンドを実行して下さい。
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
(5) Package ExploreでEvenOddBranchApp.javaを選択し、メニューからRun > Run Configurationsを選択して下さい。
(6) Run Coinfigurationsの画面でJava Applicationをダブルクリックして下さい。
(7) Show Command Lineボタンをクリックし、Copy & Closeボタンをクリックし、Closeボタンをクリックして下さい。
(8) テキストエディターを開き、ペーストして下さい。ペーストしたコマンドが複数の行に分かれていたら、最後の行以外の全ての行の終わりに空白とバックスラッシュを追加して下さい。修正したコマンドをコピーし、ターミナルウィンドウにペーストして実行して下さい。
EclipseからJavaアプリケーションを実行することもできますが、Eclipseから実行するとシャットダウンフックを実行することができません。EvenOddBranchAppではシャットダウンフックを使っているため、Eclipseの外から実行しています。
(9) evenトピックを表示させるために、ターミナルウィンドウで新しいタブを開き、同じディレクトリーに移動し、次のコマンドを実行して下さい。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic even \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
(10) oddトピックを表示させるために、ターミナルウィンドウで新しいタブを開き、同じディレクトリーに移動し、次のコマンドを実行して下さい。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic odd \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
(11) inputトピックにイベントを生成するために、com.ibm.code.streams.tutorialパッケージにIntegerProducerというクラスを作り、次のmainメソッドを追加して下さい。
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(EvenOddBranchApp.INTEGER_TOPIC_NAME,
Integer.valueOf(i), "Value - " + i));
}
producer.close();
}
このメソッドは、integerトピックに10個のイベントを生成します。
(12) パッケージをインポートするために、メニューからSource > Organize importsを選択して下さい。
(13) 保管するために、メニューからFile > Saveを選択して下さい。
(14) 実行するために、Package ExploreからIntegerProducer.javaを選び、Run > Run As > Java Applicationを選択して下さい。
このJavaアプリケーションは、シャットダウンフックを含んでいないためEclipseから実行できます。
(15) ターミナルウィンドウでEvenOddBranchAppを実行しているタブを選ぶと、次のように表示されています。
even: 0, Value - 0
odd: 1, Value - 1
even: 2, Value - 2
odd: 3, Value - 3
even: 4, Value - 4
odd: 5, Value - 5
even: 6, Value - 6
odd: 7, Value - 7
even: 8, Value - 8
odd: 9, Value - 9
(16) evenトピックを表示するタブを選ぶと、次のように表示されています。
0 VALUE - 0
2 VALUE - 2
4 VALUE - 4
6 VALUE - 6
8 VALUE - 8
(17) oddトピックを表示するタブを選ぶと、次のように表示されています。
1 value - 1
3 value - 3
5 value - 5
7 value - 7
9 value – 9
(18) EvenOddBranchAppを実行しているタブを選びコントロールとCを同時に押すと、実行が終了します。evenトピックを表示するタブ、oddトピックを表示するタブ、Kafkaサーバーを実行するタブ、ZooKeeperサーバーを実行するタブでも同様にコントロールとCを同時に押し、実行を終了して下さい。
kafka-streams-test-utilsとJUnit 5を用いたテスト
Kafkaサーバーを使わないでkafka-streams-test-utilsとJUnit 5を用いてKafka Streamsアプリケーションをテストすることもできます。
(1) pom.xmlを開き、Dependenciesタブをクリックし、Dependenciesの下のAddボタンをクリックし、次の値を入力し、OKボタンをクリックして下さい。
Group ID: org.apache.kafka
Artifact ID: kafka-streams-test-utils
Version: 3.9.0
Scope: test
(2) ライブラリーの依存関係を確認するには、ツールバーのSaveアイコンをクリックし、Dependency Hierarchyタブをクリックして下さい。
(3) JUnitのテストケースのクラスを作るために、Package Explorerでsrc/test/javaの下のcom.ibm.code.streams.tutorialパッケージを選択し、File > New > JUnit Test Caseを選択して下さい。
(4) JUnit Test Caseの画面で次の値を入力し、Finishボタンをクリックして下さい。
最初の行のJUnitのバージョンの選択: New JUnit Jupiter test
Name: EvenOddBranchAppTest
(5) もし、"JUnit 5 is not on the build path."という画面が表示されたら、"Add JUnit 5 library to the build path"を選択して、OKをクリックして下さい。
(6) EvenOddBranchAppTest.javaがJavaエディターで開きます。testメソッドを削除し、次のコードをクラスの中に追加して下さい。
private TopologyTestDriver testDriver = null;
private TestInputTopic<Integer, String> integerInputTopic = null;
private TestOutputTopic<Integer, String> evenOutputTopic = null;
private TestOutputTopic<Integer, String> oddOutputTopic = null;
private Serde<Integer> integerSerde = new Serdes.IntegerSerde();
private Serde<String> stringSerde = new Serdes.StringSerde();
@BeforeEach
void setUp() throws Exception {
Properties props = EvenOddBranchApp.createProperties();
Topology topology = EvenOddBranchApp.createTopology();
testDriver = new TopologyTestDriver(topology, props);
integerInputTopic = testDriver.createInputTopic(EvenOddBranchApp.INTEGER_TOPIC_NAME,
integerSerde.serializer(), stringSerde.serializer());
evenOutputTopic = testDriver.createOutputTopic(EvenOddBranchApp.EVEN_TOPIC_NAME,
integerSerde.deserializer(), stringSerde.deserializer());
oddOutputTopic = testDriver.createOutputTopic(EvenOddBranchApp.ODD_TOPIC_NAME,
integerSerde.deserializer(), stringSerde.deserializer());
}
@AfterEach
void tearDown() throws Exception {
testDriver.close();
}
@Test
void testEven() {
int key = 0;
String value = "Value - 0";
integerInputTopic.pipeInput(key, value);
KeyValue<Integer, String> keyValue = evenOutputTopic.readKeyValue();
assertEquals(keyValue, new KeyValue<>(key, value.toUpperCase()));
assertTrue(evenOutputTopic.isEmpty());
assertTrue(oddOutputTopic.isEmpty());
}
@Test
void testOdd() {
int key = 1;
String value = "Value - 1";
integerInputTopic.pipeInput(key, value);
KeyValue<Integer, String> keyValule = oddOutputTopic.readKeyValue();
assertEquals(keyValule, new KeyValue<>(key, value.toLowerCase()));
assertTrue(oddOutputTopic.isEmpty());
assertTrue(evenOutputTopic.isEmpty());
}
setUpメソッドは、それぞれのテストの前に呼ばれます。tearDownメソッドは、それぞれのテストの後に呼ばれます。testEvenメソッドは、0をキーに持ち”Value – 0”を値に持つイベントをintegerトピックに入力したときのテストを行います。testOddメソッドは、1をキーに持ち”Value – 1”を値に持つイベントをintegerトピックに入力したときのテストを行います。
(7) パッケージをインポートするために、メニューからSource > Organize importsを選択して下さい。KeyValueは、org.apache.kafka.streams.KeyValueを選択して下さい。
(8) 保管するために、メニューからFile > Saveを選択して下さい。
(9) JUnitテストケースを実行するために、Package ExplorerでEvenOddBranchAppTest.javaを選択し、Run > Run As > JUnit Testを選択して下さい。
(10) JUnitビューが表示されます。EvenOddBranchAppTestを展開すると、テスト結果を確認することができます。
(11) (オプション)Javaエディターでブレークポイントを設定し、Run > Debug As > JUnit Testでデバックすることもできます。
おわりに
このチュートリアルでは、Eclipseを用いてKafka Streamsアプリケーションを開発するための環境を準備し、単純なアプリケーションを作成し、テストする方法を示しました。