Edited at

Amazon SQSを利用するJavaアプリを開発する(その1)


この記事について

とある仕事でAmazon SQSをJavaアプリケーションで利用する方法について調査しました。

結局SQSの使用することは無かったのですが、調査した内容をここでご紹介します。


Amazon SQSとは

Amazon SQS(Simple Queue Service)はAWSで利用可能なマネージド型のメッセージキューイングサービスです。

AWS SDKを使用して様々なアプリケーションから利用できます。詳しくは以下。

Amazon Simple Queue Service のドキュメント


ElasticMQの利用

実は諸般の事情でAWSアカウントを利用することができませんでした。

そこでSQS互換のインターフェースでメッセージングが利用できるElasticMQを使用してみました。

ElasticMQ - Github

今回は以下のDockerイメージを利用します。

$ docker run --name test-sqs -p 9324:9324 softwaremill/elasticmq

Unable to find image 'softwaremill/elasticmq:latest' locally
latest: Pulling from softwaremill/elasticmq
・・・
07:14:26.858 [main] INFO org.elasticmq.server.Main$ - Starting ElasticMQ server (0.14.7) ...
07:14:27.625 [elasticmq-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
07:14:28.722 [elasticmq-akka.actor.default-dispatcher-3] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
07:14:28.723 [main] INFO org.elasticmq.server.Main$ - === ElasticMQ server (0.14.7) started in 2402 ms ===

AWS CLIでアクセスしてみます。事前にダミーのCredentialを設定しておきます。

$ aws configure 

AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]: xxxxxxxxxxxxxxxxx
・・・

キューを作成して、メッセージを送信してみます。(SQSにはスタンダードキューとFIFOキューの2種類のキューが作成可能ですが、この記事ではスタンダードキューを使用します)

ローカルで稼働するElasticMQコンテナにアクセスするため、--endpoint-urlオプションを指定する必要があります。

$ aws sqs create-queue --queue-name test --endpoint-url http://localhost:9324

{
"QueueUrl": "http://localhost:9324/queue/testqueue"
}

$ aws sqs send-message --queue-url http://localhost:9324/queue/testqueue --message-body "THIS IS TEST MESSAGE." --endpoint-url http://localhost:9324
{
"MD5OfMessageBody": "81fe6881acb296073763f9e5af225aa9",
"MD5OfMessageAttributes": "d41d8cd98f00b204e9800998ecf8427e",
"MessageId": "bfb653eb-390a-45d1-bf26-bf52d795099e"
}

キューに格納されたメッセージを受信します。

$ aws sqs receive-message --queue-url http://localhost:9324/queue/testqueue --endpoint-url http://localhost:9324

{
"Messages": [
{
"MessageId": "bfb653eb-390a-45d1-bf26-bf52d795099e",
"ReceiptHandle": "bfb653eb-390a-45d1-bf26-bf52d795099e#eea50e0b-dd35-4caf-bce8-479f4b1168f3",
"MD5OfBody": "81fe6881acb296073763f9e5af225aa9",
"Body": "THIS IS TEST MESSAGE."
}
]
}

ちなみに、SQSでは受信されたメッセージはキューから自動的に消えるわけではありません。メッセージの受信の後、一定時間経過すると再度メッセージは受信可能となります(可視性タイムアウト)。そのため以下のように受信したメッセージは明示的に削除しておきます。

$ aws sqs delete-message --queue-url http://localhost:9324/queue/testqueue --endpoint-url http://localhost:9324 --receipt-handle bfb653eb-390a-45d1-bf26-bf52d795099e#28831fc9-1c77-44e1-813d-7715bde62312

このようにElasticMQでもAWS CLIを使ってSQSと同様に利用できることがわかります。


AWS SDK for Javaを利用してアクセス

いよいよ本題に入って、SQSにアクセスするJavaアプリの開発方法を説明します。

まずはAWSが提供しているAWS SDK for Java 2.0を使用します。

AWS SDK for Java 2.0 の開始方法 - AWS SDK for Java

Mavenを使っていれば、以下のように依存関係をpom.xmlに追加することになります。

(SDK全体ではなく、SQS用の依存関係のみ追加しています)

    <dependencyManagement>

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.7.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
   ・・・
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>


SQS(ElasticMQ)への接続

ようやくJavaのコーディングに入ります。まずサービスへの接続ですが、SDK提供のクライアントビルダーに対して適切なCredentialやエンドポイントURLを設定する必要があります。AWS CLIのところで前述したように、ElasticMQに接続するためにエンドポイントURLを明示的に上書きしておかなければなりません(endpointOverride()の呼び出し)。

    SqsClient sqsClient = SqsClient.builder()

.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.US_EAST_1)
.endpointOverride(new URI("http://localhost:9324")) // ElasticMQコンテナ向けのURL
.build();


キュー情報へのアクセス

ElasticMQ上に作成済みのキューのリストや、先ほどCLIで作成したキューのURLを取得してみましょう。

ここからは以下リンク先でAWSが提供するコード例の確認になります。

AWS SDK for Java を使用した Amazon SQS の例

    String prefix = "test";

ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(prefix).build();
ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest);
for (String url : listQueuesResponse.queueUrls()) {
System.out.println(url);
}

String queueName = "testqueue";
GetQueueUrlResponse getQueueUrlResponse =
sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
String queueUrl = getQueueUrlResponse.queueUrl();
System.out.println(queueUrl);


メッセージの送信と受信

上で取得したキューのURLを使用して、メッセージの送信と受信を行います。

ここでは詳しく紹介しませんが、SDKでは複数メッセージのバッチ送信もサポートしています。

// メッセージ送信

SendMessageResponse res =
sqsClient.sendMessage(
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("Hello world!")
.delaySeconds(10)
.build());

// メッセージ受信
ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.build();
List<Message> messages= sqsClient.receiveMessage(receiveMessageRequest).messages();
messages.forEach(m -> System.out.println(m.body()));

そしてCLIのときと同様に削除も忘れずにやっておきます。

    messages.forEach(m -> {

DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(m.receiptHandle())
.build();
DeleteMessageResponse delRes =
sqsClient.deleteMessage(deleteMessageRequest);
});


SQSからの切断

処理が完了したら接続をクローズします。close()を呼び出すだけです。

sqsClient.close();


さて次回は

いかがでしょうか。若干コードが冗長かな、というのはあるかもしれません。

接続のところに気をつければElasticMQも開発に十分使えそうです。

次回はJMSやSpringを利用したアクセスに挑戦したいと思います。