LoginSignup
3
1

More than 5 years have passed since last update.

[SQS + Node.js] SQSのキューに溜まっているメッセージを1つずつ処理するConsumerを実装してみた

Last updated at Posted at 2018-09-22

[SQS + Node.js] SQSのキューに溜まっているメッセージを1つずつ処理するConsumerを実装してみた

概要

今回はSQSのキューに溜まっているメッセージを1つずつ処理するConsumerというものを実装してみました。
あのBBCが似たようなライブラリをOSSとして提供してくれていますが、SQSを勉強する良い機会なので自分で簡単なものを実装してみました。
https://github.com/bbc/sqs-consumer

開発環境

実行環境
Node.js v10.4.1

開発ツール
Babel

SQSConsumerクラスの実装

まずはクラスのconstructorを作成します。引数としてキューのURLとキューのオプションを受け取ります。
aws-sdk を利用してSQSクライアントのオブジェクトを生成します。

SQSConsumer.js
import AWS from 'aws-sdk';

export default class SQSConsumer {

    constructor(queueUrl, options) {
        this.sqs = new AWS.SQS({
            region: process.env.SQS_REGION,
            endpoint: process.env.SQS_ENDPOINT
        });
        this.queueUrl = queueUrl;
        this.options = options || {
            VisibilityTimeout: 15
        };
    }

    ...
}

続いて、メッセージを1つ1つ取得して処理する consume メソッドを実装します。 handler にはメッセージを処理する関数が入ります。

SQSConsumer.js
import AWS from 'aws-sdk';

export default class SQSConsumer {

    ...

    async consume(handler) {

        const result = await this.sqs.receiveMessage(Object.assign({
            QueueUrl: this.queueUrl
        }, this.options)).promise();

        if (!result.hasOwnProperty('Messages')) return;

        const message = result['Messages'].pop();

        try {
            await handler(message['Body']);

            await this.sqs.deleteMessage({
                QueueUrl: this.queueUrl,
                ReceiptHandle: message.ReceiptHandle
            }).promise();
        } catch (e) {
            console.error('Error occurred while consuming a message: ' + e.message);
            await this.sqs.changeMessageVisibility({
                QueueUrl: this.queueUrl,
                ReceiptHandle: message.ReceiptHandle,
                VisibilityTimeout: 0
            }).promise();
        }
    }
}

上記処理の内容を一つずつ見ていくと、まずはキューからメッセージを1つ取得します。

        const result = await this.sqs.receiveMessage(Object.assign({
            QueueUrl: this.queueUrl
        }, this.options)).promise();

constructorで指定した、キューのURLとキューのオプションを渡して receiveMessage を呼び出します。引数の内容は以下の通りです。

{
    QueueUrl: 'キューのURL',
    VisibilityTimeout: '15',
    // MaxNumberOfMessages: 1
}

デフォルトでは1メッセージを取得します。 MaxNumberOfMessages を指定することで複数メッセージが取得可能です。(10メッセージまで取得可能)

そして、receiveMessages の引数には VisibilityTimeout というオプションを指定しています。SQSでは receiveMessage リクエストをしてメッセージを取得すると、 VisibilityTimeout で指定している時間(秒)の間はキューからそのメッセージが見えなくなります。なのでその間に新たに receiveMessage リクエストをしてもそのメッセージは取得できません。VisibilityTimeout が過ぎると再びキュー内でそのメッセージが見えるようになります。今回はその値を15秒で設定しています。(変更可能)

this.sqs.receiveMessage(/*引数*/).promise();

また上記のように最後に .promise() を呼び出すことで、結果をPromise型で返してくれます。
.promise() をつけない場合はコールバックで結果が渡されるため、以下のようにボイラープレートコードが発生します。これを毎回書くのは微妙なので、 .promise() を使いましょう。

        const result = await new Promise((resolve, reject) => {
            this.sqs.receiveMessage(Object.assign({
                QueueUrl: this.queueUrl
            }, this.options), function (err, data) {
                return !err ? resolve(data) : reject(err);
            });
        });

次に、receiveMessage でメッセージが取得できたか確認します。取得したメッセージはMessagesプロパティ内に配列で返ってきます。もし1つもメッセージを取得できなかった場合は結果にMessagesプロパティは含まれません。よってMessagesプロパティが結果にない場合はそれ以降の処理は行わないようにしています。

        if (!result.hasOwnProperty('Messages')) return;

