この記事について
とある仕事でAmazon SQSをJavaアプリケーションで利用する方法について調査しました。
結局SQSの使用することは無かったのですが、調査した内容をここでご紹介します。
Amazon SQSとは
Amazon SQS(Simple Queue Service)はAWSで利用可能なマネージド型のメッセージキューイングサービスです。
AWS SDKを使用して様々なアプリケーションから利用できます。詳しくは以下。
Amazon Simple Queue Service のドキュメント
ElasticMQの利用
実は諸般の事情でAWSアカウントを利用することができませんでした。
そこでSQS互換のインターフェースでメッセージングが利用できるElasticMQを使用してみました。
今回は以下のDockerイメージを利用します。
$ docker run --name test-sqs -p 9324:9324 softwaremill/elasticmq
Unable to find image 'softwaremill/elasticmq:latest' locally
latest: Pulling from softwaremill/elasticmq
・・・
07:14:26.858 [main] INFO org.elasticmq.server.Main$ - Starting ElasticMQ server (0.14.7) ...
07:14:27.625 [elasticmq-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
07:14:28.722 [elasticmq-akka.actor.default-dispatcher-3] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
07:14:28.723 [main] INFO org.elasticmq.server.Main$ - === ElasticMQ server (0.14.7) started in 2402 ms ===
AWS CLIでアクセスしてみます。事前にダミーのCredentialを設定しておきます。
$ aws configure
AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]: xxxxxxxxxxxxxxxxx
・・・
キューを作成して、メッセージを送信してみます。(SQSにはスタンダードキューとFIFOキューの2種類のキューが作成可能ですが、この記事ではスタンダードキューを使用します)
ローカルで稼働するElasticMQコンテナにアクセスするため、--endpoint-urlオプションを指定する必要があります。
$ aws sqs create-queue --queue-name test --endpoint-url http://localhost:9324
{
"QueueUrl": "http://localhost:9324/queue/testqueue"
}
$ aws sqs send-message --queue-url http://localhost:9324/queue/testqueue --message-body "THIS IS TEST MESSAGE." --endpoint-url http://localhost:9324
{
"MD5OfMessageBody": "81fe6881acb296073763f9e5af225aa9",
"MD5OfMessageAttributes": "d41d8cd98f00b204e9800998ecf8427e",
"MessageId": "bfb653eb-390a-45d1-bf26-bf52d795099e"
}
キューに格納されたメッセージを受信します。
$ aws sqs receive-message --queue-url http://localhost:9324/queue/testqueue --endpoint-url http://localhost:9324
{
"Messages": [
{
"MessageId": "bfb653eb-390a-45d1-bf26-bf52d795099e",
"ReceiptHandle": "bfb653eb-390a-45d1-bf26-bf52d795099e#eea50e0b-dd35-4caf-bce8-479f4b1168f3",
"MD5OfBody": "81fe6881acb296073763f9e5af225aa9",
"Body": "THIS IS TEST MESSAGE."
}
]
}
ちなみに、SQSでは受信されたメッセージはキューから自動的に消えるわけではありません。メッセージの受信の後、一定時間経過すると再度メッセージは受信可能となります(可視性タイムアウト)。そのため以下のように受信したメッセージは明示的に削除しておきます。
$ aws sqs delete-message --queue-url http://localhost:9324/queue/testqueue --endpoint-url http://localhost:9324 --receipt-handle bfb653eb-390a-45d1-bf26-bf52d795099e#28831fc9-1c77-44e1-813d-7715bde62312
このようにElasticMQでもAWS CLIを使ってSQSと同様に利用できることがわかります。
AWS SDK for Javaを利用してアクセス
いよいよ本題に入って、SQSにアクセスするJavaアプリの開発方法を説明します。
まずはAWSが提供しているAWS SDK for Java 2.0を使用します。
AWS SDK for Java 2.0 の開始方法 - AWS SDK for Java
Mavenを使っていれば、以下のように依存関係をpom.xmlに追加することになります。
(SDK全体ではなく、SQS用の依存関係のみ追加しています)
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.7.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
・・・
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
SQS(ElasticMQ)への接続
ようやくJavaのコーディングに入ります。まずサービスへの接続ですが、SDK提供のクライアントビルダーに対して適切なCredentialやエンドポイントURLを設定する必要があります。AWS CLIのところで前述したように、ElasticMQに接続するためにエンドポイントURLを明示的に上書きしておかなければなりません(endpointOverride()の呼び出し)。
SqsClient sqsClient = SqsClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.US_EAST_1)
.endpointOverride(new URI("http://localhost:9324")) // ElasticMQコンテナ向けのURL
.build();
キュー情報へのアクセス
ElasticMQ上に作成済みのキューのリストや、先ほどCLIで作成したキューのURLを取得してみましょう。
ここからは以下リンク先でAWSが提供するコード例の確認になります。
AWS SDK for Java を使用した Amazon SQS の例
String prefix = "test";
ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(prefix).build();
ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest);
for (String url : listQueuesResponse.queueUrls()) {
System.out.println(url);
}
String queueName = "testqueue";
GetQueueUrlResponse getQueueUrlResponse =
sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
String queueUrl = getQueueUrlResponse.queueUrl();
System.out.println(queueUrl);
メッセージの送信と受信
上で取得したキューのURLを使用して、メッセージの送信と受信を行います。
ここでは詳しく紹介しませんが、SDKでは複数メッセージのバッチ送信もサポートしています。
// メッセージ送信
SendMessageResponse res =
sqsClient.sendMessage(
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("Hello world!")
.delaySeconds(10)
.build());
// メッセージ受信
ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.build();
List<Message> messages= sqsClient.receiveMessage(receiveMessageRequest).messages();
messages.forEach(m -> System.out.println(m.body()));
そしてCLIのときと同様に削除も忘れずにやっておきます。
messages.forEach(m -> {
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(m.receiptHandle())
.build();
DeleteMessageResponse delRes =
sqsClient.deleteMessage(deleteMessageRequest);
});
SQSからの切断
処理が完了したら接続をクローズします。close()を呼び出すだけです。
sqsClient.close();
さて次回は
いかがでしょうか。若干コードが冗長かな、というのはあるかもしれません。
接続のところに気をつければElasticMQも開発に十分使えそうです。
次回はJMSやSpringを利用したアクセスに挑戦したいと思います。