LoginSignup
5
0

LibertyからEvent Streams(Kafka)を利用してみた

Posted at

はじめに

この記事はWebSphere 25周年、DB2/MQ30周年 アニバーサリー記念 Qiita記事投稿キャンペーンの参加記事です。

Libertyから、Event Streams(Kafka)へメッセージを送受信する簡単なアプリを作成してみました。

行ったこと

  • IBM Cloud 無料版Event Streamsを利用
  • Spring Bootを利用したWebアプリを作成
  • ローカル環境のLibertyからEvent Streamsに接続

IBM Cloud 無料版Event Streamsを利用

IBM Cloudにあるフル・マネージドの Kafka サービスであるEvent Streamsの無料のライト・プランを利用します。ライト・プランはお試し用で、リージョンがダラスに限られる、Partition1個、Kafka Connect, Streamsが利用できないなど制約事項があります。

プロビジョニング後はすぐに利用することができます。
トピック "libertyTopic" を作成します。メッセージ保存期間を設定することができます。ライト・プランは、Partitionは1個です。

サービス資格情報を作成します。表右のコピーをクリックして内容をテキスト・ファイルにコピーします。

接続に必要な情報になります。

{
  "api_key": "xxxxxxxxxxxxxxxxxxxxx",
  "apikey": "xxxxxxxxxxxxxxxxxxxxx",
  "bootstrap_endpoints": "broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093",
  "iam_apikey_description": "Auto-generated for key xxxxxxxxx,
  "iam_apikey_name": "cre1",
  "iam_role_crn": "xxxxxxxxx",
  "iam_serviceid_crn": "xxxxxxxxxxx",
  "instance_id": "aaaaaa-aaaa-bbbb-cccc-xxxxxxxxxx",
  "kafka_admin_url": "https://xxxxxxxxxxx.ibm.com",
  "kafka_brokers_sasl": [
    "broker-xxxx.ibm.com:9093",
    "broker-yyyy.ibm.com:9093",
    "broker-zzzz.ibm.com:9093",
    "broker-aaaa.ibm.com:9093",
    "broker-bbbb.ibm.com:9093",
    "broker-cccc.ibm.com:9093"
  ],
  "kafka_http_url": "https://xxxxxxxxxxx.ibm.com",
  "password": "XXXXXXXXXXXXXXXXXXXXXX",
  "user": "token"
}

Spring Bootを利用したWebアプリを作成

Spring Bootを利用した、以下のような簡単なWebアプリを作成します。

  • index.htmlからEvent Streamsにメッセージを送信
  • 送信されたメッセージは、KafkaListnerでログに出力

pom.xmlへ、Spring BootとSpring Kafkaを依存関係に追加します。

pom.xml
-----略------
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

application.propertiesに先程コピーしたEvent Streamsの接続情報を記述します。Event Streamsに接続するにはsecurity設定が必要です。

application.properties
# Kafkaブローカーリスト
spring.kafka.bootstrap-servers=broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093

# Producer設定
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#Consumer設定
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


## Security options
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="XXXXXXXXXXXXXXXXXXXXXX";

メッセージ送信、送った内容を表示するHTMLファイルを2つ用意します。

index.html
<!DOCTYPE html><html>
<head>
    <meta charset="UTF-8">
    <title>Kafka Sender</title>
</head>
<body>
	<h1>Hello Event Streams!</h1>

    <form method="post" action="/EventApp/send">
        <label for="message">Message:</label>
        <input type="text" name="message" id="message">
        <button type="submit">Send</button>
    </form>
</body></html>
result.html
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Kafka Sender</title>
</head>
<body>
    <p>Message sent: [[${message}]]</p> 
</body>
</html>

JavaクラスはControllerクラスを1つ作成します。

  • コンテキスト・ルートへのアクセスをindex.htmlへ
  • Spring KafkaのKafkaTemplateを利用してメッセージを送信
  • spring KafkaのKafkaListenrを利用してメッセージを受信し、SystemOutに出力
MessageController.java
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

@Controller
public class MessageController {

	@Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

	@RequestMapping(value = "/", method = RequestMethod.GET)
    public ModelAndView index(final ModelAndView mv) {
		mv.setViewName("index");
        return mv;
    }

    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ModelAndView send(@RequestParam("message") final String message, final ModelAndView mv) {
    	kafkaTemplate.send("libertyTopic", message);
    	mv.setViewName("result");
        mv.addObject("message", message);
        return mv;
    }

    @KafkaListener(topics = "libertyTopic", groupId = "")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }    
}

ローカル環境のLibertyからEvent Streamsに接続

作成したアプリをLibertyにデプロイして稼働してみます。

メッセージを送信すると、結果画面が表示されます。

複数メッセージを送信しログを確認すると以下のようにメッセージが受信されているのも確認できました。

まとめ

LibertyからEvent Streams(Kafka)を利用する簡単なアプリケーションを作成しました。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0