Javaのアプリケーションサーバである Open Liberty では、MicroProfile Reactive Messagingと呼ばれる仕様のAPIを使うことでイベント処理することが可能です。
この記事ではMicroProfile Reactive MessagingでApache Kafkaのイベントを読み取って処理するConsumerを実装してみます。イベントの送信については別の記事で紹介する予定です。
本記事で使用するコンポーネント
本記事では以下のコンポーネント・バージョンを使用します。
今後のバージョンアップで動作しない可能性がありますので、本番投入する際には事前に手元で動作確認することをお勧めします。
コンポーネント名 | バージョン |
---|---|
JDK | OpenJDK 17.0.4.1 (IBM Semeru Runtime Open Edition) |
Open Liberty | 22.0.0.11 |
Open Liberty Javaアプリケーションの雛形生成
まずはConsumerを実装するOpen Libertyアプリケーションを作成します。
アプリケーションの雛形は、Open Libertyのサイトにある雛形生成機能を使用すると簡単です。
Starterアプリケーションの雛形生成ページから、下図の通り値を入力して「Generate Project」ボタンを押下します。
「Group」と「Artifact」は任意の値でOKです。
この記事を書いている時点では、Java EE/Jakarta EE Versionに「9.1」を指定するとConsumerとして動作しないので注意してください。
依存ライブラリのインストール
MicroProfile Reactive MessagingのAPIからKafkaのイベントを扱うために、依存ライブラリをインストールしましょう。
先ほど生成した雛形プロジェクトではライブラリの管理方法にMavenを指定していました。Mavenの設定ファイル pom.xml
の dependencies
要素に以下の依存ライブラリを指定します。インストールはEclipseやIntelliJ等のIDE側で自動的に実行されます。
<!-- 必要なものだけ記載 -->
<dependencies>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.36</version> <!-- 2.x.y はNG -->
</dependency>
</dependencies>
末尾に追加している slf4j-jdk14
は、Open LibertyでMicroProfile Reactive MessagingとKafkaクライアントライブラリのログを出力するために使用します。
slf4j-jdk14は本記事執筆時点 (2022/11) で表のバージョンよりも高い 2.x.y というバージョンが使用可能です。しかしバージョン2系を指定するとアプリケーションが正常に起動しない場合がありますので上記のバージョンを指定する方が安全です。
Featureの指定
続いてOpen LibertyにMicroProfile Reactive Messagingを使用するために、Featureを指定して読み込ませます。
Open Libertyで使用されるモジュールは「Feature」という形で細分化されています。必要なFeatureのみ使用することで、アプリケーションのサイズを小さく保つことが可能です。
今回使用するMicroProfile Reactive Messagingも専用のFeatureが用意されています。
Open Libertyの設定ファイル server.xml
に mpReactiveMessaging-1.0
Featureを指定します。
ついでに <logging>
要素でログ出力も有効化しておきます。
<!-- src/main/liberty/config/server.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<server description="My Liberty Server">
<!-- Enable features -->
<featureManager>
<feature>mpReactiveMessaging-1.0</feature>
</featureManager>
<!-- ...中略... -->
<!-- Logging -->
<logging traceSpecification="REACTIVEMESSAGE=all:org.apache.kafka.*=all"/>
</server>
Kafkaの接続情報を定義
ライブラリとFeatureをインストールしたら、続いてApache Kafkaとの接続情報を定義します。接続情報は MicroProfile Config の設定ファイル microprofile-config.properties
に定義します。
<プロジェクトルート>/src/main/resources/META-INF/microprofile-config.properties
に以下の通り設定値を定義してください。
mp.messaging.incoming.purchases.connector=liberty-kafka
mp.messaging.incoming.purchases.bootstrap.servers=localhost:9092
mp.messaging.incoming.purchases.group.id=liberty-sample-consumer
mp.messaging.incoming.purchases.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.auto.offset.reset=earliest
設定値は mp.messaging.<incoming|outgoing>.<チャンネル名>.<プロパティ名>=<設定値>
という書式で定義します。
ドット区切り3つ目の要素は、イベントを読み取る場合は incoming
を指定してください。逆にイベントを送る場合は outgoing
を指定します。
<プロパティ名>
はApache Kafkaの設定値名に準拠するため、さらに詳細に設定したい場合はApache Kafkaのドキュメントを参照してください。
ここではKafka Broker localhost:9092
に接続し、イベントのKeyとValueはそれぞれ String
型であることを定義しています。connector
プロパティのみ特殊なプロパティであり、Kafkaと接続する際には固定値 liberty-kafka
を指定します。
参考: Apache Kafka Consumer Config
イベント操作アプリケーションの実装
最後にKafkaからイベントを読み取るアプリケーションを実装します。
ここではイベントのValueに指定された購買商品名をログ出力する想定で実装を行います。
実装サンプルは以下の通りです。
たった20行程度の簡単なサンプルですが、これだけでイベントを処理可能です。
// PurchaseResource.java
package com.example.resource;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
import java.util.logging.Logger;
@ApplicationScoped
public class PurchaseResource {
private static final Logger logger = Logger.getLogger(PurchaseResource.class.getName());
@Incoming("purchases")
public void consumePurchases(String item) {
final String itemMessage = String.format("Consumed item purchase: item name = %s", item);
logger.info(itemMessage);
}
}
最も重要なパートは @Incoming("<チャンネル名>")
です。
@Incoming
はMicroProfile Reactive Messagingで提供された、イベントを読み取るアノテーションです。アノテーションに指定したチャンネル名は microprofile-config.properties
で定義したチャンネル名に対応する設定値と紐づけられます。
このサンプルコードでは @Incoming
を付加したメソッドの引数にイベントのValueのみ指定しています。イベントのKeyやTopic名を取得したい場合は、以下の3つの実装を追加します。
(1) 引数に Message
型を指定する
(2) ConsumerRecord
型で引数の値をキャストする、
(3) メソッドが返却する型に CompletionStage
を指定する
詳しいサンプルは以下のBlog記事を参照ください。
Blog - Access specific properties in Kafka messages and set the SameSite attribute on cookies in Open Liberty 20.0.0.3
アプリケーションの稼働確認
前節まででアプリケーションの準備が整ったので、実際にKafkaと接続してイベントを読み取ります。
アプリケーション稼働前にKafka Brokerを動作させ、Topic purchases
を作成してください。
また、Starterアプリケーションには既にOpen Libertyを起動するMaven Pluginが組み込まれています。以下のコマンドでOpen Libertyを起動します。ビルドされたWARファイルは自動的にOpen Libertyサーバにデプロイされます。
$ mvn liberty:dev
大量のログが出力されますが、Successfully joined group with generation Generation
というログが出力されていればKafka Brokerに接続してイベントを読み出す準備ができています。
kafka-console-producer
等のProducerからイベントを送信してみます。
$ kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic purchases \
--property "parse.key=true" --property "key.separator=:"
>alice:apple
>bob:banana
>cris:orange
>^C
Open Libertyアプリケーション側には、ほぼリアルタイムにイベントのValueがログ出力されています。
...
[INFO] [2022/11/06 22:36:06:474 JST] 0000003c com.example.resource.PurchaseResource I Consumed item purchase: item name = apple
[INFO] [2022/11/06 22:36:08:018 JST] 0000003a com.example.resource.PurchaseResource I Consumed item purchase: item name = banana
[INFO] [2022/11/06 22:36:10:404 JST] 0000003d com.example.resource.PurchaseResource I Consumed item purchase: item name = orange
最終系の構成
アプリケーションの最終的なディレクトリ構成は以下の通りです。
/path/to/project
は任意のプロジェクトルートのパスが入ります。
$ tree /path/to/project
/path/to/project
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── example
│ └── resource
│ └── PurchaseResource.java
├── liberty
│ └── config
│ └── server.xml
└── resources
└── META-INF
└── microprofile-config.properties