3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AmazonMQを使用したPub/Sub通信(プログラム編)

Last updated at Posted at 2025-09-30

この記事ではAmazonMQをブローカーとしたPub/Sub通信を実現します。
前回のAmazonMQの起動を行う記事は以下になりますので、まだの人はこちらから確認お願いします。
https://qiita.com/sorasoraso/items/3f2d549b35f01896dedd

Pub/Sub通信とは

ソフトウェアアーキテクチャの一つで、非同期で疎結合なメッセージ通信を実現するための仕組みです。

関連用語

Publisher(発行者)

メッセージを送る側です。特定の「トピック」に対してメッセージを送信します。

Subscriber(購読者)

メッセージを受け取る側です。興味のある「トピック」を監視を行い、関連するメッセージが届くと受信します。

Broker(仲介者)

PublisherとSubscriberの間を取り持つシステム。メッセージの配信を管理します。今回はAmazonMQのActiveMQがブローカーに当たります。

パブリッシャーとサブスクライバーを実装し、Pub/Sub通信の動作確認を行いたいと思います。

パブリッシャーの実装

パブリッシャーとしてメッセージをブローカーに送信するプロジェクトを作成します。

実行環境

・Java21
・IDE:Eclipse
・ビルドプラグイン:Maven
・プロトコル:AMQP

pom.xml

pom.xmlのdependenciesタグには以下の内容を記載しました。

pom.xml
        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
		<dependency>
			<groupId>org.apache.qpid</groupId>
			<artifactId>qpid-jms-client</artifactId>
			<version>2.7.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/jakarta.jms/jakarta.jms-api -->
		<dependency>
			<groupId>jakarta.jms</groupId>
			<artifactId>jakarta.jms-api</artifactId>
			<version>3.1.0</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

jakarta.jmsは使用するJavaバージョンによっては使用することができないため、javax.jmsのライブラリに切り替える必要がある。

パブリッシュを実装するクラス

前回作成したAmazonMQで確認できたエンドポイント情報を使用します。
image.png
今回はプロトコルにAMQPを使用するため、AMQPのエンドポイントをコピーしておきます。

AmazonmqTestProjectApplication.java
package com.example.demo;


import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
import jakarta.jms.TopicPublisher;
import jakarta.jms.TopicSession;

import org.apache.qpid.jms.JmsConnectionFactory;


