0
2

More than 1 year has passed since last update.

Amazon SQS互換のローカル実行環境ElasticMQの構築

Last updated at Posted at 2022-08-16

Amazon SQSと互換があるElasticMQをインストールし、ローカルにあるLambdaのソースコードと連携するローカル実行環境を構築します

softwaremill/elasticmq

ElasticMQのインストール

ElasticMQのインストール方法には複数ありますが、今回はDockerを使います。
特に、Docker環境としては、QNAPのNASにインストールします。理由は単にQNAP NASを利用しているから。

QNAPには、Docker環境として、ContainerStationがあります。
ElasticMQのDockerファイルは、リポジトリに対して、elasticmqの入力で検索にひっかかります。

Dockerに馴染みのある方は、以下がわかりやすいかと思います。

 docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native

これでもよいのですが、インストールされるホスト名がlocalhostとなってしまい、かっこ悪いので、設定ファイルを変更します。

(以降はQNAPでの操作です)
そこで、エントリポイントには以下を指定します。

/sbin/tini -- /opt/docker/bin/elasticmq-native-server -Dconfig.file=/opt2/elasticmq.conf -Dlogback.configurationFile=/opt/logback.xml

Configファイルは、「/opt2/elasticmq.conf」を参照するようにしています。

image.png

そして、/opt2をNAS上のフォルダに割り当てるようにします。
例:
ホストからのボリューム:/Container/elasticmq/opt2
マウントポイント:/opt2

image.png

あとは、/Container/elasticmq/opt2の下に、elasticmq.confを作成すればよいです。QNAPのアプリである「Text Editor」を使うのがよいかと思います。

/Container/elasticmq/opt2/elasticmq.conf
include classpath("application.conf")

# What is the outside visible address of this ElasticMQ node
# Used to create the queue URL (may be different from bind address!)
node-address {
  protocol = http
  host = 【外部からアクセスされる際のホスト名】
  port = 9324
  context-path = ""
}

ネットワークは、「NAT」にして、コンテナ側のポート番号9324と9325はそのままホスト側に割り当てます。

image.png

以上で設定完了です。
インストール完了後、以下にアクセスすると、管理Webページが表示されます。

http://【外部からアクセスされる際のホスト名】:9325

image.png

Node.jsコード・AWS CLIの例

●キューの作成

(通常のキュー)

sqs_create_queue.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_NAME = "test";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
	QueueName: SQS_QUEUE_NAME,
	Attributes: {
//		FifoQueue: "true",
	},
};

sqs.createQueue(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Success", JSON.stringify(data, null, '\t'));
});

(FIFOキュー)

sqs_create_queue.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_NAME = "test.fifo";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
	QueueName: SQS_QUEUE_NAME,
	Attributes: {
		FifoQueue: "true",
	},
};

sqs.createQueue(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Success", JSON.stringify(data, null, '\t'));
});
> aws sqs create-queue --queue-name test --endpoint-url http://【外部からアクセスされる際のホスト名】:9324

●キューのリスト取得

sqs_list_queue.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
};

sqs.listQueues(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Success", JSON.stringify(data, null, '\t'));
});
> aws sqs list-queues --endpoint-url http://【外部からアクセスされる際のホスト名】:9324

●キューの削除

sqs_delete_queue.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
	QueueUrl: SQS_QUEUE_URL
};

sqs.deleteQueue(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Success", data);
});
> aws sqs delete-queue --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --endpoint-url http://【外部からアクセスされる際のホスト名】:9324

●メッセージの送信

sqs_send_message.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
  QueueUrl: SQS_QUEUE_URL,
  MessageBody: "This is test message.",
  MessageAttributes: {
  },
};

sqs.sendMessage(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Success", data);
});
> aws sqs send-message --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --message-body "This is test message." --endpoint-url http://【外部からアクセスされる際のホスト名】:9324

●メッセージの受信

sqs_receive_message.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});

const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
 QueueUrl: SQS_QUEUE_URL,
 MaxNumberOfMessages: 10,
 MessageAttributeNames: [
    "All"
 ],
 VisibilityTimeout: 30,
 WaitTimeSeconds: 20
};

sqs.receiveMessage(params, (err, data) => {
  if (err) {
    console.log("Receive Error", err);
    return;
  }
  console.log(JSON.stringify(data, null, "\t"));
});
> aws sqs receive-message --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --endpoint-url http://【外部からアクセスされる際のホスト名】:9324

●メッセージの削除

sqs_delete_message.js
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";
const SQS_RECEIPT_HANDLE = "【送信した後に取得されるMessageId】";