続いて取得したメッセージの処理を行います。
pop() でメッセージ配列から1つメッセージを取得します。

        const message = result['Messages'].pop();

その後、メッセージ処理を実行します。

        try {
            await handler(message['Body']);

            await this.sqs.deleteMessage({
                QueueUrl: this.queueUrl,
                ReceiptHandle: message.ReceiptHandle
            }).promise();
        } catch (e) {
            ...
        }

handler 関数を取得したメッセージの内容を渡して実行します。handler でメッセージ処理に成功したら、そのメッセージは deleteMessage で削除します。メッセージは VisibilityTimeout が有効な間でも削除できます。

deleteMessage にはメッセージがもつ ReceiptHandle を渡します。この ReceiptHandle というのは1つ1つのメッセージが受信される度に付与されるIDみたいなもので、メッセージを削除したり、このあと出てくるメッセージの VisibilityTimeout を変更する際に利用します。

もし handler の実行に失敗した場合は、catch内の処理が実行されます。

        try {
            ...
        } catch (e) {
            console.error('Error occurred while consuming a message: ' + e.message);
            await this.sqs.changeMessageVisibility({
                QueueUrl: this.queueUrl,
                ReceiptHandle: message.ReceiptHandle,
                VisibilityTimeout: 0
            }).promise();
        }

handler の実行に失敗して処理されずに残ったメッセージは削除せずに、次にconsumeが呼ばれたときに再び処理します。
メッセージは削除されないため、このままでもキューに残り続けます。しかし、メッセージの VisibilityTimeout が次に receiveMessage が呼ばれたときにも有効だった場合、そのメッセージは取得できません。

再び consume が呼び出された時も失敗したメッセージを取得して処理してほしいので、 changeMessageVisibility を使って、VisibilityTimeout を0に設定します。また、ここでも ReceiptHandle をオプションとして渡します。
これで失敗した直後に VisibilityTimeout がリセットされるので、再びキュー上にメッセージが見えるようになり、次の consume 呼び出しでこの失敗したメッセージを必ず取得できます。

全体の実装は以下の通りです。

SQSConsumer.js
import AWS from 'aws-sdk';

export default class SQSConsumer {

    constructor(queueUrl, options) {
        this.sqs = new AWS.SQS({
            region: process.env.KEYWORD_SQS_REGION,
            endpoint: process.env.KEYWORD_SQS_ENDPOINT
        });
        this.queueUrl = queueUrl;
        this.options = options || {
            VisibilityTimeout: 15
        };
    }

    async consume(handler) {

        const result = await this.sqs.receiveMessage(Object.assign({
            QueueUrl: this.queueUrl
        }, this.options)).promise();

        if (!result.hasOwnProperty('Messages')) return;

        const message = result['Messages'].pop();

        try {
            await handler(message['Body']);

            await this.sqs.deleteMessage({
                QueueUrl: this.queueUrl,
                ReceiptHandle: message.ReceiptHandle
            }).promise();
        } catch (e) {
            console.error('Error occurred while consuming a message: ' + e.message);
            await this.sqs.changeMessageVisibility({
                QueueUrl: this.queueUrl,
                ReceiptHandle: message.ReceiptHandle,
                VisibilityTimeout: 0
            }).promise();
        }
    }
}

SQSConsumerクラスの利用

実装したSQSConsumerクラスは以下のように使います。

consumer.consume((message) => {
    console.log(`Processed a message: ${message}`);
});

↑の処理をずっと繰り返し実行したり、一定時間ごとに実行したり用途は様々です。

総括

上記の実装ではhandlerが処理に失敗したメッセージは次にも実行されるため、handlerがずっと失敗していると無限ループにハマります。この辺はRetry数を決めれるようにしたり、Retry数を超えるとDead Letter Queueに失敗したメッセージを送るなど解決方法はあると思います。

また、SQSクライアントが実行に失敗しない前提なので、SQS側との通信エラーがあった場合などはプログラムが落ちてしまします。

さらに、今回は一度に1メッセージしか取得しないため、SQSへの通信オーバーヘッドが発生してしまうみたいですし、メッセージ量 = SQSのキューへのリクエスト数になってしまいます。1リクエストで10メッセージを取得してしまったほうが良さそうです。

SQSのベスト・プラクティスについてはAWSさんが以下で提示してくれています。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html#setting-up-long-polling

このようにいろいろ改良するべき点はあると思いますが、それを抜きにして今回はSQSを使ってみる良い勉強になったのではないかと思います。最後まで読んでいただきありがとうございました。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1