Help us understand the problem. What is going on with this article?

AWS SQSをひと通り(Simple,Batch,DeadLetter,LongPolling)

More than 5 years have passed since last update.

はじめに

SQSとSDKを使ってあれこれやります。SQSはAWSの中でも古いサービスだってことらしいのでSDK javaの情報って結構出回っているのかなと思いきや、意外になかったようでして、今回ひと通り試してみました。

1.シンプルなメッセージのやりとり
2.Batch処理を使ったメッセージ送信
3.LongPolling
4.DeadLetterQueue

スケールするアーキテクチャを組むときに疎結合させることがポイントとなりますが、キューは疎結合を実現するときの大きな武器になります。

これまではRabbitMQ、AcviveMQなどを使ったりWebSphereMQなど商用のサービスがありましたが、構築するのも面倒な上、高可用性のメッセージングサービスを作りこむのは非常に骨が折れます。

その点SQSは高い可用性を持った分散キューサービスをSDKを使うことで簡単に利用できるのが超魅力的ですね。

SQSのポイント
1.メッセージの順序性は保証されない。
  保証したい場合は自分で作りこむかSWFを使いましょう。
2.メッセージの最低1回の到達性保証

早速やってみます。

シンプルなメッセージのやり取り

1.Queueに対してメッセージを送る

ManagementConsoleを使って先にQueueを作っておいても良いですが、下記のコードではなければ作ってしまいます。
メッセージの送信も非同期で行えるのでメッセージが到達した場合の処理と、失敗した場合の処理をAsyncHandlerに実装することができます。

SampleQueueSender.java
package aws.sqs;

import java.util.Date;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;

public class SampleQueueSender {

    private static final String SQS_ENDPOINT = "http://sqs.ap-northeast-1.amazonaws.com";

    private static final String QUEUE = "SampleQueue";

    public static void main(String[] args) {

        AWSCredentialsProvider provider = new ProfileCredentialsProvider(
                "uzresk");

        AmazonSQSAsync sqs = new AmazonSQSAsyncClient(provider);
        sqs.setEndpoint(SQS_ENDPOINT);

        CreateQueueRequest request = new CreateQueueRequest(QUEUE);
        String queueUrl = sqs.createQueue(request).getQueueUrl();

        // 非同期リクエストにするとハンドラが使えます。
        sqs.sendMessageAsync(new SendMessageRequest(queueUrl, new Date().toString()),
                new AsyncHandler<SendMessageRequest, SendMessageResult>() {

                    @Override
                    public void onSuccess(SendMessageRequest request,
                            SendMessageResult result) {
                        System.out.println("成功しました。" + result.getMessageId());
                    }

                    @Override
                    public void onError(Exception exception) {
                        System.out.println("失敗しました。");
                        exception.printStackTrace();
                    }
                });
    }
}

2.メッセージを受信する。

メッセージを受信するときはVisibilityTimeoutを設定しましょう。
これは、メッセージを受け取ってから他の受信者がその時間受け取れないようにするための時間です。処理の長さによって変えてあげます。
VisibilityTimeoutを過ぎたメッセージはキューに戻されてしまいますので注意が必要です。

SampleQueueReceiver.java
package aws.sqs;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class SampleQueueReceiver {

    private static final String SQS_ENDPOINT = "http://sqs.ap-northeast-1.amazonaws.com";

    private static final String QUEUE = "QueueSample";

    public static void main(String[] args) {

        AWSCredentialsProvider provider = new ProfileCredentialsProvider(
                "uzresk");
        AmazonSQS sqs = new AmazonSQSClient(provider);
        sqs.setEndpoint(SQS_ENDPOINT);

        String queueUrl = sqs.createQueue(QUEUE).getQueueUrl();
        ReceiveMessageRequest request = new ReceiveMessageRequest(QUEUE)
                .withQueueUrl(queueUrl);

        // 他のReceiverが受信できなくなる時間
        request.setVisibilityTimeout(5);

        ReceiveMessageResult result = sqs.receiveMessage(request);

        System.out.println(result.getMessages());

        // maxNumberOfMessagesのデフォルト値は1なのでget(0)としています。
        DeleteMessageRequest deleteRequest = new DeleteMessageRequest();
        deleteRequest.setQueueUrl(sqs.createQueue(QUEUE).getQueueUrl());
        deleteRequest.setReceiptHandle(result.getMessages().get(0)
                .getReceiptHandle());
        sqs.deleteMessage(deleteRequest);
    }
}

