LoginSignup
12
14

More than 3 years have passed since last update.

WebSocket - AWS の API Gateway と Lambda でルーム機能付きのchatを作る時の仕様を考える

Last updated at Posted at 2019-07-14

はじめに

[発表]Amazon API GatewayでWebsocketが利用可能 - AWS」で紹介されている機能を使って、ルーム機能付きのchatを作ろうとしたときのメモです。

なお、ルーム機能なしであれば、AWSが「aws-samples/simple-websockets-chat-app - GitHub」としてコードを公開しています。

事前準備

AWSのアカウント取得や AWS CLI の設定については完了していることが前提です。

必要なコマンド

作業するにあたっては、次のようなコマンドが必要です。

  • git
  • aws cli : CloudFormation の操作に使用
  • sam : AWS Serverless Application Model (SAM) の操作に使用
  • awslogs : CloudWatch のログをローカルPCで確認するために使用
  • wscat : WebSocket 通信のクライアントとして使用

動作仕様

全体的な仕様をまとめます。

基本機能

次の3つの機能の実装によって実現します。

  1. コネクション接続とルームへの参加
  2. メッセージの送信
  3. コネクション切断とルームからの退室

全体構成

システムの全体像は次の通りです。

image.png

テーブル

コネクション情報とルームの情報を保持するために、DynamoDBに次の2つのテーブルを作成します。

image.png

処理

各機能を実現する基本的な処理仕様を記載します。

コネクション接続とルームへの参加

本機能の実現にあたっては、次のような処理を実装します。

  1. コネクションを接続する。
  2. ルームへの参加する。
  3. ルーム参加者へメンバーの入室を通知する。

image.png

AWSのサンプルと違うところは、以下の2つです。

  • ルームへの参加
  • 入室済みメンバーへの通知

メッセージの送信

本機能の実現にあたっては、次のような処理を実装します。

  1. メッセージの送信する。
  2. ルームメンバーの確認する。
  3. ルーム参加者へメンバーへメッセージを送信する。

image.png

AWSのサンプルと違うところは、以下の2つです。

  • ルームメンバーの取得
  • ルームメンバーへの通知

コネクション切断とルームからの退室

本機能の実現にあたっては、次のような処理を実装します。

  1. ルームから退室する。
  2. ルームメンバーへメンバーの退室を通知する。
  3. コネクションを切断する

image.png

AWSのサンプルと違うところは、以下の2つです。

  • ルームからの退室
  • ルームメンバーへの退室の通知

実装

AWS上に実装するための情報を記載します。

全体像

基本的には、記事「【新機能】APIGatewayでWebSocketが利用可能になったのでチャットAPIを構築してみた - Qiita」で紹介されている通り、AWSのテンプレートを使用してまずは構築します。

この記事のままで動かない場合は、記事「WebSocket - AWSのサンプルで API Gateway を使ったchatアプリを作ろうとしたらハマった件」を参照してください。

IAM Role

IAMロールを作成し、次の Policy をアタッチします。

ポリシー名 ポリシータイプ
AWSLambdaBasicExecutionRole AWS 管理ポリシー
chatFunctionRolePolicy0 インラインポリシー
chatFunctionRolePolicy1 インラインポリシー
chatFunctionRolePolicy0
{
    "Statement": [
        {
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:DeleteItem",
                "dynamodb:PutItem",
                "dynamodb:Scan",
                "dynamodb:Query",
                "dynamodb:UpdateItem",
                "dynamodb:BatchWriteItem",
                "dynamodb:BatchGetItem",
                "dynamodb:DescribeTable"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_connections",
                "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_connections/index/*",
                "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_room",
                "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/simplechat_room/index/*"
            ],
            "Effect": "Allow"
        }
    ]
}
chatFunctionRolePolicy1
{
    "Statement": [
        {
            "Action": [
                "execute-api:*"
            ],
            "Resource": [
                "arn:aws:execute-api:ap-northeast-1:xxxxxxxxxxxx:yyyyyyyy/*"
            ],
            "Effect": "Allow"
        }
    ]
}

API Gateway

API Gateway は次のように設定します。

ルート 用途 統合タイプ Lambda Region Lambda関数
$connect WebSocket の開始時に使用する。 Lambda関数 ap-northeast-1 onConnect関数
$disconnect WebSocket の終了時に使用する。 Lambda関数 ap-northeast-1 onDisconnect関数
senndmessage WebSocket でメッセージを送信する時に使用する。 Lambda関数 ap-northeast-1 sendMessage関数

AWS Lambda

AWS Lambda では、AWSのサンプルと同様に「Node.js 10.x」を使用します。

各関数では、次のモジュールを使用します。

module名 解説
AWS.DynamoDB.DocumentClient DynamoDBへ接続する際に使用します。
AWS.ApiGatewayManagementApi API Gateway を介してメッセージを送信する際に使用します。

