概要
User A がメッセージを送信したときに User D にはメッセージが受信されずに User B と User C がメッセージ受信されるような実装をしてみます。
API Gateway の WebSocket 環境は以下の CDK をデプロイすることで簡単に作れます。
ただこれをそのまま使っても、送信したメッセージがすべてのユーザーに一斉送信されてしまうため改良していきます。
今回はチャットアプリのようなイメージで、部屋ごとにメッセージが送信されるような仕組みを作っていきます。
DynamoDB のテーブル設計
部屋ごとにメッセージ送信をするために roomId を GSI に追加します。
キー・インデックス | 物理名 | 説明 |
---|---|---|
パーティションキー | connectionId | websocket 接続すると発行されるセッションID API Gateway だと event.requestContext.connectionId に格納されている |
GSI | roomId | どの部屋から送信されたメッセージか識別するためのID ユーザーがメッセージ送信するときのパラメータ GSI で定義しているのは、 Lambda で sendmessage するときにフィルターするため |
CDK に roomId (GSI) を追加する場合は以下のようなコードを書きます。
// add global secandary index (GSI)
table.addGlobalSecondaryIndex({
indexName: 'roomIdIndex',
partitionKey: {
name: 'roomId',
type: AttributeType.STRING,
},
readCapacity: 1,
writeCapacity: 1,
projectionType: ProjectionType.ALL,
});
Lambda
部屋ごとにメッセージ送信する仕組みを記述します。
onconnect
QueryString で roomId を受け取り DynammoDB に格納します。これで どの部屋からのコネクションかを管理していきます。
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
exports.handler = async (event) => {
const roomId = event.queryStringParameters.roomId || '';
const putParams = {
TableName: process.env.TABLE_NAME,
Item: {
connectionId: event.requestContext.connectionId,
roomId: roomId,
},
};
try {
await ddb.put(putParams).promise();
} catch (err) {
console.log(JSON.stringify(err));
return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
}
return { statusCode: 200, body: 'Connected.' };
};
sendmessage
メッセージ送信するときは、コネクションが紐づく部屋、つまり roomId を取得し、一致する部屋に対してのみメッセージを一斉送信します。
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
const { TABLE_NAME } = process.env;
exports.handler = async (event) => {
const roomId = JSON.parse(event.body).roomId;
let connectionData;
// roomId に紐づく コネクションID を取得
try {
const queryParams = {
TableName: TABLE_NAME,
IndexName: 'roomIdIndex',
ProjectionExpression: 'connectionId, roomId',
KeyConditionExpression: 'roomId = :roomId',
ExpressionAttributeValues: {
':roomId': roomId,
},
};
connectionData = await ddb.query(queryParams).promise();
} catch (e) {
console.log(e.stack);
return { statusCode: 500, body: e.stack };
}
// 取得した コネクションID にメッセージ一斉送信
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage,
});
const postData = JSON.parse(event.body).data;
const postCalls = connectionData.Items.map(async ({ connectionId }) => {
try {
await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: postData }).promise();
} catch (e) {
if (e.statusCode === 410) {
console.log(`Found stale connection, deleting ${connectionId}`);
await ddb.delete({ TableName: TABLE_NAME, Key: { connectionId } }).promise();
} else {
throw e;
}
}
});
try {
await Promise.all(postCalls);
} catch (e) {
console.log(e.stack);
return { statusCode: 500, body: e.stack };
}
return { statusCode: 200, body: 'Data sent.' };
};
ondisconnect
主キー(connectionId)で削除するだけなので、変更なし。