LoginSignup
37
37

More than 5 years have passed since last update.

AWS LambdaでSQSのコンシューマを作る

Posted at

AWS Lambdaの無限ループパターンで何かできないかなと考えてたら思いついたので試してみた話。

構成

KinesisのストリームからAWS Lambdaを起動して、処理が完了したら再度AWS Kinesisにプッシュすることで無限ループを行います。
この無限ループ中でSQSからメッセージを受け取り、メッセージ処理用のAWS Lambdaを起動して、メッセージを処理できたらSQSからメッセージを削除するという動きになります。
もし、失敗した場合はメッセージがそのままSQSに残るので、Dead Letter Queueに入るまでは何度でもリトライ可能になります。

コード

consumer.js
// require
var Promise = require('bluebird');
var AWS     = require('aws-sdk');
var config  = require('config');
var uuid    = require('node-uuid');
var sqs     = Promise.promisifyAll(new AWS.SQS());
var lambda  = Promise.promisifyAll(new AWS.Lambda());
var kinesis = Promise.promisifyAll(new AWS.Kinesis());
/**
 * AWS Lambda entry point
 *
 * @param {Object} event lambda event object
 * @param {Object} context lambda context
 */
exports.handler = function(event, context) {
  // 
  Promise.resolve().then(function() {
    // receive messages from SQS
    var params = {
      QueueUrl: config.get('sqs.url'),
      MaxNumberOfMessages: config.get('sqs.max_number_of_messages'),
      VisibilityTimeout: config.get('sqs.visiblity_timeout'),
      WaitTimeSeconds: config.get('sqs.wait_time_seconds')
    };
    return sqs.receiveMessageAsync(params).then(function(data) {
      return data.Messages || [];
    });

  }).each(function(data) {
    // invoke lambda
    var args = {Records: [
      {
        sqs: {
          QueueUrl: config.get('sqs.url'),
          MessageId: data.MessageId,
          ReceiptHandle: data.ReceiptHandle,
          MD5OfBody: data.MD5OfBody,
          Body: data.Body
        }
      }
    ]};
    var params = {
      FunctionName: config.get('lambda.function_name'),
      InvokeArgs: JSON.stringify(args)
    };
    return lambda.invokeAsyncAsync(params);

  }).then(function() {
    // put to kinesis stream for retry
    var params = {
      Records: [
        {
          PartitionKey: uuid.v4(),
          Data: ''
        }
      ],
      StreamName: config.get('kinesis.stream_name')
    };
    return kinesis.putRecordsAsync(params);

  }).then(function() {
    context.done(null, 'success consumer.');

  }).catch(function(err) {
    context.done(err);

  });

};

コード自体も簡単で、SQSからメッセージを受けとって、AWS Lambdaを起動して、Kinesisnに再プッシュしているだけになります。

ネットワーク障害とかで処理に失敗した場合はAWS Lambda自体のリトライ機能を使って、再実行を行います。
Kinesisをイベントソースにしているためリトライが10回できるので、それで十分かなという感じです。

あとはCloudWatchとかでSQSのMessagesVisibleを見て閾値を超えてたらKinesisに2回プッシュするとかすると、SQSの詰まり具合に合わせてスケールできて良いかもですね。

停止方法

AWS Lambda functionを削除するかKinesisのストリームを削除したら止まります。

感想

パターンとしてはおもしろいですが、Kinesis-Lambdaで安定的に無限ループさせるのが難しそうな雰囲気です。
監視とかパフォーマンスとかスケールとか考えだすと1年間安定運用させる自信がないです。

ちょっと誰か実運用に載せて感想文書いて欲しいです。

37
37
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
37
37