onconnect/app.js
onconnect/app.js
const AWS = require("aws-sdk");
AWS.config.update({
  region: process.env.AWS_REGION
});
var docClient = new AWS.DynamoDB.DocumentClient({
  apiVersion: "2012-10-08"
});

const table_room = "simplechat_room";

exports.handler = async function (event, context, callback) {
  console.info("START");
  console.log("event: ", JSON.stringify(event));
  console.log("context: ", JSON.stringify(context));

  let connectionId = event.requestContext.connectionId;

  // Join or New ?
  // __________________________________________________
  let roomId;
  if (event.queryStringParameters) {
    if (event.queryStringParameters.roomId) {
      roomId = event.queryStringParameters.roomId;
    }
  }
  console.log("roomId: ", roomId);
  if (roomId === void 0) {
    roomId = "test";
  }

  // Register WebSocket Connection ID
  // __________________________________________________
  var params = {
    TableName: process.env.TABLE_NAME,
    Item: {
      connectionId: connectionId,
      roomId: roomId
    }
  };
  docClient.put(params, callback);

  // Exist Room?
  // __________________________________________________
  var params = {
    TableName: table_room,
    Key: {
      roomId: roomId
    }
  };
  var room = await getRoom(params);
  var users;
  if (room.Item === void 0) {
    // ルームが未作成の場合、新規で作成する。

    console.log("room is not exist");
    users = [{
      connectionId: connectionId,
      username: "taro",
      icon: "avatar"
    }];
    var params = {
      TableName: table_room,
      Item: {
        roomId: roomId,
        connectionIds: users
      }
    };
    docClient.put(params, callback);
  } else {
    // ルームが存在する場合、既存のルームに入室する。

    console.log("room: ", JSON.stringify(room.Item));
    console.log("connectionIds: ", JSON.stringify(room.Item.connectionIds));

    room.Item.connectionIds.push({
      connectionId: connectionId,
      username: "taro",
      icon: "avatar"
    });
    users = room.Item.connectionIds;

    console.log("room is exist");
    var params = {
      TableName: table_room,
      Item: {
        roomId: roomId,
        connectionIds: users
      }
    };
    docClient.put(params, callback);
  }

  // Push Message
  // __________________________________________________
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });
  console.log("users: ", JSON.stringify(users));
  users.map(async ({
    connectionId,
    username,
    icon
  }) => {
    console.log("connectionId: ", JSON.stringify(connectionId));
    var message = {
      type: "join",
      member: {
        connectionId: connectionId,
        username: username,
        icon: icon
      },
      room: {
        users: users
      }
    }
    try {
      await apigwManagementApi.postToConnection({
        ConnectionId: connectionId,
        Data: JSON.stringify(message)
      }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await docClient.delete({
          TableName: "simplechat_connections",
          Key: {
            connectionId
          }
        }).promise();
      } else {
        console.error("e: ", JSON.stringify(e));
        throw e;
      }
    }
  });

  console.info("END");
  return {
    statusCode: 200,
    body: 'Connection Success.'
  };
};

/**
 * 
 */
function getRoom(params) {
  return new Promise((resolve, reject) => {

    docClient.get(params, function (err, data) {
      if (err) {
        console.log("getRoom(params) ERROR");
        reject(err);
      } else {
        console.log("getRoom(params) SUCCESS");
        resolve(data);
      }
    });
  });
}

disconnect/app.js
disconnect/app.js
const AWS = require("aws-sdk");
AWS.config.update({
  region: process.env.AWS_REGION
});
//var DDB = new AWS.DynamoDB({ apiVersion: "2012-10-08" });
var docClient = new AWS.DynamoDB.DocumentClient({
  apiVersion: "2012-10-08"
});

const table_room = "simplechat_room";

exports.handler = async function (event, context, callback) {
  console.info("START");
  console.log("event: ", JSON.stringify(event));
  console.log("context: ", JSON.stringify(context));

  // 
  // __________________________________________________
  var params = {
    TableName: process.env.TABLE_NAME,
    Key: {
      connectionId: event.requestContext.connectionId
    }
  };
  var connection = await getRoom(params);
  console.log("connection: ", JSON.stringify(connection));

  var params = {
    TableName: table_room,
    Key: {
      roomId: connection.Item.roomId,
    }
  };
  var room = await getRoom(params);
  console.log("room: ", JSON.stringify(room));

  // 自分以外のメンバーを取得する。
  const roomMembers = room.Item.connectionIds.filter((member) => {
    return (member.connectionId != event.requestContext.connectionId);
  });
  console.log("I am : ", JSON.stringify(event.requestContext.connectionId));
  console.log("roomMembers: ", JSON.stringify(roomMembers));

  // 自分以外のメンバーで登録し直す
  var params = {
    TableName: table_room,
    Item: {
      roomId: connection.Item.roomId,
      connectionIds: roomMembers
    }
  };
  await docClient.put(params, callback).promise();

  // connectionId を削除する。
  var deleteParams = {
    TableName: process.env.TABLE_NAME,
    Key: {
      connectionId: event.requestContext.connectionId
    }
  };
  await docClient.delete(deleteParams, callback).promise();

  // Push Message
  // __________________________________________________
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });
  roomMembers.map(async ({
    connectionId,
    username,
    icon
  }) => {
    console.log("connectionId: ", JSON.stringify(connectionId));
    var message = {
      type: "unjoin",
      member: {
        connectionId: connectionId,
        username: username,
        icon: icon
      },
      room: {
        users: roomMembers
      }
    }
    try {
      await apigwManagementApi.postToConnection({
        ConnectionId: connectionId,
        Data: JSON.stringify(message)
      }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await docClient.delete({
          TableName: "simplechat_connections",
          Key: {
            connectionId
          }
        }).promise();
      } else {
        console.error("e: ", JSON.stringify(e));
        throw e;
      }
    }
  });


  console.info("END");
};

