17
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DynamoDB のデータ更新を WebSocket (API Gateway + Lambda) で通知してみた

Last updated at Posted at 2025-12-03

はじめに

API から取得して表示しているデータの更新を、ブラウザ側にどう通知するかにはいくつかパターンがあります。定期的に API を呼び出す Polling や Long Polling、サーバーからクライアントへイベントを送れる WebSocket などです。

本記事では、その中の WebSocket を利用して、DynamoDB のデータ更新をトリガーにして API Gateway の WebSocket API と Lambda を使い、ブラウザへ通知する仕組みを実装してみました。インフラ部分のリソース(DynamoDB, API Gateway, Lambda など)は AWS CDK で定義し、CDK からデプロイします。

実装内容は次のようなイメージです。

image.png

※ なお、今回作成したソース一式は以下のリポジトリにて確認できます。

実行環境

  • Node.js: v22.14.0
  • AWS CDK: 2.1033.0

インフラ構成

AWS CDK を使って作成したインフラリソースについて、DynamoDB, API Gateway, Lambda について紹介します。

DynamoDB テーブル

まずは、アプリケーションのデータや WebSocket の接続情報を保存するための DynamoDB テーブルを作成します。今回作成したサンプルでは、次の 2 つのテーブルを用意しています。

  • TodoTable: Todo の ID やタイトル、ステータス、更新日時などを保存するテーブル
  • ConnectionsTable: WebSocket の connectionId を保存するテーブル

WebSocket 通知のトリガーにするため、TodoTable には DynamoDB Streams を有効化しています。

const todoTable = new dynamodb.Table(this, "TodoTable", {
  partitionKey: { name: "id", type: dynamodb.AttributeType.STRING },
  removalPolicy: cdk.RemovalPolicy.DESTROY,
  stream: dynamodb.StreamViewType.NEW_IMAGE,    // DynamoDB Streams を有効化
});

ConnectionsTable は、WebSocket に接続しているクライアントを管理するためのテーブルです。API Gateway の $connect ルートで渡される connectionId をパーティションキーとして保存しておき、接続中のクライアントに対して後から通知を送るときに参照します。

const connectionsTable = new dynamodb.Table(this, "ConnectionsTable", {
	partitionKey: {
		name: "connectionId",
		type: dynamodb.AttributeType.STRING,
	},
	removalPolicy: cdk.RemovalPolicy.DESTROY,
});

WebSocket でクライアントに通知する場合は、API Gateway が払い出した connectionId ごとにメッセージを送る必要があります。そのため、「いまどのクライアントが接続しているか?」をどこかに保存しておかないと、DynamoDB の更新イベントを受け取ったタイミングで通知先を特定できません。

この構成では、その役割を ConnectionsTable に持たせることで、

  • $connect 時に connectionId を追加
  • $disconnect 時に connectionId を削除
  • 通知する際は接続中の connectionId を参照して送信

という流れを実現しています。

API Gateway

次に、ブラウザとサーバーの間で双方向通信を行うための API Gateway で WebSocket API を作成します。WebSocket API では、クライアントの接続・切断・メッセージ送信に応じて、事前に定義したルートにイベントが飛んできます。

今回は以下のようなルートを定義しました。

  • $connect: クライアントが WebSocket に接続したときに呼ばれる
  • $disconnect: クライアントが切断したときに呼ばれる
  • getTodos: Todo の一覧を取得するために呼び出すルート
  • updateTodo: Todo を更新するために呼び出すルート

const webSocketApi = new apigwv2.WebSocketApi(this, "TodoWebSocketApi");
const apiStage = new apigwv2.WebSocketStage(this, "DevStage", {
	webSocketApi,
	stageName: "dev",
	autoDeploy: true,
});

webSocketApi.addRoute("$connect", {
	integration: new apigwv2_integrations.WebSocketLambdaIntegration(
		"ConnectIntegration",
		connectHandler,
	),
});
// ほかのルート($disconnect / getTodos / updateTodo)も同様に追加

// 作成した WebSocketURL を出力しておくと便利
new cdk.CfnOutput(this, "WebSocketURL", { value: apiStage.url });

Lambda

