24
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

DynamoDB Streamのデータをsocket.ioを使ってブラウザに流す

Posted at

やること

  • TwitterのストリームデータをKinesisを経由してDynamoDBに保存します。
  • DynamoDB Streamにデータを流し、ブラウザでストリームデータを可視化する

構成図

構成図 (1).png

Twitter Stream → Kinesis → DynamoDB

AWS Lambda Reference Architecture: Real-time Stream Processing
awslabsのアカウントにそのままやりたいことがありました。動かすとKinesisにデータを流して、キャプチャの通りDynamoDBにデータを保存してくれます。

スクリーンショット 2016-03-28 22.05.39.png

DynamoDB → DynamoDB Stream → Lambda → SNS http Notification

DynamoDB Streamの有効化

キャプチャの箇所からDynamoDB Streamを有効化します
スクリーンショット 2016-03-28 22.08.44.png

トリガーを有効化して、DynamoDB Streamから流れてきたデータを処理するLambdaファンクションを指定します
スクリーンショット 2016-03-28 22.09.29.png

DynamoDBトリガーLambdaファンクション

以下のLambdaファンクションでDynamoDB Streamのデータを処理します。
受け取ったデータをSNSにpublishします。

dynamodbTrigger
var aws = require('aws-sdk');

var sns = new aws.SNS();

exports.handler = function(event, context) {
    event.Records.forEach(function(record) {
        console.log('DynamoDB Record: %j', record.dynamodb);
        sns.publish({
        Message: JSON.stringify(record.dynamodb),
        Subject: 'message title ',
        TopicArn: '<SNS TopicのARN>'
        }, function(err, data) {
            if (err) {
                console.log(err.stack);
                context.done(err, 'Errors!');  
                return;
            }

            console.log(data);

            // Notify Lambda that we are finished
            context.done(null, 'push sent');  
        });
    });
};

SNS http Notification → Socket.io → ブラウザ

SNS http Notificationを設定します。そしてデータをSocket.ioで受け取った後、ブラウザにデータを流します。
スクリーンショット 2016-03-28 22.18.22.png

Socket.ioサーバ

[Node.js]Amazon SNSでHTTPを使って通知を受け取る[aws-sdk-js]
を参考にSNS http NotificationのエンドポイントとなるSocket.ioサーバを立てます
ソースコードは以下のとおり

app.js
var http = require('http');
var url = require("url");
var async = require('async');
var AWS = require('aws-sdk');
 
AWS.config.loadFromPath('./config/aws.json');
var sns = new AWS.SNS();
 
var httpServer = http.createServer(handler);
var io = require('socket.io').listen(httpServer);
var fs = require('fs');
//asyncを使って順番に処理を実行
async.series([
    //HTTPサーバ起動
    function (callback) {
        httpServer.listen(3000);
        callback(null, 1);
    },
    //subscrive開始
    function (callback) {
        initSubscriber(callback);
        callback(null, 2);
    }
], function (err, results) {
    if (err) {
        throw err;
    }
});
 
function handler(req, res) {
    var path = url.parse(req.url).pathname;
 
    if (path === "/httpsns") {
        var body = '';
        req.on('data', function (data) {
            body += data;
        });
        req.on('end', function () {
            res.writeHead(200, {
                'Content-Type':'text/html'
            });
 
            var obj = JSON.parse(body);
            if (obj.Type === "SubscriptionConfirmation") {
                sns.confirmSubscription({ TopicArn:obj.TopicArn, Token:obj.Token}, function (err, data) {
                    console.log("confirmSubscription");
                });
            } else if (obj.Type === "Notification" && obj.Message !== undefined) {
                console.log(obj.Subject + ":" + obj.Message);
    			io.sockets.emit('msg', obj.Message);
            }
            res.end('OK');
        });
    } else {
        res.writeHead(200, {'Content-Type':'text/html'});
        res.end(fs.readFileSync('index.html'));
    }
}
 
function initSubscriber(callback) {
    var args = {
        TopicArn:"<SNSトピックのARN>",
        Protocol:'http',
        Endpoint:"http://<host>:3000/httpsns"
    };
 
    sns.subscribe(args, function (err, data) {
        console.log("subscribe start.");        
    });
}

以下のコマンドでサーバを起動します

$ node app.js

Socket.ioクライアント

index.html
<!DOCTYPE html>
<meta charset="UTF-8">
<title>chat</title>
<script src="/socket.io/socket.io.js"></script>
<script src="//ajax.googleapis.com/ajax/libs/jquery/1.10.2/jquery.min.js"></script>
<script>
$(function() {
  var socket = io.connect();

  socket.on('msg', function(data) {
        console.log(data);
    $('div').prepend(data + '<br>');

  });
});
</script>
<h2>ツイート情報</h2></h2><div></div>

動作確認

http://<host>:3000 にブラウザでアクセスすると以下のとおりデータがリアルタイムに流れてきます。
DynamoDB Streamのデータをsocket.io経由でブラウザに流す

24
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?