[SQS + Node.js] SQSのキューに溜まっているメッセージを1つずつ処理するConsumerを実装してみた
概要
今回はSQSのキューに溜まっているメッセージを1つずつ処理するConsumerというものを実装してみました。
あのBBCが似たようなライブラリをOSSとして提供してくれていますが、SQSを勉強する良い機会なので自分で簡単なものを実装してみました。
https://github.com/bbc/sqs-consumer
開発環境
実行環境
Node.js v10.4.1
開発ツール
Babel
SQSConsumerクラスの実装
まずはクラスのconstructorを作成します。引数としてキューのURLとキューのオプションを受け取ります。
aws-sdk
を利用してSQSクライアントのオブジェクトを生成します。
import AWS from 'aws-sdk';
export default class SQSConsumer {
constructor(queueUrl, options) {
this.sqs = new AWS.SQS({
region: process.env.SQS_REGION,
endpoint: process.env.SQS_ENDPOINT
});
this.queueUrl = queueUrl;
this.options = options || {
VisibilityTimeout: 15
};
}
...
}
続いて、メッセージを1つ1つ取得して処理する consume
メソッドを実装します。 handler
にはメッセージを処理する関数が入ります。
import AWS from 'aws-sdk';
export default class SQSConsumer {
...
async consume(handler) {
const result = await this.sqs.receiveMessage(Object.assign({
QueueUrl: this.queueUrl
}, this.options)).promise();
if (!result.hasOwnProperty('Messages')) return;
const message = result['Messages'].pop();
try {
await handler(message['Body']);
await this.sqs.deleteMessage({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
} catch (e) {
console.error('Error occurred while consuming a message: ' + e.message);
await this.sqs.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
}).promise();
}
}
}
上記処理の内容を一つずつ見ていくと、まずはキューからメッセージを1つ取得します。
const result = await this.sqs.receiveMessage(Object.assign({
QueueUrl: this.queueUrl
}, this.options)).promise();
constructorで指定した、キューのURLとキューのオプションを渡して receiveMessage
を呼び出します。引数の内容は以下の通りです。
{
QueueUrl: 'キューのURL',
VisibilityTimeout: '15',
// MaxNumberOfMessages: 1
}
デフォルトでは1メッセージを取得します。 MaxNumberOfMessages
を指定することで複数メッセージが取得可能です。(10メッセージまで取得可能)
そして、receiveMessages
の引数には VisibilityTimeout
というオプションを指定しています。SQSでは receiveMessage
リクエストをしてメッセージを取得すると、 VisibilityTimeout
で指定している時間(秒)の間はキューからそのメッセージが見えなくなります。なのでその間に新たに receiveMessage
リクエストをしてもそのメッセージは取得できません。VisibilityTimeout
が過ぎると再びキュー内でそのメッセージが見えるようになります。今回はその値を15秒で設定しています。(変更可能)
this.sqs.receiveMessage(/*引数*/).promise();
また上記のように最後に .promise()
を呼び出すことで、結果をPromise型で返してくれます。
.promise()
をつけない場合はコールバックで結果が渡されるため、以下のようにボイラープレートコードが発生します。これを毎回書くのは微妙なので、 .promise()
を使いましょう。
const result = await new Promise((resolve, reject) => {
this.sqs.receiveMessage(Object.assign({
QueueUrl: this.queueUrl
}, this.options), function (err, data) {
return !err ? resolve(data) : reject(err);
});
});
次に、receiveMessage
でメッセージが取得できたか確認します。取得したメッセージはMessagesプロパティ内に配列で返ってきます。もし1つもメッセージを取得できなかった場合は結果にMessagesプロパティは含まれません。よってMessagesプロパティが結果にない場合はそれ以降の処理は行わないようにしています。
if (!result.hasOwnProperty('Messages')) return;
続いて取得したメッセージの処理を行います。
pop()
でメッセージ配列から1つメッセージを取得します。
const message = result['Messages'].pop();
その後、メッセージ処理を実行します。
try {
await handler(message['Body']);
await this.sqs.deleteMessage({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
} catch (e) {
...
}
handler
関数を取得したメッセージの内容を渡して実行します。handler
でメッセージ処理に成功したら、そのメッセージは deleteMessage
で削除します。メッセージは VisibilityTimeout
が有効な間でも削除できます。
deleteMessage
にはメッセージがもつ ReceiptHandle
を渡します。この ReceiptHandle
というのは1つ1つのメッセージが受信される度に付与されるIDみたいなもので、メッセージを削除したり、このあと出てくるメッセージの VisibilityTimeout
を変更する際に利用します。
もし handler
の実行に失敗した場合は、catch内の処理が実行されます。
try {
...
} catch (e) {
console.error('Error occurred while consuming a message: ' + e.message);
await this.sqs.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
}).promise();
}
handler
の実行に失敗して処理されずに残ったメッセージは削除せずに、次にconsumeが呼ばれたときに再び処理します。
メッセージは削除されないため、このままでもキューに残り続けます。しかし、メッセージの VisibilityTimeout
が次に receiveMessage
が呼ばれたときにも有効だった場合、そのメッセージは取得できません。
再び consume
が呼び出された時も失敗したメッセージを取得して処理してほしいので、 changeMessageVisibility
を使って、VisibilityTimeout
を0に設定します。また、ここでも ReceiptHandle
をオプションとして渡します。
これで失敗した直後に VisibilityTimeout
がリセットされるので、再びキュー上にメッセージが見えるようになり、次の consume
呼び出しでこの失敗したメッセージを必ず取得できます。
全体の実装は以下の通りです。
import AWS from 'aws-sdk';
export default class SQSConsumer {
constructor(queueUrl, options) {
this.sqs = new AWS.SQS({
region: process.env.KEYWORD_SQS_REGION,
endpoint: process.env.KEYWORD_SQS_ENDPOINT
});
this.queueUrl = queueUrl;
this.options = options || {
VisibilityTimeout: 15
};
}
async consume(handler) {
const result = await this.sqs.receiveMessage(Object.assign({
QueueUrl: this.queueUrl
}, this.options)).promise();
if (!result.hasOwnProperty('Messages')) return;
const message = result['Messages'].pop();
try {
await handler(message['Body']);
await this.sqs.deleteMessage({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
} catch (e) {
console.error('Error occurred while consuming a message: ' + e.message);
await this.sqs.changeMessageVisibility({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0
}).promise();
}
}
}
SQSConsumerクラスの利用
実装したSQSConsumerクラスは以下のように使います。
consumer.consume((message) => {
console.log(`Processed a message: ${message}`);
});
↑の処理をずっと繰り返し実行したり、一定時間ごとに実行したり用途は様々です。
総括
上記の実装ではhandlerが処理に失敗したメッセージは次にも実行されるため、handlerがずっと失敗していると無限ループにハマります。この辺はRetry数を決めれるようにしたり、Retry数を超えるとDead Letter Queueに失敗したメッセージを送るなど解決方法はあると思います。
また、SQSクライアントが実行に失敗しない前提なので、SQS側との通信エラーがあった場合などはプログラムが落ちてしまします。
さらに、今回は一度に1メッセージしか取得しないため、SQSへの通信オーバーヘッドが発生してしまうみたいですし、メッセージ量 = SQSのキューへのリクエスト数になってしまいます。1リクエストで10メッセージを取得してしまったほうが良さそうです。
SQSのベスト・プラクティスについてはAWSさんが以下で提示してくれています。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html#setting-up-long-polling
このようにいろいろ改良するべき点はあると思いますが、それを抜きにして今回はSQSを使ってみる良い勉強になったのではないかと思います。最後まで読んでいただきありがとうございました。