2
2

API Gateway の WebSocket で connectionId を部屋ごとに管理するチャットアプリ実装サンプル

Last updated at Posted at 2022-09-11

概要

User A がメッセージを送信したときに User D にはメッセージが受信されずに User B と User C がメッセージ受信されるような実装をしてみます。

API Gateway の WebSocket 環境は以下の CDK をデプロイすることで簡単に作れます。
ただこれをそのまま使っても、送信したメッセージがすべてのユーザーに一斉送信されてしまうため改良していきます。
今回はチャットアプリのようなイメージで、部屋ごとにメッセージが送信されるような仕組みを作っていきます。

DynamoDB のテーブル設計

部屋ごとにメッセージ送信をするために roomId を GSI に追加します。

キー・インデックス 物理名 説明
パーティションキー connectionId websocket 接続すると発行されるセッションID
API Gateway だと event.requestContext.connectionId に格納されている
GSI roomId どの部屋から送信されたメッセージか識別するためのID
ユーザーがメッセージ送信するときのパラメータ
GSI で定義しているのは、 Lambda で sendmessage するときにフィルターするため

CDK に roomId (GSI) を追加する場合は以下のようなコードを書きます。

// add global secandary index (GSI)
table.addGlobalSecondaryIndex({
  indexName: 'roomIdIndex',
  partitionKey: {
    name: 'roomId',
    type: AttributeType.STRING,
  },
  readCapacity: 1,
  writeCapacity: 1,
  projectionType: ProjectionType.ALL,
});

Lambda

部屋ごとにメッセージ送信する仕組みを記述します。

onconnect

QueryString で roomId を受け取り DynammoDB に格納します。これで どの部屋からのコネクションかを管理していきます。

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async (event) => {
  const roomId = event.queryStringParameters.roomId || '';
  const putParams = {
    TableName: process.env.TABLE_NAME,
    Item: {
      connectionId: event.requestContext.connectionId,
      roomId: roomId,
    },
  };

  try {
    await ddb.put(putParams).promise();
  } catch (err) {
    console.log(JSON.stringify(err));
    return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Connected.' };
};

sendmessage

メッセージ送信するときは、コネクションが紐づく部屋、つまり roomId を取得し、一致する部屋に対してのみメッセージを一斉送信します。

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

const { TABLE_NAME } = process.env;

exports.handler = async (event) => {
  const roomId = JSON.parse(event.body).roomId;
  let connectionData;

  // roomId に紐づく コネクションID を取得
  try {
    const queryParams = {
      TableName: TABLE_NAME,
      IndexName: 'roomIdIndex',
      ProjectionExpression: 'connectionId, roomId',
      KeyConditionExpression: 'roomId = :roomId',
      ExpressionAttributeValues: {
        ':roomId': roomId,
      },
    };
    connectionData = await ddb.query(queryParams).promise();
  } catch (e) {
    console.log(e.stack);
    return { statusCode: 500, body: e.stack };
  }

  // 取得した コネクションID にメッセージ一斉送信
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage,
  });

  const postData = JSON.parse(event.body).data;

  const postCalls = connectionData.Items.map(async ({ connectionId }) => {
    try {
      await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: postData }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await ddb.delete({ TableName: TABLE_NAME, Key: { connectionId } }).promise();
      } else {
        throw e;
      }
    }
  });

  try {
    await Promise.all(postCalls);
  } catch (e) {
    console.log(e.stack);
    return { statusCode: 500, body: e.stack };
  }

  return { statusCode: 200, body: 'Data sent.' };
};

ondisconnect

主キー(connectionId)で削除するだけなので、変更なし。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2