LoginSignup
2
0

More than 1 year has passed since last update.

Open LibertyとMicroProfile Reactive MessagingでKafkaのイベントを読み取る

Last updated at Posted at 2022-11-07

Javaのアプリケーションサーバである Open Liberty では、MicroProfile Reactive Messagingと呼ばれる仕様のAPIを使うことでイベント処理することが可能です。

この記事ではMicroProfile Reactive MessagingでApache Kafkaのイベントを読み取って処理するConsumerを実装してみます。イベントの送信については別の記事で紹介する予定です。

スクリーンショット 2022-11-06 21.54.44.png

本記事で使用するコンポーネント

本記事では以下のコンポーネント・バージョンを使用します。
今後のバージョンアップで動作しない可能性がありますので、本番投入する際には事前に手元で動作確認することをお勧めします。

コンポーネント名 バージョン
JDK OpenJDK 17.0.4.1 (IBM Semeru Runtime Open Edition)
Open Liberty 22.0.0.11

Open Liberty Javaアプリケーションの雛形生成

まずはConsumerを実装するOpen Libertyアプリケーションを作成します。
アプリケーションの雛形は、Open Libertyのサイトにある雛形生成機能を使用すると簡単です。

Starterアプリケーションの雛形生成ページから、下図の通り値を入力して「Generate Project」ボタンを押下します。
「Group」と「Artifact」は任意の値でOKです。

スクリーンショット 2022-11-06 21.56.54.png

この記事を書いている時点では、Java EE/Jakarta EE Versionに「9.1」を指定するとConsumerとして動作しないので注意してください。

依存ライブラリのインストール

MicroProfile Reactive MessagingのAPIからKafkaのイベントを扱うために、依存ライブラリをインストールしましょう。
先ほど生成した雛形プロジェクトではライブラリの管理方法にMavenを指定していました。Mavenの設定ファイル pom.xmldependencies 要素に以下の依存ライブラリを指定します。インストールはEclipseやIntelliJ等のIDE側で自動的に実行されます。

    <!-- 必要なものだけ記載 -->
    <dependencies>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>2.0.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.7.36</version> <!-- 2.x.y はNG -->
        </dependency>
    </dependencies>

末尾に追加している slf4j-jdk14 は、Open LibertyでMicroProfile Reactive MessagingとKafkaクライアントライブラリのログを出力するために使用します。

slf4j-jdk14は本記事執筆時点 (2022/11) で表のバージョンよりも高い 2.x.y というバージョンが使用可能です。しかしバージョン2系を指定するとアプリケーションが正常に起動しない場合がありますので上記のバージョンを指定する方が安全です。

Featureの指定

続いてOpen LibertyにMicroProfile Reactive Messagingを使用するために、Featureを指定して読み込ませます。

Open Libertyで使用されるモジュールは「Feature」という形で細分化されています。必要なFeatureのみ使用することで、アプリケーションのサイズを小さく保つことが可能です。

今回使用するMicroProfile Reactive Messagingも専用のFeatureが用意されています。
Open Libertyの設定ファイル server.xmlmpReactiveMessaging-1.0 Featureを指定します。

ついでに <logging> 要素でログ出力も有効化しておきます。

<!-- src/main/liberty/config/server.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<server description="My Liberty Server">

    <!-- Enable features -->
    <featureManager>
        <feature>mpReactiveMessaging-1.0</feature>
    </featureManager>

    <!-- ...中略... -->

    <!-- Logging -->
    <logging traceSpecification="REACTIVEMESSAGE=all:org.apache.kafka.*=all"/>

</server>

Kafkaの接続情報を定義

ライブラリとFeatureをインストールしたら、続いてApache Kafkaとの接続情報を定義します。接続情報は MicroProfile Config の設定ファイル microprofile-config.properties に定義します。

<プロジェクトルート>/src/main/resources/META-INF/microprofile-config.properties に以下の通り設定値を定義してください。

mp.messaging.incoming.purchases.connector=liberty-kafka
mp.messaging.incoming.purchases.bootstrap.servers=localhost:9092
mp.messaging.incoming.purchases.group.id=liberty-sample-consumer
mp.messaging.incoming.purchases.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.purchases.auto.offset.reset=earliest