最後に、実際のロジックを実装する Lambda 関数です。

  1. 接続管理用 Lambda($connect, $disconnect ルート用)
  2. アプリケーションロジック用 Lambda(getTodos, updateTodo
  3. 更新通知用 Lambda(DynamoDB Streams)

接続管理用 Lambda

$connect 呼び出し時に API Gateway から渡される connectionId を Connections テーブルに保存し、どのクライアントが接続中かを管理します。

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler: APIGatewayProxyHandler = async (event) => {
  const connectionId = event.requestContext.connectionId;
  await ddb.send(
    new PutCommand({
      TableName: process.env.CONNECTIONS_TABLE,
      Item: { connectionId },
    })
  );
  return { statusCode: 200, body: "Connected" };
};

$disconnect では、対応する connectionId のレコードを削除し、不要な接続情報が残らないようにしています。

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler: APIGatewayProxyHandler = async (event) => {
  const connectionId = event.requestContext.connectionId;
  await ddb.send(
    new DeleteCommand({
      TableName: process.env.CONNECTIONS_TABLE,
      Key: { connectionId },
    })
  );
  return { statusCode: 200, body: "Disconnected" };
};

アプリケーションロジック用 Lambda

Todo の一覧取得や更新などを行う Lambda です。今回作成したサンプルでは、これらも WebSocket のルート(getTodos / updateTodo)にぶら下げています。

  • getTodos.ts: TodoTable から現在の Todo 一覧を取得し、呼び出し元のクライアントに返す
  • updateTodo.ts: 指定された id の Todo を更新し、statustitle, updatedAt を書き換える

更新結果は DynamoDB に書き込まれ、その後の DynamoDB Streams 経由で WebSocket の通知が飛ぶ構成になっています。

更新通知用 Lambda

DynamoDB Streams をトリガーにして起動する Lambda です。TodoTable に対する INSERT / MODIFY イベントを受け取り、新しいレコードイメージ(NEW_IMAGE)から id, status, title を取り出して通知用のペイロードを組み立てます。

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler = async (event: DynamoDBStreamEvent) => {
  const updates = event.Records.map((record) => {
    const newImage = record.dynamodb?.NewImage;
    if (!newImage) {
      return null;
    }

    const item = unmarshall(newImage as Record<string, AttributeValue>);
    return { type: "UPDATE", item };
  }).filter((update) => update !== null);

  if (updates.length === 0) {
    return;
  }

  const connections = await ddb.send(new ScanCommand({ TableName: process.env.CONNECTIONS_TABLE }));

  if (!connections.Items || connections.Items.length === 0) {
    return;
  }

  const apiGw = new ApiGatewayManagementApiClient({
    endpoint: process.env.APIGW_ENDPOINT,
  });
  const payload = JSON.stringify(updates);

  const postTasks = connections.Items.map(async (conn) => {
    try {
      // ここでクライアントに通知する
      await apiGw.send(
        new PostToConnectionCommand({
          ConnectionId: conn.connectionId,
          Data: payload,
        })
      );
    } catch (e) {
      if (e.statusCode === 410) {
        await ddb.send(
          new DeleteCommand({
            TableName: process.env.CONNECTIONS_TABLE,
            Key: { connectionId: conn.connectionId },
          })
        );
      } else {
        console.error("Failed to notify connection", conn.connectionId, e);
      }
    }
  });

  await Promise.all(postTasks);
};

CDK の実装では、TodoTable をイベントソースとして Lambda に紐づけることで、DynamoDB Streams イベントを受け取れるようにしています。

const todoStreamNotifier = new nodejs.NodejsFunction(this, "TodoStreamNotifier", {
	entry: path.join(__dirname, "../lambda/streamNotifier.ts"),
	environment: {
		CONNECTIONS_TABLE: connectionsTable.tableName,
		APIGW_ENDPOINT: apiStage.callbackUrl,
	},
	...commonProps,
});

todoStreamNotifier.addEventSource(
    new lambdaEventSources.DynamoEventSource(todoTable, {
        startingPosition: lambda.StartingPosition.LATEST,
		batchSize: 10,
		retryAttempts: 2,
	}),
);
webSocketApi.grantManageConnections(todoStreamNotifier);

また、Management API で WebSocket クライアントへメッセージを送信できるように、対象の WebSocket API に対する execute-api:ManageConnections 権限をロールに付与する必要があります。

バックエンド実装(Lambda)

先ほど CDK で作成した Lambda 関数の実装について、役割ごとに簡単に触れていきます。

接続管理用 Lambda(connect.ts / disconnect.ts

接続管理用の Lambda は、API Gateway の $connect / $disconnect ルートと連携し、ConnectionsTable に接続情報を保存・削除します。

connect.ts
export const handler: APIGatewayProxyHandler = async (event) => {
  const connectionId = event.requestContext.connectionId;
  await ddb.send(
    new PutCommand({
      TableName: process.env.CONNECTIONS_TABLE,
      Item: { connectionId },
    })
  );
  return { statusCode: 200, body: "Connected" };
};
disconnect.ts
export const handler: APIGatewayProxyHandler = async (event) => {
  const connectionId = event.requestContext.connectionId;
  await ddb.send(
    new DeleteCommand({
      TableName: process.env.CONNECTIONS_TABLE,
      Key: { connectionId },
    })
  );
  return { statusCode: 200, body: "Disconnected" };
};

これらの実装により、「いま WebSocket に接続しているクライアントは誰か?」を DynamoDB で管理できるようになります。

アプリケーションロジック用 Lambda(getTodos.ts / updateTodo.ts

Todo の取得・更新を行う Lambda です。この記事では、これらも WebSocket のルートとして呼び出す構成にしています。

  • getTodos.ts
    • TodoTable から scan で Todo 一覧を取得する
    • WebSocket イベントの requestContext.connectionId と、CDK から渡した APIGW_ENDPOINT を使って、呼び出し元クライアントに結果を送り返す
  • updateTodo.ts
    • TodoTable に対して updateItem し、対象の Todo の statustitle を更新する

呼び出し元に WebSocket でメッセージを送る処理は、以下のように実装できます。connectionId は、WebSocket イベントの Lambda の場合は、requestContext から取得します。DynamoDB Streams のような WebSocket イベント以外の場合は、自分で管理して取得する必要があります。(今回は DynamoDB の ConnectionsTable で手動管理)

const apiGw = new ApiGatewayManagementApiClient({
    endpoint: process.env.APIGW_ENDPOINT,
});
const connectionId = event.requestContext.connectionId;
/// 省略...
await apiGw.send(
  new PostToConnectionCommand({
    ConnectionId: connectionId,
    Data: messageData,
  })
);

更新通知用 Lambda(streamNotifier.ts

更新通知用の Lambda は、DynamoDB Streams をトリガーにして起動し、TodoTable の更新内容を WebSocket 経由でクライアントに配信します。

streamNotifier.ts
try {
  await apiGw.send(
    new PostToConnectionCommand({
      ConnectionId: conn.connectionId,
      Data: payload,
    })
  );
} catch (e) {
  if (e.statusCode === 410) {
    // 接続していたクライアントがすでに切断されている場合
    await ddb.send(
      new DeleteCommand({
        TableName: process.env.CONNECTIONS_TABLE,
        Key: { connectionId: conn.connectionId },
      })
    );
  } else {
    console.error("Failed to notify connection", conn.connectionId, e);
  }
}

送信先のクライアントがすでに切断されている場合は、postToConnection が 410(Gone) エラーを返すので、そのタイミングで ConnectionsTable から該当の connectionId を削除するようにしています。

動作確認

CDK でデプロイした環境とクライアント(React Router v7)を使って、実際に WebSocket 通知を確認します。

デプロイとクライアント起動

まずはバックエンドをデプロイします。

npm run cdk -- deploy
...
CdkStack.WebSocketURL = wss://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev
...

出力された WebSocketURL が、WebSocket 接続時に利用するエンドポイントとなります。

続いて、クライアントを起動します。以下のように WebSocket クライアント作成し、メッセージを送受信します。なお、API Gateway に作成した、$connect ルートや $disconnect ルートは明示的に呼ばなくても、接続開始時、接続終了時に実行されます。

useEffect(() => {
  // websocketUrl は発行された WebSocket URL を設定
  ws.current = new WebSocket(websocketUrl);

  ws.current.onopen = () => {
    // WebSocket 接続をオープン()

    // 初期表示する Todo データを WebSocket で取得
    ws.current?.send(JSON.stringify({ action: "getTodos" }));
  };

  ws.current.onmessage = (event) => {
    // サーバーからメッセージを受信した際の処理
    
    // 省略・・・
  };
  ws.current.onclose = () => {
    // WebSocket 接続をクローズ
  }

  // コンポーネントがアンマウントされる際に実行
  return () => {
    ws.current?.close();
  };
}, []);
npm run dev
...
> dev
> react-router dev

  ➜  Local:   http://localhost:5173/

起動させた URL にアクセスすると、WebSocket で取得した Todo 一覧を確認することができました。

image.png

Chrome Dev Tools の Network を確認すると、初期表示時のメッセージのやりとりを確認することができました。

image.png

ステータス更新時の挙動を確認する

Todo のステータスを更新したときの WebSocket 通知の流れを確認します。

  1. 他のブラウザで、特定の Todo のステータスを TODOIn Progress に変更する
  2. 更新操作により、updateTodo ルートが呼び出され、TodoTable の該当レコードが更新される
  3. DynamoDB Streams をトリガーに Lambda が起動し、更新内容を含むメッセージを接続中クライアントに送信する
  4. ブラウザでは、画面をリロードしなくても、対象の Todo のステータスが自動的に更新される

image.png

ステータスが更新された際に、WebSocket でメッセージを受信していることを確認できました 🙌

image.png

さいごに

サーバレス構成で、DynamoDB のデータ更新をトリガーに API Gateway WebSocket API 経由でクライアントへニアリアルタイムに通知する仕組みを試してみました。DynamoDB Streams と Lambda、API Gateway を組み合わせることで、更新検知通知 の流れを比較的シンプルな構成で実現できたと思います。
接続中のクライアントにサーバ側から更新を届けることを検討中の方の参考になれば幸いです。

参考

17
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?