/**
 * 
 */
function getRoom(params) {
  return new Promise((resolve, reject) => {

    docClient.get(params, function (err, data) {
      if (err) {
        console.log("getRoom(params) ERROR");
        reject(err);
      } else {
        console.log("getRoom(params) SUCCESS");
        resolve(data);
      }
    });
  });
}

sendmessage/app.js
sendmessage/app.js
const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient({
  apiVersion: '2012-08-10'
});
const {
  TABLE_NAME
} = process.env;

const table_room = "simplechat_room";

exports.handler = async (event, context) => {
  console.info("START");
  console.log("event: ", JSON.stringify(event));
  console.log("context: ", JSON.stringify(context));

  // 
  // __________________________________________________
  var params = {
    TableName: process.env.TABLE_NAME,
    Key: {
      connectionId: event.requestContext.connectionId
    }
  };
  var connection = await getRoom(params);
  console.log("connection: ", JSON.stringify(connection));

  var params = {
    TableName: table_room,
    Key: {
      roomId: connection.Item.roomId,
    }
  };
  var room = await getRoom(params);
  console.log("room: ", JSON.stringify(room));

  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });

  const postData = JSON.parse(event.body).data;
  console.log("postData: ", JSON.stringify(postData));

  const postCalls = room.Item.connectionIds.map(async ({
    connectionId
  }) => {
    try {
      await apigwManagementApi.postToConnection({
        ConnectionId: connectionId,
        Data: postData
      }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await docClient.delete({
          TableName: TABLE_NAME,
          Key: {
            connectionId
          }
        }).promise();
      } else {
        throw e;
      }
    }
  });

  try {
    await Promise.all(postCalls);
  } catch (e) {
    return {
      statusCode: 500,
      body: e.stack
    };
  }

  console.info("END");
  return {
    statusCode: 200,
    body: 'Data sent.'
  };
};

/**
 * 
 */
function getRoom(params) {
  return new Promise((resolve, reject) => {

    docClient.get(params, function (err, data) {
      if (err) {
        console.log("getRoom(params) ERROR");
        reject(err);
      } else {
        console.log("getRoom(params) SUCCESS");
        resolve(data);
      }
    });
  });
}

DynamoDB

DynamoDBには次の2つのテーブルを作成します。

テーブル名 ID 用途
simplechat_connections connectionId コネクション情報の登録
simplechat_room roomId ルーム所属メンバの登録

デバッグ

ローカルPCのコードをコマンドで Lambda へデプロイする

command_powershell
// zip 圧縮する
compress-archive * ../function.zip

// デプロイする
aws lambda update-function-code `
  --function-name [function name] `
  --zip-file fileb://../function.zip `
  --profile [profile name]

CloudWatch のログをローカルで確認する

command_powershell
awslogs get [Log Group Name] `
  -w `
  -s 10m `
  -G `
  -S `
  --timestamp `
  --profile [Profile Name]

動作検証

gif などの動作が分かる情報ではないので分かりづらいですが、次のように動作します。

// 接続とルームへの入室
> wscat -c wss://xxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/Prod?roomId=dklfajsd3938daf83dfa
connected (press CTRL+C to quit)
< {"type":"join","member":{"connectionId":"czeJQeqpNjMAc3g=","username":"taro","icon":"avatar"},"room":{"users":[{"icon":"avatar","connectionId":"czKiJe_BNjMCJew=","username":"taro"},{"icon":"avatar","connectionId":"czKxffiNNjMCF-w=","username":"taro"},]}}

// メッセージの送信
> {"message":"sendmessage", "data":"hello world"}
< hello world

// メンバーの退室通知
< {"type":"unjoin","member":{"connectionId":"czeJ0d6mNjMCFCw=","username":"taro","icon":"avatar"},"room":{"users":[{"icon":"avatar","connectionId":"czKiJe_BNjMCJew=","username":"taro"}]}}

おわりに

コードが整理できたらそれも掲載したい。。

参考

CloudWatch

DynamoDB

async/awain

12
14
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
14