設定値は mp.messaging.<incoming|outgoing>.<チャンネル名>.<プロパティ名>=<設定値> という書式で定義します。

ドット区切り3つ目の要素は、イベントを読み取る場合は incoming を指定してください。逆にイベントを送る場合は outgoing を指定します。
<プロパティ名> はApache Kafkaの設定値名に準拠するため、さらに詳細に設定したい場合はApache Kafkaのドキュメントを参照してください。

ここではKafka Broker localhost:9092 に接続し、イベントのKeyとValueはそれぞれ String 型であることを定義しています。connector プロパティのみ特殊なプロパティであり、Kafkaと接続する際には固定値 liberty-kafka を指定します。

参考: Apache Kafka Consumer Config

イベント操作アプリケーションの実装

最後にKafkaからイベントを読み取るアプリケーションを実装します。
ここではイベントのValueに指定された購買商品名をログ出力する想定で実装を行います。

実装サンプルは以下の通りです。
たった20行程度の簡単なサンプルですが、これだけでイベントを処理可能です。

// PurchaseResource.java
package com.example.resource;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
import java.util.logging.Logger;

@ApplicationScoped
public class PurchaseResource {
    private static final Logger logger = Logger.getLogger(PurchaseResource.class.getName());

    @Incoming("purchases")
    public void consumePurchases(String item) {
        final String itemMessage = String.format("Consumed item purchase: item name = %s", item);
        logger.info(itemMessage);
    }
}

最も重要なパートは @Incoming("<チャンネル名>") です。
@Incoming はMicroProfile Reactive Messagingで提供された、イベントを読み取るアノテーションです。アノテーションに指定したチャンネル名は microprofile-config.properties で定義したチャンネル名に対応する設定値と紐づけられます。

このサンプルコードでは @Incoming を付加したメソッドの引数にイベントのValueのみ指定しています。イベントのKeyやTopic名を取得したい場合は、以下の3つの実装を追加します。

(1) 引数に Message 型を指定する
(2) ConsumerRecord 型で引数の値をキャストする、
(3) メソッドが返却する型に CompletionStage を指定する

詳しいサンプルは以下のBlog記事を参照ください。
Blog - Access specific properties in Kafka messages and set the SameSite attribute on cookies in Open Liberty 20.0.0.3

アプリケーションの稼働確認

前節まででアプリケーションの準備が整ったので、実際にKafkaと接続してイベントを読み取ります。
アプリケーション稼働前にKafka Brokerを動作させ、Topic purchases を作成してください。

また、Starterアプリケーションには既にOpen Libertyを起動するMaven Pluginが組み込まれています。以下のコマンドでOpen Libertyを起動します。ビルドされたWARファイルは自動的にOpen Libertyサーバにデプロイされます。

$ mvn liberty:dev

大量のログが出力されますが、Successfully joined group with generation Generation というログが出力されていればKafka Brokerに接続してイベントを読み出す準備ができています。

kafka-console-producer 等のProducerからイベントを送信してみます。

$ kafka-console-producer.sh --bootstrap-server localhost:9092 \
    --topic purchases \
    --property "parse.key=true" --property "key.separator=:"

>alice:apple
>bob:banana
>cris:orange
>^C

Open Libertyアプリケーション側には、ほぼリアルタイムにイベントのValueがログ出力されています。

...
[INFO] [2022/11/06 22:36:06:474 JST] 0000003c com.example.resource.PurchaseResource                        I Consumed item purchase: item name = apple
[INFO] [2022/11/06 22:36:08:018 JST] 0000003a com.example.resource.PurchaseResource                        I Consumed item purchase: item name = banana
[INFO] [2022/11/06 22:36:10:404 JST] 0000003d com.example.resource.PurchaseResource                        I Consumed item purchase: item name = orange

最終系の構成

アプリケーションの最終的なディレクトリ構成は以下の通りです。
/path/to/project は任意のプロジェクトルートのパスが入ります。

$ tree /path/to/project

/path/to/project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── example
        │           └── resource
        │               └── PurchaseResource.java
        ├── liberty
        │   └── config
        │       └── server.xml
        └── resources
            └── META-INF
                └── microprofile-config.properties

参考

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0