はじめに
この記事はWebSphere 25周年、DB2/MQ30周年 アニバーサリー記念 Qiita記事投稿キャンペーンの参加記事です。
Libertyから、Event Streams(Kafka)へメッセージを送受信する簡単なアプリを作成してみました。
行ったこと
- IBM Cloud 無料版Event Streamsを利用
- Spring Bootを利用したWebアプリを作成
- ローカル環境のLibertyからEvent Streamsに接続
IBM Cloud 無料版Event Streamsを利用
IBM Cloudにあるフル・マネージドの Kafka サービスであるEvent Streamsの無料のライト・プランを利用します。ライト・プランはお試し用で、リージョンがダラスに限られる、Partition1個、Kafka Connect, Streamsが利用できないなど制約事項があります。
プロビジョニング後はすぐに利用することができます。
トピック "libertyTopic" を作成します。メッセージ保存期間を設定することができます。ライト・プランは、Partitionは1個です。
サービス資格情報を作成します。表右のコピーをクリックして内容をテキスト・ファイルにコピーします。
接続に必要な情報になります。
{
"api_key": "xxxxxxxxxxxxxxxxxxxxx",
"apikey": "xxxxxxxxxxxxxxxxxxxxx",
"bootstrap_endpoints": "broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093",
"iam_apikey_description": "Auto-generated for key xxxxxxxxx,
"iam_apikey_name": "cre1",
"iam_role_crn": "xxxxxxxxx",
"iam_serviceid_crn": "xxxxxxxxxxx",
"instance_id": "aaaaaa-aaaa-bbbb-cccc-xxxxxxxxxx",
"kafka_admin_url": "https://xxxxxxxxxxx.ibm.com",
"kafka_brokers_sasl": [
"broker-xxxx.ibm.com:9093",
"broker-yyyy.ibm.com:9093",
"broker-zzzz.ibm.com:9093",
"broker-aaaa.ibm.com:9093",
"broker-bbbb.ibm.com:9093",
"broker-cccc.ibm.com:9093"
],
"kafka_http_url": "https://xxxxxxxxxxx.ibm.com",
"password": "XXXXXXXXXXXXXXXXXXXXXX",
"user": "token"
}
Spring Bootを利用したWebアプリを作成
Spring Bootを利用した、以下のような簡単なWebアプリを作成します。
- index.htmlからEvent Streamsにメッセージを送信
- 送信されたメッセージは、KafkaListnerでログに出力
pom.xmlへ、Spring BootとSpring Kafkaを依存関係に追加します。
-----略------
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.propertiesに先程コピーしたEvent Streamsの接続情報を記述します。Event Streamsに接続するにはsecurity設定が必要です。
# Kafkaブローカーリスト
spring.kafka.bootstrap-servers=broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093
# Producer設定
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#Consumer設定
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## Security options
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="XXXXXXXXXXXXXXXXXXXXXX";
メッセージ送信、送った内容を表示するHTMLファイルを2つ用意します。
<!DOCTYPE html><html>
<head>
<meta charset="UTF-8">
<title>Kafka Sender</title>
</head>
<body>
<h1>Hello Event Streams!</h1>
<form method="post" action="/EventApp/send">
<label for="message">Message:</label>
<input type="text" name="message" id="message">
<button type="submit">Send</button>
</form>
</body></html>
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>Kafka Sender</title>
</head>
<body>
<p>Message sent: [[${message}]]</p>
</body>
</html>
JavaクラスはControllerクラスを1つ作成します。
- コンテキスト・ルートへのアクセスをindex.htmlへ
- Spring KafkaのKafkaTemplateを利用してメッセージを送信
- spring KafkaのKafkaListenrを利用してメッセージを受信し、SystemOutに出力
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;
@Controller
public class MessageController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping(value = "/", method = RequestMethod.GET)
public ModelAndView index(final ModelAndView mv) {
mv.setViewName("index");
return mv;
}
@RequestMapping(value = "/send", method = RequestMethod.POST)
public ModelAndView send(@RequestParam("message") final String message, final ModelAndView mv) {
kafkaTemplate.send("libertyTopic", message);
mv.setViewName("result");
mv.addObject("message", message);
return mv;
}
@KafkaListener(topics = "libertyTopic", groupId = "")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
ローカル環境のLibertyからEvent Streamsに接続
作成したアプリをLibertyにデプロイして稼働してみます。
複数メッセージを送信しログを確認すると以下のようにメッセージが受信されているのも確認できました。
まとめ
LibertyからEvent Streams(Kafka)を利用する簡単なアプリケーションを作成しました。