この記事ではAmazonMQをブローカーとしたPub/Sub通信を実現します。
前回のAmazonMQの起動を行う記事は以下になりますので、まだの人はこちらから確認お願いします。
https://qiita.com/sorasoraso/items/3f2d549b35f01896dedd
Pub/Sub通信とは
ソフトウェアアーキテクチャの一つで、非同期で疎結合なメッセージ通信を実現するための仕組みです。
関連用語
Publisher(発行者)
メッセージを送る側です。特定の「トピック」に対してメッセージを送信します。
Subscriber(購読者)
メッセージを受け取る側です。興味のある「トピック」を監視を行い、関連するメッセージが届くと受信します。
Broker(仲介者)
PublisherとSubscriberの間を取り持つシステム。メッセージの配信を管理します。今回はAmazonMQのActiveMQがブローカーに当たります。
パブリッシャーとサブスクライバーを実装し、Pub/Sub通信の動作確認を行いたいと思います。
パブリッシャーの実装
パブリッシャーとしてメッセージをブローカーに送信するプロジェクトを作成します。
実行環境
・Java21
・IDE:Eclipse
・ビルドプラグイン:Maven
・プロトコル:AMQP
pom.xml
pom.xmlのdependenciesタグには以下の内容を記載しました。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/jakarta.jms/jakarta.jms-api -->
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
jakarta.jmsは使用するJavaバージョンによっては使用することができないため、javax.jmsのライブラリに切り替える必要がある。
パブリッシュを実装するクラス
前回作成したAmazonMQで確認できたエンドポイント情報を使用します。

今回はプロトコルにAMQPを使用するため、AMQPのエンドポイントをコピーしておきます。
package com.example.demo;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
import jakarta.jms.TopicPublisher;
import jakarta.jms.TopicSession;
import org.apache.qpid.jms.JmsConnectionFactory;
public class AmazonmqTestProjectApplication {
public static void main(String[] args){
try {
// 送信するメッセージ
String tjm = "テストメッセージ2";
// JMSサーバ接続情報
String jmsServName = {{AmazonMQのAMQPのエンドポイント}};
String jmsServPort = "5671";
// 該当のトピック名が存在しない場合はブローカーがトピックを自動生成します。
String jmsTopicNm = "testTopic";
String clientUser = {{AmazonMQで作成したユーザー名}};
String clientPass = {{AmazonMQで作成したパスワード}};
// JMSサーバのURLを生成
String broker_url = "amqps://" + jmsServName + ":" + jmsServPort;
// JMSコネクションファクトリの取得
TopicConnectionFactory factory = new JmsConnectionFactory(clientUser, clientPass, broker_url);
System.out.println("OK");
// JMSコネクションファクトリからJMSコネクションを取得
TopicConnection connection = factory.createTopicConnection();
TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
// コネクションの開始
connection.start();
System.out.println("OK");
// トピックの生成
Topic topic = session.createTopic(jmsTopicNm);
// メッセージプロデューサの生成
TopicPublisher publisher = session.createPublisher(topic);
// メッセージの生成
TextMessage msg = session.createTextMessage(tjm);
System.out.println(msg);
// メッセージの送信
publisher.publish(msg);
//publisher.publish(msg, DeliveryMode.NON_PERSISTENT, 1, 2000);
msg.acknowledge();
// コネクションの終了
publisher.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
サブスクライバーの実装
サブスクライバーとしてブローカーのトピックにサブスクライブを行い、メッセージの取得とコンソール出力を行うプロジェクトを作成します。
実行環境
・Java21
・IDE:Eclipse
・ビルドプラグイン:Maven
・プロトコル:AMQP
pom.xml
pom.xmlのdependenciesタグには以下の内容を記載しました。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>2.7.0</version>
</dependency>
<!--
https://mvnrepository.com/artifact/org.apache.activemq/activemq-client-jakarta -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client-jakarta</artifactId>
<version>6.1.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
サブスクライブを実装するクラス
エンドポイント情報や認証情報はパブリッシャーと同じものを使用します。
package com.example.demo;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class AmazonmqSubscribe2ProjectApplication implements CommandLineRunner {
TopicConnection connection = null;
// JMSセッション生成用
TopicSession session = null;
// トピックの生成用
Topic topic = null;
// メッセージコンシューマの生成用
TopicSubscriber subscriber = null;
public static void main(String[] args) {
SpringApplication.run(AmazonmqSubscribe2ProjectApplication.class, args);
System.out.println("runOK");
}
@Override
public void run(String... args) throws Exception {
// JMSサーバ接続情報
String jmsServName = {{AmazonMQのAMQPのエンドポイント}};
String jmsServPort = "5671";
String jmsTopicNm = "testTopic";
String clientUser = {{AmazonMQで作成したユーザー名}};
String clientPass = {{AmazonMQで作成したパスワード}};
// JMSサーバのURLを生成
String broker_url = "amqps://" + jmsServName + ":" + jmsServPort;
try {
// JMSコネクションファクトリの取得
TopicConnectionFactory factory = new JmsConnectionFactory(clientUser, clientPass, broker_url);
// JMSコネクションファクトリからJMSコネクションを取得
connection = factory.createTopicConnection();
connection.setClientID("サブスクライバー"); // 一意のclientIDを設定
// JMSセッションの生成
session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
// トピックの生成
topic = session.createTopic(jmsTopicNm);
// メッセージコンシューマの生成
subscriber = session.createDurableSubscriber(topic, "サブスクライバー");
subscriber.setMessageListener(new Subscriber());
// コネクションの開始
connection.start();
// メインスレッドは終了させないように待機
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
// コネクションの終了
subscriber.close();
session.close();
connection.close();
}
}
}
}
メッセージを出力するクラス
取得したメッセージをコンソールに出力するクラスを作成します。
package com.example.demo;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class Subscriber implements MessageListener{
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
このようにパブリッシャーとサブスクライバーは別々のプロジェクトとして動作します。
pub/sub通信の実行確認
パブリッシャーの動作確認
まずはパブリッシャーからメッセージをブローカーに送信します。
作成したプロジェクトを実行し、エラーなく動作したことを確認します。
ActiveMQのコンソール画面を確認すると「testTopic」というトピックにメッセージが1件送信されていることがわかります。

サブスクライバーの動作確認
サブスクライバーのプロジェクトを実行すると、メッセージが送られてくるのを待つ状態になります。
この状態でActiveMQのコンソール画面にてSubscribersタブを確認すると今サブスクライブしているアクティブな通信を一覧で確認することができます。

正常にサブスクライブされていることが確認できたため、パブリッシュを再度行います。
すると、コンソールログにパブリッシュで設定しているメッセージが表示されることを確認できます。
これにてPub/Sub通信の実行確認が完了しました。
さいごに
Pub/Sub通信には今回説明していない設定値もたくさんあります。
・永続サブスクライバー(サブスクライバーがオフラインの間もメッセージを保持できる設定)
・ActiveMQからのメッセージ送信
・メッセージ再送処理
・トピック名の階層構造
・トピック名のワイルドカード
・ブローカーの冗長構成とフェイルオーバー
などなど
要件に合わせて色々調べてみるのもいいかもしれません。