この記事について
以下の記事の続きになります。引き続きAmazon SQSにアクセスするJavaアプリケーションの開発について紹介します。
環境についてはSQSの代わりにElasticMQコンテナを使用し、キューは作成済みのスタンダードキューを使用します。
Amazon SQSを利用するJavaアプリを開発する(その1)
JMSの利用
前の記事ではAWS SDKを使用してSQSアクセスを行いましたが、Javaでメッセージングと言えば標準仕様であるJMS(Java Messaging Service)が思い浮かぶ方も多いかと思います。
JMSについては、SQS向けにAmazon SQS Java Messaging Libraryが提供されていますので、それを使用したSQSへのアクセス方法を紹介します。このライブラリにより、既存のJMSアプリケーションに対して最小限の変更でSQSを利用できるようになります。
JMS と Amazon SQS の使用 - Amazon Simple Queue Service
注意点としてはライブラリのサポート対象はJMS1.1の一部(ポイントツーポイントモデルにおけるメッセージ送受信)に限定されているというところですね。
準備
Amazon SQS Java Messaging Libraryを利用するには、SDKと共にライブラリの依存関係を追加する必要があります。Mavenを使用する場合はpom.xmlに以下のように定義してください。
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.4</version>
<type>jar</type>
</dependency>
SQS(ElasticMQ)への接続
SQSへの接続にはライブラリ提供のConnectionFactoryを使用します。今回はElasticMQコンテナを使用するので、前の記事同様エンドポイントURLを上書き設定する必要があります。
また、SDKで直接接続する場合とは異なるクライアントビルダーを使用します。
なお、ここからのコード例は以下を参考にしています。
JMS を使用した Amazon SQS スタンダード キューでの Java の使用例
AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard();
builder.setEndpointConfiguration(new EndpointConfiguration("http://localhost:9324", "us-east-1"));
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
new ProviderConfiguration(),
builder
);
SQSConnection connection = connectionFactory.createConnection();
セッションの確立からメッセージ送信
接続した後にJMSセッションを確立すると、メッセージの送受信が可能になります。ここ以降はJMS標準のインターフェースを使用することなります。以下はメッセージ送信を行うコード例です。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("testqueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello World!");
producer.send(message);
メッセージの受信
メッセージの受信は以下のように実装します。
Message receivedMessage = consumer.receive(1000);
if (receivedMessage != null) {
System.out.println("Received: " + ((TextMessage) receivedMessage).getText());
}
JMSのリスナーによるメッセージの非同期受信も可能です。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MyListener());
connection.start();
public class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
// メッセージを処理
}
}
SQSからの切断
SDKの場合と同様に切断します。
connection.close();
JMSによるSQSアクセスはここまでです。JMSの知識のある方なら同様の実装で利用できることがわかるかと思います。
Spring JMSの利用
標準と言えばJMSになりますが、最近はSpring BootやSpring JMSを利用して開発されてる方も多いはず、ということで、最後にこれらを利用する場合のアクセス方法についてご紹介します。
ここでは以下リンク先の内容を参考にさせて頂きました。
Developing messaging system with Spring Boot, JMS and AWS SQS
準備
ここではpom.xmlにSpring-JMSとSpring boot starterの依存関係を追加します。
Springには様々なコンポーネントが存在しますが、以降のコード例を動作させる場合は、これまでの定義に以下内容を追記します。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.1.4.RELEASE</version>
</dependency>
SQS(ElasticMQ)への接続
Spring BootではJavaでBean定義を実装することができるので、Spring JMSを有効にするためのConfigクラスを実装します。
EnableJmsアノテーションを付与することによりSpring JMSが有効になります。
また、ConnectionFactoryの生成(内容はJMSの場合と同じです)や、それを使用するJmsTemplateのBean定義の実装を行います。
@EnableJms
@Configuration
public class JmsConfig {
public SQSConnectionFactory sqsConnectionFactory() {
AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard();
builder.setEndpointConfiguration(new EndpointConfiguration("http://localhost:9324", "us-east-1"));
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
new ProviderConfiguration(),
builder
);
return connectionFactory;
}
@Bean
public JmsTemplate defaultJmsTemplate() {
return new JmsTemplate(sqsConnectionFactory());
}
JmsTemplateを使用したメッセージ送信
Configクラスで定義されたJmsTemplateをインジェクションすることにより、メッセージの送受信を行うクラスではSQSへの接続を意識する必要は無くなります。
JmsTemplateはSpring JMSが提供するメッセージ送受信のためのTemplateクラスで、単純な送受信処理であれば非常に簡単に実装することが可能です。
@Autowired
private JmsTemplate jmsTemplate;
...
public void sendToSQS(String destination, String message) {
jmsTemplate.convertAndSend(destination, message);
}
メッセージ受信
Spring JMSではMDP(Message-Driven POJOs)と呼ばれる、メッセージ駆動で処理を行うPOJOを実装できます。
以下がMDPによるメッセージ受信のコード例です。JmsListenerアノテーションを付与することにより、destination属性に指定されたキューにメッセージが入るとそれを受信してメソッドの処理が呼び出されます。
@JmsListener(destination="testqueue")
public void receive(TextMessage message) {
System.out.println("received: "+message);
}
なお、MDPを使用するためにConfigクラスに以下のようなListenerContainerFactoryのBean定義を実装しておく必要があります。
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(sqsConnectionFactory());
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("3-10");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
SQSからの切断
Springが内部で行ってくれるため、アプリケーションで意識する必要はありません。
Spring JMSを利用することによりかなりシンプルな方法でSQSへのアクセスが実施できることがわかります。
SQSの代替としてElasticMQを使用する場合、接続のためにエンドポイントURLを上書き設定する必要がある点が気になるところですが、SpringのProfile機能等をうまく使えば環境ごとの切り替えもスマートに行えるのではないかと思います。
終わりに
実際のSQSで検証できるのはいつになるやら・・・。