Batch処理を使ったメッセージ送信

次はメッセージ送信をまとめてみます。SQSの課金はSQSリクエスト単位なのでまとめて送るとお得なのです。まぁただ100万リクエスト0.5USDだから・・・・。

メッセージは1度に10件まとめて送ることができます。11件以上だとエラーが発生します。

Exception in thread "main" com.amazonaws.services.sqs.model.TooManyEntriesInBatchRequestException: Maximum number of entries per request are 10. You have sent 11. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.TooManyEntriesInBatchRequest; Request ID: bd1f179e-c369-5329-9cab-5a9e65c7f4b4)
    (以下略)

では早速。

SampleQueueBatchSender.java
package aws.sqs;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;

public class SampleQueueBatchSender {

    private static final String SQS_ENDPOINT = "http://sqs.ap-northeast-1.amazonaws.com";

    private static final String QUEUE = "SampleBatchQueue";

    public static void main(String[] args) {

        AWSCredentialsProvider provider = new ProfileCredentialsProvider(
                "uzresk");
        AmazonSQS sqs = new AmazonSQSClient(provider);
        sqs.setEndpoint(SQS_ENDPOINT);

        CreateQueueRequest request = new CreateQueueRequest(QUEUE);
        String queueUrl = sqs.createQueue(request).getQueueUrl();

        SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(queueUrl);
        batchRequest.setQueueUrl(queueUrl);

        List<SendMessageBatchRequestEntry> messages = new ArrayList<SendMessageBatchRequestEntry>();
        for (int i=0; i<10 ; i++) {
            messages.add(new SendMessageBatchRequestEntry(Integer.toString(i),new Date().toString()));
        }
        batchRequest.setEntries(messages);

        sqs.sendMessageBatch(batchRequest);

    }
}

LongPolling

ぐるぐるループを回してQueueを確認するコードを書かなくてもN秒間Queueを監視してくれます。コードもスッキリしますし、リクエストの数が減りますね。
LongPollingは最大20秒まで設定することが可能です。設定自体は非常に簡単で、ReceiveMessageRequestにWaitTimeSecondsを設定するだけです。

LongPollingの逆はShortPollingといいます。単一スレッドでworkerを動かしている時にはwaittimeseconds待ってしまいますので即時処理が必要な場合はshortpollingで動かします。

SampleLongPollQueueReceiver.java
package aws.sqs;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class SampleLongPollQueueReceiver {

    private static final String SQS_ENDPOINT = "http://sqs.ap-northeast-1.amazonaws.com";

    private static final String QUEUE = "SampleLongPollQueue";

    public static void main(String[] args) {

        AWSCredentialsProvider provider = new ProfileCredentialsProvider(
                "uzresk");
        AmazonSQS sqs = new AmazonSQSClient(provider);
        sqs.setEndpoint(SQS_ENDPOINT);

        ReceiveMessageRequest request = new ReceiveMessageRequest(QUEUE);
        request.setQueueUrl(sqs.createQueue(QUEUE).getQueueUrl());
        // 他のReceiverが受信できなくなる時間
        request.setVisibilityTimeout(10);
        // Long pollの設定
        request.setWaitTimeSeconds(20);

        ReceiveMessageResult result = sqs.receiveMessage(request);

        for (Message message : result.getMessages()) {
            // 受信したメッセージを表示
            System.out.println(message.getMessageId() + ":" + message.getBody());
            // メッセージを削除
            DeleteMessageRequest deleteRequest = new DeleteMessageRequest();
            deleteRequest.setQueueUrl(sqs.createQueue(QUEUE).getQueueUrl());
            deleteRequest.setReceiptHandle(message.getReceiptHandle());
            sqs.deleteMessage(deleteRequest);               
        }

    }
}

