はじめに
「[[発表]Amazon API GatewayでWebsocketが利用可能 - AWS]([発表]Amazon API GatewayでWebsocketが利用可能 - AWS)」で紹介されている機能を使って、ルーム機能付きのchatを作ろうとしたときのメモです。
なお、ルーム機能なしであれば、AWSが「aws-samples/simple-websockets-chat-app - GitHub」としてコードを公開しています。
事前準備
AWSのアカウント取得や AWS CLI の設定については完了していることが前提です。
必要なコマンド
作業するにあたっては、次のようなコマンドが必要です。
- git
- aws cli : CloudFormation の操作に使用
- sam : AWS Serverless Application Model (SAM) の操作に使用
- awslogs : CloudWatch のログをローカルPCで確認するために使用
- wscat : WebSocket 通信のクライアントとして使用
動作仕様
全体的な仕様をまとめます。
基本機能
次の3つの機能の実装によって実現します。
- コネクション接続とルームへの参加
- メッセージの送信
- コネクション切断とルームからの退室
全体構成
システムの全体像は次の通りです。
テーブル
コネクション情報とルームの情報を保持するために、DynamoDBに次の2つのテーブルを作成します。
処理
各機能を実現する基本的な処理仕様を記載します。
コネクション接続とルームへの参加
本機能の実現にあたっては、次のような処理を実装します。
- コネクションを接続する。
- ルームへの参加する。
- ルーム参加者へメンバーの入室を通知する。
AWSのサンプルと違うところは、以下の2つです。
- ルームへの参加
- 入室済みメンバーへの通知
メッセージの送信
本機能の実現にあたっては、次のような処理を実装します。
- メッセージの送信する。
- ルームメンバーの確認する。
- ルーム参加者へメンバーへメッセージを送信する。
AWSのサンプルと違うところは、以下の2つです。
- ルームメンバーの取得
- ルームメンバーへの通知
コネクション切断とルームからの退室
本機能の実現にあたっては、次のような処理を実装します。
- ルームから退室する。
- ルームメンバーへメンバーの退室を通知する。
- コネクションを切断する
AWSのサンプルと違うところは、以下の2つです。
- ルームからの退室
- ルームメンバーへの退室の通知
実装
AWS上に実装するための情報を記載します。
全体像
基本的には、記事「【新機能】APIGatewayでWebSocketが利用可能になったのでチャットAPIを構築してみた - Qiita」で紹介されている通り、AWSのテンプレートを使用してまずは構築します。
この記事のままで動かない場合は、記事「WebSocket - AWSのサンプルで API Gateway を使ったchatアプリを作ろうとしたらハマった件」を参照してください。
IAM Role
IAMロールを作成し、次の Policy をアタッチします。
ポリシー名 | ポリシータイプ |
---|---|
AWSLambdaBasicExecutionRole | AWS 管理ポリシー |
chatFunctionRolePolicy0 | インラインポリシー |
chatFunctionRolePolicy1 | インラインポリシー |
{
"Statement": [
{
"Action": [
"dynamodb:GetItem",
"dynamodb:DeleteItem",
"dynamodb:PutItem",
"dynamodb:Scan",
"dynamodb:Query",
"dynamodb:UpdateItem",
"dynamodb:BatchWriteItem",
"dynamodb:BatchGetItem",
"dynamodb:DescribeTable"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_connections",
"arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_connections/index/*",
"arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_room",
"arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_room/index/*"
],
"Effect": "Allow"
}
]
}
{
"Statement": [
{
"Action": [
"execute-api:*"
],
"Resource": [
"arn:aws:execute-api:ap-northeast-1:xxxxxxxxxxxx:yyyyyyyy/*"
],
"Effect": "Allow"
}
]
}
API Gateway
API Gateway は次のように設定します。
ルート | 用途 | 統合タイプ | Lambda Region | Lambda関数 |
---|---|---|---|---|
$connect | WebSocket の開始時に使用する。 | Lambda関数 | ap-northeast-1 | onConnect関数 |
$disconnect | WebSocket の終了時に使用する。 | Lambda関数 | ap-northeast-1 | onDisconnect関数 |
senndmessage | WebSocket でメッセージを送信する時に使用する。 | Lambda関数 | ap-northeast-1 | sendMessage関数 |
AWS Lambda
AWS Lambda では、AWSのサンプルと同様に「Node.js 10.x」を使用します。
各関数では、次のモジュールを使用します。
module名 | 解説 |
---|---|
AWS.DynamoDB.DocumentClient | DynamoDBへ接続する際に使用します。 |
AWS.ApiGatewayManagementApi | API Gateway を介してメッセージを送信する際に使用します。 |
onconnect/app.js
const AWS = require("aws-sdk");
AWS.config.update({
region: process.env.AWS_REGION
});
var docClient = new AWS.DynamoDB.DocumentClient({
apiVersion: "2012-10-08"
});
const table_room = "simplechat_room";
exports.handler = async function (event, context, callback) {
console.info("START");
console.log("event: ", JSON.stringify(event));
console.log("context: ", JSON.stringify(context));
let connectionId = event.requestContext.connectionId;
// Join or New ?
// __________________________________________________
let roomId;
if (event.queryStringParameters) {
if (event.queryStringParameters.roomId) {
roomId = event.queryStringParameters.roomId;
}
}
console.log("roomId: ", roomId);
if (roomId === void 0) {
roomId = "test";
}
// Register WebSocket Connection ID
// __________________________________________________
var params = {
TableName: process.env.TABLE_NAME,
Item: {
connectionId: connectionId,
roomId: roomId
}
};
docClient.put(params, callback);
// Exist Room?
// __________________________________________________
var params = {
TableName: table_room,
Key: {
roomId: roomId
}
};
var room = await getRoom(params);
var users;
if (room.Item === void 0) {
// ルームが未作成の場合、新規で作成する。
console.log("room is not exist");
users = [{
connectionId: connectionId,
username: "taro",
icon: "avatar"
}];
var params = {
TableName: table_room,
Item: {
roomId: roomId,
connectionIds: users
}
};
docClient.put(params, callback);
} else {
// ルームが存在する場合、既存のルームに入室する。
console.log("room: ", JSON.stringify(room.Item));
console.log("connectionIds: ", JSON.stringify(room.Item.connectionIds));
room.Item.connectionIds.push({
connectionId: connectionId,
username: "taro",
icon: "avatar"
});
users = room.Item.connectionIds;
console.log("room is exist");
var params = {
TableName: table_room,
Item: {
roomId: roomId,
connectionIds: users
}
};
docClient.put(params, callback);
}
// Push Message
// __________________________________________________
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
console.log("users: ", JSON.stringify(users));
users.map(async ({
connectionId,
username,
icon
}) => {
console.log("connectionId: ", JSON.stringify(connectionId));
var message = {
type: "join",
member: {
connectionId: connectionId,
username: username,
icon: icon
},
room: {
users: users
}
}
try {
await apigwManagementApi.postToConnection({
ConnectionId: connectionId,
Data: JSON.stringify(message)
}).promise();
} catch (e) {
if (e.statusCode === 410) {
console.log(`Found stale connection, deleting ${connectionId}`);
await docClient.delete({
TableName: "simplechat_connections",
Key: {
connectionId
}
}).promise();
} else {
console.error("e: ", JSON.stringify(e));
throw e;
}
}
});
console.info("END");
return {
statusCode: 200,
body: 'Connection Success.'
};
};
/**
*
*/
function getRoom(params) {
return new Promise((resolve, reject) => {
docClient.get(params, function (err, data) {
if (err) {
console.log("getRoom(params) ERROR");
reject(err);
} else {
console.log("getRoom(params) SUCCESS");
resolve(data);
}
});
});
}
disconnect/app.js
const AWS = require("aws-sdk");
AWS.config.update({
region: process.env.AWS_REGION
});
//var DDB = new AWS.DynamoDB({ apiVersion: "2012-10-08" });
var docClient = new AWS.DynamoDB.DocumentClient({
apiVersion: "2012-10-08"
});
const table_room = "simplechat_room";
exports.handler = async function (event, context, callback) {
console.info("START");
console.log("event: ", JSON.stringify(event));
console.log("context: ", JSON.stringify(context));
//
// __________________________________________________
var params = {
TableName: process.env.TABLE_NAME,
Key: {
connectionId: event.requestContext.connectionId
}
};
var connection = await getRoom(params);
console.log("connection: ", JSON.stringify(connection));
var params = {
TableName: table_room,
Key: {
roomId: connection.Item.roomId,
}
};
var room = await getRoom(params);
console.log("room: ", JSON.stringify(room));
// 自分以外のメンバーを取得する。
const roomMembers = room.Item.connectionIds.filter((member) => {
return (member.connectionId != event.requestContext.connectionId);
});
console.log("I am : ", JSON.stringify(event.requestContext.connectionId));
console.log("roomMembers: ", JSON.stringify(roomMembers));
// 自分以外のメンバーで登録し直す
var params = {
TableName: table_room,
Item: {
roomId: connection.Item.roomId,
connectionIds: roomMembers
}
};
await docClient.put(params, callback).promise();
// connectionId を削除する。
var deleteParams = {
TableName: process.env.TABLE_NAME,
Key: {
connectionId: event.requestContext.connectionId
}
};
await docClient.delete(deleteParams, callback).promise();
// Push Message
// __________________________________________________
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
roomMembers.map(async ({
connectionId,
username,
icon
}) => {
console.log("connectionId: ", JSON.stringify(connectionId));
var message = {
type: "unjoin",
member: {
connectionId: connectionId,
username: username,
icon: icon
},
room: {
users: roomMembers
}
}
try {
await apigwManagementApi.postToConnection({
ConnectionId: connectionId,
Data: JSON.stringify(message)
}).promise();
} catch (e) {
if (e.statusCode === 410) {
console.log(`Found stale connection, deleting ${connectionId}`);
await docClient.delete({
TableName: "simplechat_connections",
Key: {
connectionId
}
}).promise();
} else {
console.error("e: ", JSON.stringify(e));
throw e;
}
}
});
console.info("END");
};
/**
*
*/
function getRoom(params) {
return new Promise((resolve, reject) => {
docClient.get(params, function (err, data) {
if (err) {
console.log("getRoom(params) ERROR");
reject(err);
} else {
console.log("getRoom(params) SUCCESS");
resolve(data);
}
});
});
}
sendmessage/app.js
const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient({
apiVersion: '2012-08-10'
});
const {
TABLE_NAME
} = process.env;
const table_room = "simplechat_room";
exports.handler = async (event, context) => {
console.info("START");
console.log("event: ", JSON.stringify(event));
console.log("context: ", JSON.stringify(context));
//
// __________________________________________________
var params = {
TableName: process.env.TABLE_NAME,
Key: {
connectionId: event.requestContext.connectionId
}
};
var connection = await getRoom(params);
console.log("connection: ", JSON.stringify(connection));
var params = {
TableName: table_room,
Key: {
roomId: connection.Item.roomId,
}
};
var room = await getRoom(params);
console.log("room: ", JSON.stringify(room));
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
const postData = JSON.parse(event.body).data;
console.log("postData: ", JSON.stringify(postData));
const postCalls = room.Item.connectionIds.map(async ({
connectionId
}) => {
try {
await apigwManagementApi.postToConnection({
ConnectionId: connectionId,
Data: postData
}).promise();
} catch (e) {
if (e.statusCode === 410) {
console.log(`Found stale connection, deleting ${connectionId}`);
await docClient.delete({
TableName: TABLE_NAME,
Key: {
connectionId
}
}).promise();
} else {
throw e;
}
}
});
try {
await Promise.all(postCalls);
} catch (e) {
return {
statusCode: 500,
body: e.stack
};
}
console.info("END");
return {
statusCode: 200,
body: 'Data sent.'
};
};
/**
*
*/
function getRoom(params) {
return new Promise((resolve, reject) => {
docClient.get(params, function (err, data) {
if (err) {
console.log("getRoom(params) ERROR");
reject(err);
} else {
console.log("getRoom(params) SUCCESS");
resolve(data);
}
});
});
}
DynamoDB
DynamoDBには次の2つのテーブルを作成します。
テーブル名 | ID | 用途 |
---|---|---|
simplechat_connections | connectionId | コネクション情報の登録 |
simplechat_room | roomId | ルーム所属メンバの登録 |
デバッグ
ローカルPCのコードをコマンドで Lambda へデプロイする
// zip 圧縮する
compress-archive * ../function.zip
// デプロイする
aws lambda update-function-code `
--function-name [function name] `
--zip-file fileb://../function.zip `
--profile [profile name]
CloudWatch のログをローカルで確認する
awslogs get [Log Group Name] `
-w `
-s 10m `
-G `
-S `
--timestamp `
--profile [Profile Name]
動作検証
gif などの動作が分かる情報ではないので分かりづらいですが、次のように動作します。
// 接続とルームへの入室
> wscat -c wss://xxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/Prod?roomId=dklfajsd3938daf83dfa
connected (press CTRL+C to quit)
< {"type":"join","member":{"connectionId":"czeJQeqpNjMAc3g=","username":"taro","icon":"avatar"},"room":{"users":[{"icon":"avatar","connectionId":"czKiJe_BNjMCJew=","username":"taro"},{"icon":"avatar","connectionId":"czKxffiNNjMCF-w=","username":"taro"},]}}
// メッセージの送信
> {"message":"sendmessage", "data":"hello world"}
< hello world
// メンバーの退室通知
< {"type":"unjoin","member":{"connectionId":"czeJ0d6mNjMCFCw=","username":"taro","icon":"avatar"},"room":{"users":[{"icon":"avatar","connectionId":"czKiJe_BNjMCJew=","username":"taro"}]}}
おわりに
コードが整理できたらそれも掲載したい。。
参考
CloudWatch
- ECSからcloudwatchにながしたログをtailでみたい - Qiita
- ターミナルから直感的にCloudWatch Logsを検索できるawslogsコマンドの紹介 - Developers.IO
DynamoDB
- 【詳解】JavascriptでDynamoDBを操作する - Qiita
- AWS lambdaからdynamodbに接続: "AccessDeniedException"で怒られたのでポリシーを設定した - Qiita
- Class: AWS.DynamoDB.DocumentClient - AWS DynamoDB DocumentClient
- DynamoDBのテーブルに複雑な構造のドキュメントをputItem()/put()する - Qiita