やること
- TwitterのストリームデータをKinesisを経由してDynamoDBに保存します。
- DynamoDB Streamにデータを流し、ブラウザでストリームデータを可視化する
構成図
Twitter Stream → Kinesis → DynamoDB
AWS Lambda Reference Architecture: Real-time Stream Processing
awslabsのアカウントにそのままやりたいことがありました。動かすとKinesisにデータを流して、キャプチャの通りDynamoDBにデータを保存してくれます。
DynamoDB → DynamoDB Stream → Lambda → SNS http Notification
DynamoDB Streamの有効化
キャプチャの箇所からDynamoDB Streamを有効化します
トリガーを有効化して、DynamoDB Streamから流れてきたデータを処理するLambdaファンクションを指定します
DynamoDBトリガーLambdaファンクション
以下のLambdaファンクションでDynamoDB Streamのデータを処理します。
受け取ったデータをSNSにpublishします。
var aws = require('aws-sdk');
var sns = new aws.SNS();
exports.handler = function(event, context) {
event.Records.forEach(function(record) {
console.log('DynamoDB Record: %j', record.dynamodb);
sns.publish({
Message: JSON.stringify(record.dynamodb),
Subject: 'message title ',
TopicArn: '<SNS TopicのARN>'
}, function(err, data) {
if (err) {
console.log(err.stack);
context.done(err, 'Errors!');
return;
}
console.log(data);
// Notify Lambda that we are finished
context.done(null, 'push sent');
});
});
};
SNS http Notification → Socket.io → ブラウザ
SNS http Notificationを設定します。そしてデータをSocket.ioで受け取った後、ブラウザにデータを流します。
Socket.ioサーバ
[Node.js]Amazon SNSでHTTPを使って通知を受け取る[aws-sdk-js]
を参考にSNS http NotificationのエンドポイントとなるSocket.ioサーバを立てます
ソースコードは以下のとおり
var http = require('http');
var url = require("url");
var async = require('async');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws.json');
var sns = new AWS.SNS();
var httpServer = http.createServer(handler);
var io = require('socket.io').listen(httpServer);
var fs = require('fs');
//asyncを使って順番に処理を実行
async.series([
//HTTPサーバ起動
function (callback) {
httpServer.listen(3000);
callback(null, 1);
},
//subscrive開始
function (callback) {
initSubscriber(callback);
callback(null, 2);
}
], function (err, results) {
if (err) {
throw err;
}
});
function handler(req, res) {
var path = url.parse(req.url).pathname;
if (path === "/httpsns") {
var body = '';
req.on('data', function (data) {
body += data;
});
req.on('end', function () {
res.writeHead(200, {
'Content-Type':'text/html'
});
var obj = JSON.parse(body);
if (obj.Type === "SubscriptionConfirmation") {
sns.confirmSubscription({ TopicArn:obj.TopicArn, Token:obj.Token}, function (err, data) {
console.log("confirmSubscription");
});
} else if (obj.Type === "Notification" && obj.Message !== undefined) {
console.log(obj.Subject + ":" + obj.Message);
io.sockets.emit('msg', obj.Message);
}
res.end('OK');
});
} else {
res.writeHead(200, {'Content-Type':'text/html'});
res.end(fs.readFileSync('index.html'));
}
}
function initSubscriber(callback) {
var args = {
TopicArn:"<SNSトピックのARN>",
Protocol:'http',
Endpoint:"http://<host>:3000/httpsns"
};
sns.subscribe(args, function (err, data) {
console.log("subscribe start.");
});
}
以下のコマンドでサーバを起動します
$ node app.js
Socket.ioクライアント
<!DOCTYPE html>
<meta charset="UTF-8">
<title>chat</title>
<script src="/socket.io/socket.io.js"></script>
<script src="//ajax.googleapis.com/ajax/libs/jquery/1.10.2/jquery.min.js"></script>
<script>
$(function() {
var socket = io.connect();
socket.on('msg', function(data) {
console.log(data);
$('div').prepend(data + '<br>');
});
});
</script>
<h2>ツイート情報</h2></h2><div></div>
動作確認
http://<host>:3000 にブラウザでアクセスすると以下のとおりデータがリアルタイムに流れてきます。