はじめに
リアルタイム配信が求められる場面は多々あります。例えば、
- ユーザーのアクションに応じた即時フィードバック
- チャットやコメントのリアルタイム表示
- ダッシュボードへの最新データの反映
これらを実現するために、WebSocketは有効な手段となります。
本記事では、API Gatewayを使ってWebSocket通信を構築し、クライアントに適宜コメントを配信する仕組みを構築します。
本記事のポイント:
- API GatewayでWebSocketを構築
- WebサイトをS3でホスティング
- LambdaからコメントをWeb画面に配信
ゴールイメージ:
- 利用者がWebサイトにアクセスし、WebSocket接続
- システムからコメントを適宜配信
- Web画面に配信コメントを表示
アーキテクチャ
処理のポイント
- クライアントから接続された際に、コネクション情報をDynamoDBに格納
- クライアントから切断された際に、コネクション情報をDynamoDBから削除
- 格納されたコネクション情報を使用して、コメントを配信
API Gatewayで WebSocket APIを設定
(1)APIタイプで、WebSocket APIを作成
- API名:任意
- ルート選択式:任意 (例:request.body.action)
(2)ルートの設定
- $connect
- $disconnect
- 各ルートの統合ターゲットとしてLambdaを設定(Lambdaは後述)
今回は、クライアントからメッセージは送信しないため、$defaultルートは不要です。
※重要なエンドポイント
- $connect: クライアントが接続すると呼ばれる
- $disconnect: クライアントが切断されると呼ばれる
- $default: クライアントからメッセージを受信すると呼ばれる
(3)ステージの設定
- ステージ名:任意
Lambda関数の実装
Lambda関数は2つ作成します。
(1)$connect
と $disconnect
ルートで実行されるLambda関数
- 接続時に、クライアントのコネクション情報をDynamoDBに格納します。
- 切断時に、クライアントのコネクション情報をDynamoDBから削除します。
(2)コメントを配信するLambda関数
- クライアントのコネクション情報を使って、コメントを配信します。
(1)$connect
と $disconnect
ルートで実行されるLambda関数
処理のポイント
- クライアントが接続すると
$connect
イベント、切断すると$disconnect
イベントが発生します。 - Lambdaに渡されるeventのrequestContextは以下のような構造になります。
{
"requestContext": {
"routeKey": "$connect",
"eventType": "CONNECT",
"connectionId": "<コネクションID>",
"domainName": "<ドメイン名>",
"stage": "<ステージ名>",
"identity": {
"sourceIp": "<接続元IPアドレス>"
}
},
"body": "・・・"
}
- event['requestContext']['routeKey'] を判定し、接続・切断の処理を分岐させることで、接続・切断を1つのLamnda関数で処理しています。
import json
import os
import boto3
dynamodb = boto3.client('dynamodb')
table_name = os.getenv("DYNAMODB_TABLE_NAME", "DefaultTableName")
def lambda_handler(event, context):
route_key = event['requestContext']['routeKey']
connection_id = event['requestContext']['connectionId']
if route_key == '$connect':
return handle_connect(connection_id)
elif route_key == '$disconnect':
return handle_disconnect(connection_id)
else:
return {'statusCode': 400, 'body': 'Invalid route'}
# 接続時の処理
def handle_connect(connection_id):
dynamodb.put_item(
TableName=table_name,
Item={'connection_id': {'S': connection_id}}
)
return {'statusCode': 200}
# 切断時の処理
def handle_disconnect(connection_id):
dynamodb.delete_item(
TableName=table_name,
Key={'connection_id': {'S': connection_id}}
)
return {'statusCode': 200}
付与するポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:PutItem",
"dynamodb:DeleteItem"
],
"Resource": "arn:aws:dynamodb:us-east-1:<AWSアカウントID>:table/<DynamoDBテーブル名>"
}
]
}
(2)コメントを配信するLambda関数
処理のポイント
- API Gateway管理APIを使用して、バックエンドサービスから、接続されたクライアントへのメッセージ送信を行います。
- API Gateway管理APIを使用する際は、デプロイされたAPIのエンドポイントを指すように設定する必要があります。
https:// {api-id}.execute-api.{region}.amazonaws.com/{stage} の形式
WebSocket URLを参考にして作成可能 - このLambdaをテスト実行などすることで、メッセージを送信できます。
import json
import boto3
import os
# DynamoDBとAPI Gatewayのクライアント
dynamodb = boto3.client('dynamodb')
apigw = boto3.client(
'apigatewaymanagementapi',
endpoint_url="<デプロイされたAPIのエンドポイント>"
)
table_name = os.getenv("DYNAMODB_TABLE_NAME", "DefaultTableName")
def lambda_handler(event, context):
print("Received event:", json.dumps(event, indent=2))
# 送信するメッセージを作成
message = {
"type": "comment",
"content": "<送信したいメッセージ>"
}
# 接続中のクライアントを取得
connections = dynamodb.scan(TableName=table_name)
for item in connections.get('Items', []):
connection_id = item['connection_id']['S']
try:
# クライアントにメッセージを送信
apigw.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps(message)
)
print(f"Sent message to {connection_id}")
except Exception as e:
print(f"Error sending to {connection_id}: {str(e)}")
return {'statusCode': 200, 'body': 'Messages sent'}
付与するポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:Scan"
],
"Resource": "arn:aws:dynamodb:us-east-1:<AWSアカウントID>:table/<DynamoDBテーブル名>"
},
{
"Effect": "Allow",
"Action": "execute-api:ManageConnections",
"Resource": "arn:aws:execute-api:us-east-1:<AWSアカウントID>:<API ID>/prod/*"
}
]
}
Webサイトの構築
(1)Web画面の作成
- WebSocket接続・切断のみを操作可能
- 接続時にメッセージが配信されてくる
(2)S3静的ホスティング
- WebSocket接続・切断のみとなるため、パブリック公開設定で実装
(1)Web画面の作成
処理のポイント
- 接続時に、WebSocketエンドポイントを入力させます
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Client</title>
</head>
<body>
<h2>WebSocket 接続</h2>
<label for="wsEndpoint">WebSocketエンドポイント:</label>
<input type="text" id="wsEndpoint" placeholder="wss://example.com/prod/">
<button onclick="connect()">接続</button>
<button onclick="disconnect()">切断</button>
<div id="messages"></div>
<script>
let ws;
function connect() {
const endpoint = document.getElementById("wsEndpoint").value;
if (!endpoint) {
alert("WebSocketエンドポイントを入力してください!");
return;
}
ws = new WebSocket(endpoint);
ws.onopen = function() {
console.log("✅ WebSocket 接続成功!");
document.getElementById("messages").innerHTML += "<p>✅ 接続しました!</p>";
};
ws.onmessage = function(event) {
console.log("📩 受信:", event.data);
try {
const data = JSON.parse(event.data);
const messageText = data.content || "(メッセージなし)";
const timestamp = new Date().toLocaleString();
document.getElementById("messages").innerHTML += `<p>[${timestamp}] ${messageText}</p>`;
} catch (error) {
console.error("⚠️ メッセージのパースに失敗:", error);
}
};
ws.onclose = function() {
console.log("❌ WebSocket 切断!");
document.getElementById("messages").innerHTML += "<p>❌ 切断しました</p>";
};
ws.onerror = function(error) {
console.error("⚠️ WebSocket エラー:", error);
};
}
function disconnect() {
if (ws) {
ws.close();
}
}
</script>
</body>
</html>
(2)S3静的ホスティング
注意
S3バケットをパブリック公開するため、機密情報などを含まないようにしてください。
① パブリックアクセスのブロックを無効にして、S3バケットを作成します。
② バケットポリシーを設定します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PublicReadGetObject",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::<バケット名>/*"
}
]
}
③ 上記のHTMLファイルをアップロードします。
実行結果
- ブラウザでURLにアクセス
例)
https://<バケット名>.s3.us-east-1.amazonaws.com/<HTMLファイル名>.html
- WebSocket URLを入力して「接続」
- コメントを配信するLambda関数をテスト実行
まとめ
本記事では、AWS API Gatewayを使ってWebSocket通信を構築する方法を解説しました。
ポイント
- API GatewayでWebSocket APIを作成
- Lambdaで接続管理とメッセージ送信を処理
- DynamoDBを使って接続情報を管理
- WebSocketクライアントを実装
バックエンドからのメッセージ送信を、他処理の実行結果、実行完了イベントなどと連携することで、クライアントに色々な結果を通知することができそうです。