Amazon SQSと互換があるElasticMQをインストールし、ローカルにあるLambdaのソースコードと連携するローカル実行環境を構築します
softwaremill/elasticmq
ElasticMQのインストール
ElasticMQのインストール方法には複数ありますが、今回はDockerを使います。
特に、Docker環境としては、QNAPのNASにインストールします。理由は単にQNAP NASを利用しているから。
QNAPには、Docker環境として、ContainerStationがあります。
ElasticMQのDockerファイルは、リポジトリに対して、elasticmqの入力で検索にひっかかります。
Dockerに馴染みのある方は、以下がわかりやすいかと思います。
docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native
これでもよいのですが、インストールされるホスト名がlocalhostとなってしまい、かっこ悪いので、設定ファイルを変更します。
(以降はQNAPでの操作です)
そこで、エントリポイントには以下を指定します。
/sbin/tini -- /opt/docker/bin/elasticmq-native-server -Dconfig.file=/opt2/elasticmq.conf -Dlogback.configurationFile=/opt/logback.xml
Configファイルは、「/opt2/elasticmq.conf」を参照するようにしています。
そして、/opt2をNAS上のフォルダに割り当てるようにします。
例:
ホストからのボリューム:/Container/elasticmq/opt2
マウントポイント:/opt2
あとは、/Container/elasticmq/opt2の下に、elasticmq.confを作成すればよいです。QNAPのアプリである「Text Editor」を使うのがよいかと思います。
include classpath("application.conf")
# What is the outside visible address of this ElasticMQ node
# Used to create the queue URL (may be different from bind address!)
node-address {
protocol = http
host = 【外部からアクセスされる際のホスト名】
port = 9324
context-path = ""
}
ネットワークは、「NAT」にして、コンテナ側のポート番号9324と9325はそのままホスト側に割り当てます。
以上で設定完了です。
インストール完了後、以下にアクセスすると、管理Webページが表示されます。
http://【外部からアクセスされる際のホスト名】:9325
Node.jsコード・AWS CLIの例
●キューの作成
(通常のキュー)
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_NAME = "test";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
QueueName: SQS_QUEUE_NAME,
Attributes: {
// FifoQueue: "true",
},
};
sqs.createQueue(params, (err, data) => {
if (err) {
console.log("Error", err);
return;
}
console.log("Success", JSON.stringify(data, null, '\t'));
});
(FIFOキュー)
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_NAME = "test.fifo";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
QueueName: SQS_QUEUE_NAME,
Attributes: {
FifoQueue: "true",
},
};
sqs.createQueue(params, (err, data) => {
if (err) {
console.log("Error", err);
return;
}
console.log("Success", JSON.stringify(data, null, '\t'));
});
> aws sqs create-queue --queue-name test --endpoint-url http://【外部からアクセスされる際のホスト名】:9324
●キューのリスト取得
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
};
sqs.listQueues(params, (err, data) => {
if (err) {
console.log("Error", err);
return;
}
console.log("Success", JSON.stringify(data, null, '\t'));
});
> aws sqs list-queues --endpoint-url http://【外部からアクセスされる際のホスト名】:9324
●キューの削除
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
QueueUrl: SQS_QUEUE_URL
};
sqs.deleteQueue(params, (err, data) => {
if (err) {
console.log("Error", err);
return;
}
console.log("Success", data);
});
> aws sqs delete-queue --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --endpoint-url http://【外部からアクセスされる際のホスト名】:9324
●メッセージの送信
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
QueueUrl: SQS_QUEUE_URL,
MessageBody: "This is test message.",
MessageAttributes: {
},
};
sqs.sendMessage(params, (err, data) => {
if (err) {
console.log("Error", err);
return;
}
console.log("Success", data);
});
> aws sqs send-message --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --message-body "This is test message." --endpoint-url http://【外部からアクセスされる際のホスト名】:9324
●メッセージの受信
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
QueueUrl: SQS_QUEUE_URL,
MaxNumberOfMessages: 10,
MessageAttributeNames: [
"All"
],
VisibilityTimeout: 30,
WaitTimeSeconds: 20
};
sqs.receiveMessage(params, (err, data) => {
if (err) {
console.log("Receive Error", err);
return;
}
console.log(JSON.stringify(data, null, "\t"));
});
> aws sqs receive-message --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --endpoint-url http://【外部からアクセスされる際のホスト名】:9324
●メッセージの削除
const SQS_ENDPOINT = "http://【外部からアクセスされる際のホスト名】:9324";
const SQS_QUEUE_URL = "http://【外部からアクセスされる際のホスト名】:9324/000000000000/test";
const SQS_RECEIPT_HANDLE = "【送信した後に取得されるMessageId】";
const AWS = require('aws-sdk');
AWS.config.update({
// region: 'ap-northeast-1'
region: 'elasticmq'
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const params = {
QueueUrl: SQS_QUEUE_URL,
ReceiptHandle: SQS_RECEIPT_HANDLE
};
sqs.deleteMessage(params, (err, data) => {
if (err) {
console.log("Error", err);
return;
}
console.log("Success", data);
});
> aws sqs delete-message --queue-url http://【外部からアクセスされる際のホスト名】:9324/000000000000/test --receipt-handle 【送信した後に取得されるMessageId】--endpoint-url http://【外部からアクセスされる際のホスト名】:9324
メッセージ受信時にLambdaと連携する
面倒なので、簡単に設定できるようにしておきました。
使い方だけ。
以下に実装済みです。
poruruba/amplify_template
ZIPダウンロード後展開します。
> unzip amplify_template-master.zip
> cd amplify_template-master
> npm install
> npm install aws-sdk --save-dev
> vi .env
下記は、以降で作成するLambdaで共通で使われます。
ROUTING_SQS_ENDPOINT=http://【外部からアクセスされる際のホスト名】:9324
Lambdaのロジックは以下のように作成します。
> cd api/controllers
> mkdir test-sqs
> cd test-sqs
> vi sqs.json
[
{
"enable": true,
"QueueUrl": "http://【外部からアクセスされる際のホスト名】:9324/000000000000/【キュー名】"
}
]
Lambdaのロジックを実装します。以下例です。
> vi index.js
'use strict';
exports.handler = async (event, context, callback) => {
console.log(JSON.stringify(event, null, '\t'));
return {};
};
これで完了です。
以下のように実行することで、ローカル実行環境が起動します。
> node app.js
sqs_send_message.js で送信すると、受信されてLambdaのロジック(index.js)が起動するかと思います。
内部で以下のような仲介処理を入れることで実現しています。
'use strict';
const THIS_BASE_PATH = process.env.THIS_BASE_PATH;
const CONTROLLERS_BASE = THIS_BASE_PATH + '/api/controllers/';
//const SQS_REGION = "ap-northeast-1";
const SQS_REGION = "elasticmq";
const SQS_ENDPOINT = process.env.ROUTING_SQS_ENDPOINT;
const SQS_TARGET_FNAME = "sqs.json";
const DEFAULT_HANDLER = "handler";
const AWS = require('aws-sdk');
AWS.config.update({
region: SQS_REGION
});
const sqs = new AWS.SQS({
apiVersion: '2012-11-05',
endpoint: SQS_ENDPOINT
});
const fs = require('fs');
function parse_sqs() {
// sqs.jsonの検索
const folders = fs.readdirSync(CONTROLLERS_BASE);
folders.forEach(folder => {
try {
const stats_dir = fs.statSync(CONTROLLERS_BASE + folder);
if (!stats_dir.isDirectory())
return;
const fname = CONTROLLERS_BASE + folder + "/" + SQS_TARGET_FNAME;
if (!fs.existsSync(fname))
return;
const stats_file = fs.statSync(fname);
if (!stats_file.isFile())
return;
// pollerの登録
const defs = JSON.parse(fs.readFileSync(fname).toString());
parse_sqs_json(defs, CONTROLLERS_BASE + folder, folder);
} catch (error) {
console.log(error);
}
});
}
function parse_sqs_json(defs, folder, folder_name) {
defs.forEach(item => {
if (!item.enable )
return;
const handler = item.handler || DEFAULT_HANDLER;
const proc = require(folder)[handler];
const params = {
QueueUrl: item.QueueUrl,
AttributeNames: item.AttributeNames,
MaxNumberOfMessages: (item.MaxNumberOfMessages === undefined) ? 1 : item.MaxNumberOfMessages,
MessageAttributeNames: item.MessageAttributeNames,
VisibilityTimeout: (item.VisibilityTimeout === undefined) ? 30 : item.VisibilityTimeout,
WaitTimeSeconds: (item.WaitTimeSeconds === undefined) ? 20: item.WaitTimeSeconds
};
continuousReceive(params, proc);
console.log("sqs(" + item.QueueUrl + ") " + handler + ' ' + folder_name);
});
}
async function continuousReceive(params, func){
while(true){
try{
await new Promise((resolve, reject) =>{
sqs.receiveMessage(params, async (err, data) => {
if (err) {
return reject(err);
}
if( !data.Messages ){
return resolve('message empty');
}
try{
const event = {
Records: data.Messages.map(item => {
return {
messageId: item.MessageId,
receiptHandle: item.ReceiptHandle,
body :item.Body,
attributes: item.Attributes,
messageAttributes: item.MessageAttributes,
md5OfBody: item.MD5OfBody,
md5OfMessageAttributes: item.MD5OfMessageAttributes,
awsRegion: SQS_REGION
}
})
};
const success_return = (msg) => {
if( data.Messages ){
const deleteParams = {
QueueUrl: params.QueueUrl,
Entries: data.Messages.map(item => { return { Id: item.MessageId, ReceiptHandle: item.ReceiptHandle } } )
};
sqs.deleteMessageBatch(deleteParams, (err, data) => {
if (err) {
return reject(err);
} else {
return resolve();
}
});
}
};
const context = {
succeed: success_return,
fail: (error) => resolve(error)
};
const task = func(event, context, (error, response) => {
if (error) return resolve(error);
return success_return(response);
});
if (task instanceof Promise || (task && typeof task.then === 'function')) {
return task.then(ret => {
if (ret)
return success_return(ret);
})
.catch(err => resolve(err));
}else{
if( task !== undefined )
success_return(task);
}
}catch(error){
console.log(error);
}
});
});
}catch(error){
console.log(error);
return;
}
}
}
module.exports = parse_sqs();
以上