public class AmazonmqTestProjectApplication {	
	public static void main(String[] args){
        try {
            // 送信するメッセージ
            String tjm = "テストメッセージ2";

            // JMSサーバ接続情報
            String jmsServName = {{AmazonMQのAMQPのエンドポイント}};
            String jmsServPort = "5671";
            // 該当のトピック名が存在しない場合はブローカーがトピックを自動生成します。
            String jmsTopicNm  = "testTopic"; 
            String clientUser = {{AmazonMQで作成したユーザー名}};
            String clientPass = {{AmazonMQで作成したパスワード}};

            // JMSサーバのURLを生成
            String broker_url = "amqps://" + jmsServName + ":" + jmsServPort;
            // JMSコネクションファクトリの取得
            TopicConnectionFactory factory = new JmsConnectionFactory(clientUser, clientPass, broker_url);
            System.out.println("OK");

			// JMSコネクションファクトリからJMSコネクションを取得
			TopicConnection connection = factory.createTopicConnection();
			TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

			// コネクションの開始
			connection.start();
			System.out.println("OK");

			// トピックの生成
            Topic topic = session.createTopic(jmsTopicNm);
            // メッセージプロデューサの生成
            TopicPublisher publisher = session.createPublisher(topic);

            // メッセージの生成
            TextMessage msg = session.createTextMessage(tjm);
            System.out.println(msg);

            // メッセージの送信
            publisher.publish(msg);
            //publisher.publish(msg, DeliveryMode.NON_PERSISTENT, 1, 2000);
            
            msg.acknowledge();

            // コネクションの終了
            publisher.close();
            session.close();
            connection.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

サブスクライバーの実装

サブスクライバーとしてブローカーのトピックにサブスクライブを行い、メッセージの取得とコンソール出力を行うプロジェクトを作成します。

実行環境

・Java21
・IDE:Eclipse
・ビルドプラグイン:Maven
・プロトコル:AMQP

pom.xml

pom.xmlのdependenciesタグには以下の内容を記載しました。

pom.xml
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
		<dependency>
			<groupId>org.apache.qpid</groupId>
			<artifactId>qpid-jms-client</artifactId>
			<version>2.7.0</version>
		</dependency>

		<!--
		https://mvnrepository.com/artifact/org.apache.activemq/activemq-client-jakarta -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client-jakarta</artifactId>
			<version>6.1.0</version>
			<type>pom</type>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

サブスクライブを実装するクラス

エンドポイント情報や認証情報はパブリッシャーと同じものを使用します。

AmazonmqTestProjectApplication.java
package com.example.demo;

import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicConnectionFactory;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AmazonmqSubscribe2ProjectApplication implements CommandLineRunner {

	TopicConnection connection = null;

	// JMSセッション生成用
	TopicSession session = null;

	// トピックの生成用
	Topic topic = null;

	// メッセージコンシューマの生成用
	TopicSubscriber subscriber = null;
	
	public static void main(String[] args) {
		SpringApplication.run(AmazonmqSubscribe2ProjectApplication.class, args);
		System.out.println("runOK");
	}

	@Override
	public void run(String... args) throws Exception {
		// JMSサーバ接続情報
		String jmsServName = {{AmazonMQのAMQPのエンドポイント}};
		String jmsServPort = "5671";
		String jmsTopicNm = "testTopic";
		String clientUser = {{AmazonMQで作成したユーザー名}};
		String clientPass = {{AmazonMQで作成したパスワード}};
		// JMSサーバのURLを生成
		String broker_url = "amqps://" + jmsServName + ":" + jmsServPort;
		
		try {
			// JMSコネクションファクトリの取得
			TopicConnectionFactory factory = new JmsConnectionFactory(clientUser, clientPass, broker_url);

			// JMSコネクションファクトリからJMSコネクションを取得
			connection = factory.createTopicConnection();
			connection.setClientID("サブスクライバー"); // 一意のclientIDを設定

			// JMSセッションの生成
			session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

			// トピックの生成
			topic = session.createTopic(jmsTopicNm);

			// メッセージコンシューマの生成
			subscriber = session.createDurableSubscriber(topic, "サブスクライバー");

			subscriber.setMessageListener(new Subscriber());
			
			// コネクションの開始
			connection.start();

			// メインスレッドは終了させないように待機
			Thread.currentThread().join();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				// コネクションの終了
				subscriber.close();
				session.close();
				connection.close();
			}
		}		
	}
}

メッセージを出力するクラス

取得したメッセージをコンソールに出力するクラスを作成します。

Subscriber.java
package com.example.demo;

import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;

public class Subscriber implements MessageListener{
	
	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}

このようにパブリッシャーとサブスクライバーは別々のプロジェクトとして動作します。

pub/sub通信の実行確認

パブリッシャーの動作確認

まずはパブリッシャーからメッセージをブローカーに送信します。
作成したプロジェクトを実行し、エラーなく動作したことを確認します。
ActiveMQのコンソール画面を確認すると「testTopic」というトピックにメッセージが1件送信されていることがわかります。
image.png

サブスクライバーの動作確認

サブスクライバーのプロジェクトを実行すると、メッセージが送られてくるのを待つ状態になります。
この状態でActiveMQのコンソール画面にてSubscribersタブを確認すると今サブスクライブしているアクティブな通信を一覧で確認することができます。
image.png
正常にサブスクライブされていることが確認できたため、パブリッシュを再度行います。
すると、コンソールログにパブリッシュで設定しているメッセージが表示されることを確認できます。
これにてPub/Sub通信の実行確認が完了しました。

さいごに

Pub/Sub通信には今回説明していない設定値もたくさんあります。
・永続サブスクライバー(サブスクライバーがオフラインの間もメッセージを保持できる設定)
・ActiveMQからのメッセージ送信
・メッセージ再送処理
・トピック名の階層構造
・トピック名のワイルドカード
・ブローカーの冗長構成とフェイルオーバー
などなど
要件に合わせて色々調べてみるのもいいかもしれません。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?