const AWS = require('aws-sdk');
AWS.config.update({
//	region: 'ap-northeast-1'
	region: 'elasticmq'
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const params = {
	QueueUrl: SQS_QUEUE_URL,
	ReceiptHandle: SQS_RECEIPT_HANDLE
};

sqs.deleteMessage(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Success", data);
});
> aws sqs delete-message --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --receipt-handle 【送信した後に取得されるMessageId】--endpoint-url http://【外部からアクセスされる際のホスト名】:9324

メッセージ受信時にLambdaと連携する

面倒なので、簡単に設定できるようにしておきました。
使い方だけ。

以下に実装済みです。

poruruba/amplify_template

ZIPダウンロード後展開します。

> unzip amplify_template-master.zip
> cd amplify_template-master
> npm install
> npm install aws-sdk --save-dev
> vi .env

下記は、以降で作成するLambdaで共通で使われます。

.env
ROUTING_SQS_ENDPOINT=http://【外部からアクセスされる際のホスト名】:9324

Lambdaのロジックは以下のように作成します。

> cd api/controllers
> mkdir test-sqs
> cd test-sqs
> vi sqs.json
sqs.json
[
  {
    "enable": true,
    "QueueUrl": "http://【外部からアクセスされる際のホスト名】:9324/000000000000/【キュー名】"
  }
]

Lambdaのロジックを実装します。以下例です。

> vi index.js
api/controllers/test-sqs/index.js
'use strict';

exports.handler = async (event, context, callback) => {
  console.log(JSON.stringify(event, null, '\t'));
  return {};
};

これで完了です。
以下のように実行することで、ローカル実行環境が起動します。

> node app.js

sqs_send_message.js で送信すると、受信されてLambdaのロジック(index.js)が起動するかと思います。

内部で以下のような仲介処理を入れることで実現しています。

api/controllers/routing_sqs.js
'use strict';

const THIS_BASE_PATH = process.env.THIS_BASE_PATH;
const CONTROLLERS_BASE = THIS_BASE_PATH + '/api/controllers/';

//const SQS_REGION = "ap-northeast-1";
const SQS_REGION = "elasticmq";
const SQS_ENDPOINT = process.env.ROUTING_SQS_ENDPOINT;

const SQS_TARGET_FNAME = "sqs.json";
const DEFAULT_HANDLER = "handler";

const AWS = require('aws-sdk');
AWS.config.update({
	region: SQS_REGION
});
const sqs = new AWS.SQS({
	apiVersion: '2012-11-05',
	endpoint: SQS_ENDPOINT
});

const fs = require('fs');

function parse_sqs() {
  // sqs.jsonの検索
  const folders = fs.readdirSync(CONTROLLERS_BASE);
  folders.forEach(folder => {
    try {
      const stats_dir = fs.statSync(CONTROLLERS_BASE + folder);
      if (!stats_dir.isDirectory())
        return;
    
      const fname = CONTROLLERS_BASE + folder + "/" + SQS_TARGET_FNAME;
      if (!fs.existsSync(fname))
        return;
      const stats_file = fs.statSync(fname);
      if (!stats_file.isFile())
        return;

      // pollerの登録
      const defs = JSON.parse(fs.readFileSync(fname).toString());
      parse_sqs_json(defs, CONTROLLERS_BASE + folder, folder);
    } catch (error) {
      console.log(error);
    }
  });
}

function parse_sqs_json(defs, folder, folder_name) {
  defs.forEach(item => {
    if (!item.enable )
      return;
      
    const handler = item.handler || DEFAULT_HANDLER;
    const proc = require(folder)[handler];
    const params = {
      QueueUrl: item.QueueUrl,
      AttributeNames: item.AttributeNames,
      MaxNumberOfMessages: (item.MaxNumberOfMessages === undefined) ? 1 : item.MaxNumberOfMessages,
      MessageAttributeNames: item.MessageAttributeNames,
      VisibilityTimeout: (item.VisibilityTimeout === undefined) ? 30 : item.VisibilityTimeout,
      WaitTimeSeconds: (item.WaitTimeSeconds === undefined) ? 20: item.WaitTimeSeconds
    };

    continuousReceive(params, proc);

    console.log("sqs(" + item.QueueUrl + ") " + handler + ' ' + folder_name);
  });
}

async function continuousReceive(params, func){
	while(true){
		try{
			await new Promise((resolve, reject) =>{
				sqs.receiveMessage(params, async (err, data) => {
				  if (err) {
				    return reject(err);
				  }
				  if( !data.Messages ){
				  	return resolve('message empty');
				  }

          try{
			  		const event = {
			  			Records: data.Messages.map(item => {
                return {
                  messageId: item.MessageId,
                  receiptHandle: item.ReceiptHandle,
                  body :item.Body,
                  attributes: item.Attributes,
                  messageAttributes: item.MessageAttributes,
                  md5OfBody: item.MD5OfBody,
                  md5OfMessageAttributes: item.MD5OfMessageAttributes,
                  awsRegion: SQS_REGION
                }
              })
			  		};

            const success_return = (msg) => {
              if( data.Messages ){
                const deleteParams = {
                  QueueUrl: params.QueueUrl,
                  Entries: data.Messages.map(item => { return { Id: item.MessageId, ReceiptHandle: item.ReceiptHandle } } )
                };
                sqs.deleteMessageBatch(deleteParams, (err, data) => {
                  if (err) {
                    return reject(err);
                  } else {
                    return resolve();
                  }
                });
              }
            };
            const context = {
              succeed: success_return,
              fail: (error) => resolve(error)
            };
            const task = func(event, context, (error, response) => {
              if (error) return resolve(error);
              return success_return(response);
            });
            if (task instanceof Promise || (task && typeof task.then === 'function')) {
              return task.then(ret => {
                if (ret)
                  return success_return(ret);
              })
              .catch(err => resolve(err));
            }else{
              if( task !== undefined )
                success_return(task);
            }
          }catch(error){
            console.log(error);
          }
				});
			});
		}catch(error){
			console.log(error);
			return;
		}
	}
}

module.exports = parse_sqs();

以上

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2