DeadLetterQueue

VisibilityTimeoutを過ぎたメッセージはキューに戻されますし、処理が正常に終わらずdelete messageされないメッセージは残り続けてしまう恐れがあります。
そこで登場するのがDeadLetterQueueです。指定された回数メッセージが戻されたら別のQueueに自動的に移動させておくことが可能です。

ManagementConsoleから設定します。

  1. DeadLetterQueueを作成する。(ただQueueを作るだけ)
  2. DeadLetterQueueの設定したいQueueを開き、設定を行います。
  • UseRedrivePolicyにチェック
  • DeadLetterQueueに1で指定したQueueの名前を入力
  • Maximum ReceivesにQueueに戻された回数を指定

image

DeadLetterQueueからメッセージを取り出します。通常のメッセージ受信のコードと同じですが、今回はReceiverのAttributeを利用して色々と情報を抜いてみます。

DeadLetterQueueReceiver.java
package aws.sqs;

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class DeadLetterQueueReceiver {

    private static final String SQS_ENDPOINT = "http://sqs.ap-northeast-1.amazonaws.com";

    private static final String QUEUE = "SampleDeadLetterQueue";

    public static void main(String[] args) {

        AWSCredentialsProvider provider = new ProfileCredentialsProvider(
                "uzresk_nc2");
        AmazonSQS sqs = new AmazonSQSClient(provider);
        sqs.setEndpoint(SQS_ENDPOINT);

        ReceiveMessageRequest request = new ReceiveMessageRequest(QUEUE);
        List<String> attributeNames = new ArrayList<String>();
        attributeNames.add("ApproximateFirstReceiveTimestamp");
        attributeNames.add("ApproximateReceiveCount");
        attributeNames.add("SenderId");
        attributeNames.add("SentTimestamp");
        request.setAttributeNames(attributeNames);
        String queueUrl = sqs.createQueue(QUEUE).getQueueUrl();
        request.setQueueUrl(queueUrl);
        // 他のReceiverが受信できなくなる時間
        request.setVisibilityTimeout(10);
        // Long pollの設定
        request.setWaitTimeSeconds(10);

        ReceiveMessageResult result = sqs.receiveMessage(request);

        for (Message message : result.getMessages()) {
            // 受信したメッセージを表示
            System.out.println(message.getMessageId() + ":" + message.getBody());
            System.out.println("ApproximateFirstReceiveTimestamp:" + message.getAttributes().get("ApproximateFirstReceiveTimestamp"));
            System.out.println("ApproximateReceiveCount:" + message.getAttributes().get("ApproximateReceiveCount"));
            System.out.println("SenderId:" + message.getAttributes().get("SenderId"));
            System.out.println("SentTimestamp:" + message.getAttributes().get("SentTimestamp"));
            // メッセージを削除
            DeleteMessageRequest deleteRequest = new DeleteMessageRequest();
            deleteRequest.setQueueUrl(sqs.createQueue(QUEUE).getQueueUrl());
            deleteRequest.setReceiptHandle(message.getReceiptHandle());
            sqs.deleteMessage(deleteRequest);
        }

    }
}

requestにattributenamesを指定しないと、受信したレスポンスでは情報が取得できないので注意が必要です。今回は沢山attributeを設定しましたが、Allと設定しても同じ結果が得られます。

DeadLetterQueueの判断の元となっている回数はApproximateReceiveCountです。
確認した所ApproximateReceiveCountが増えるのはVisibilityTimeoutが発生してキューに戻されるタイミングと、メッセージは受信したがdeletemessageが送られない場合のようです。

また、Maximum Receivesを3に設定した場合、ApproximateReceiveCountが3となった時にDeadLetterQueueに移されますが、若干タイムラグがあるようで私が試した時は30秒くらい立ってからManagementConsole上で確認できました。

参考

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした