2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

発表中にリアルタイムフィードバック! 〜DynamoDB StreamからのAmazonBedrockエージェント〜

Last updated at Posted at 2025-03-18

はじめに

DynamoDBに、キャプチャした音声データから文字起こしをした内容が格納されています。
この文字起こしをした内容に対して、生成AIからコメントをもらうという仕組みを構築します。

次のユースケースを想定しています。
①発表者の音声をキャプチャし、数分間隔毎に文字起こしを実施
②文字起こしした内容に基づいて、生成AIからコメントをもらう
③これにより発表をしている途中に、適宜、生成AIがコメントしてくれる

本記事のポイント:

  • DynamoDB Streamを使いLambdaを実行
  • Lambda関数は、生成AIにコメントを求める
  • 生成AIは、AmazonBedrockエージェントを利用する

こちらも参考にして下さい

アーキテクチャ

処理の流れは以下の通りです。

1.DynamoDBテーブルに項目が追加されると、Lambdaをトリガー
2.Lambda関数はデータを取得し、BedrockAgentを実行
3.BedrockAgentで回答(コメント)を生成
4.Lambda関数で回答をDynamoDBテーブルに追加

スクリーンショット 2025-03-18 22.30.25.png

DynamoDBの構築

(1) DynamoDBテーブルの項目

項目 説明
jobName string Transcribeのジョブ名(パーティションキー)
start_time string 文字起こし開始時刻(ソートキー)
end_time string 文字起こし終了時刻
transcript string 文字起こしした文章

(2) DynamoDB Streamの設定

DynamoDBテーブルのStreamを有効にすることで、DynamoDBのレコードが変更されるたびにストリームが発生し、そのデータをLambdaで処理できるようになります。
以下の設定で、ストリームをオンにします。

  • 表示タイプ:新しいイメージ

(3) DynamoDB Streamの構造

DynamoDBストリームは、テーブルの変更をイベントとして記録し、Lambdaなどのサービスで処理できるようにする機能です。ストリームには INSERT(追加)、MODIFY(更新)、REMOVE(削除) の3種類のイベントが含まれます。

{
    "Records": [
        {
            "eventID": "一意の識別子",
            "eventName": "INSERT | MODIFY | REMOVE",
            "eventVersion": "バージョン情報",
            "eventSource": "aws:dynamodb",
            "awsRegion": "リージョン",
            "dynamodb": {
                "ApproximateCreationDateTime": "タイムスタンプ",
                "Keys": {
                    "パーティションキー": { "型": "値" },
                    "ソートキー": { "型": "値" }
                },
                "NewImage": {
                    "カラム名1": { "型": "値" },
                    "カラム名2": { "型": "値" }
                },
                "OldImage": {
                    "カラム名1": { "型": "値" },
                    "カラム名2": { "型": "値" }
                },
                "SequenceNumber": "変更のシーケンス番号",
                "SizeBytes": "データサイズ",
                "StreamViewType": "NEW_IMAGE | OLD_IMAGE | NEW_AND_OLD_IMAGES | KEYS_ONLY"
            },
            "eventSourceARN": "ストリームのARN"
        }
    ]
}

各項目の説明

項目名 説明
eventID 各イベントの一意の識別子
eventName 変更の種類(INSERT, MODIFY, REMOVE)
eventVersion イベントのバージョン情報
eventSource イベントの発生元(固定値:aws:dynamodb
awsRegion DynamoDBテーブルがあるAWSリージョン
dynamodb DynamoDBストリームの詳細情報
ApproximateCreationDateTime イベントの発生時刻(秒単位)
Keys 変更対象のレコードのキー情報(パーティションキーとソートキー)
NewImage StreamViewTypeNEW_IMAGE または NEW_AND_OLD_IMAGES の場合、新しいデータのスナップショット
OldImage StreamViewTypeOLD_IMAGE または NEW_AND_OLD_IMAGES の場合、変更前のデータのスナップショット
SequenceNumber ストリーム内の順序を示す番号
SizeBytes 変更データのサイズ(バイト)
StreamViewType ストリームのデータ保持タイプ (NEW_IMAGE / OLD_IMAGE / NEW_AND_OLD_IMAGES / KEYS_ONLY)
eventSourceARN DynamoDBストリームのARN

Lambdaの構築

(1) Lambda関数の作成

  • INSERTイベントに対して処理を実施
  • StreamからNewImageを取得
  • BedrockAgent(※後述)を呼び出し
  • BedrockAgentからのレスポンスをDynamoDBテーブルに格納
import boto3
import json
import logging
import os
from uuid import uuid4

# ログの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 環境変数から設定を取得
table_name = os.getenv("DYNAMODB_TABLE_NAME", "DefaultTableName")
agent_id = os.getenv("BEDROCK_AGENT_ID", "DefaultAgentID")
agent_alias_id = os.getenv("BEDROCK_AGENT_ALIAS_ID", "DefaultAgentAliasID")
session_id = str(uuid4())

# クライアントの初期化
bedrock = boto3.client('bedrock-agent-runtime', 'us-east-1')
dynamodb = boto3.client('dynamodb')

def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event, ensure_ascii=False, indent=2)}")

    for record in event.get('Records', []):
        # INSERT イベントのみ処理
        event_name = record.get('eventName', '')
        if event_name != 'INSERT':
            logger.info(f"Skipping event: {event_name}")
            continue

        # DynamoDBデータの取得
        dynamo_data = record.get('dynamodb', {})
        new_image = dynamo_data.get('NewImage')

        if not new_image:
            logger.warning(f"Skipping record without 'NewImage': {json.dumps(record, ensure_ascii=False)}")
            continue  # NewImageがない場合はスキップ

        # データ取得
        job_name = new_image.get('job_name', {}).get('S', '')
        start_time = new_image.get('start_time', {}).get('S', '')
        transcript = new_image.get('transcript', {}).get('S', '')

        if not job_name or not transcript:
            logger.warning(f"Skipping incomplete record: {json.dumps(new_image, ensure_ascii=False)}")
            continue

        logger.info(f"Processing job: {job_name}, Start Time: {start_time}, Transcript: {transcript}")

        # Bedrockエージェント呼び出し
        try:
            response_bedrock = bedrock.invoke_agent(
                agentId=agent_id,
                agentAliasId=agent_alias_id,
                sessionId=session_id,
                inputText=transcript
            )

            # Bedrockエージェントのレスポンス処理
            comment = ""
            for res in response_bedrock.get("completion", []):
                chunk = res.get("chunk", {})
                if "bytes" in chunk:
                    comment += chunk["bytes"].decode("utf-8")

            logger.info(f"Received comment: {comment}")

        except Exception as e:
            logger.error(f"Error calling Bedrock Agent: {str(e)}")
            comment = "Error generating comment"

        # DynamoDB にコメントを追加
        try:
            dynamodb.update_item(
                TableName=table_name,
                Key={'job_name': {'S': job_name},'start_time': {'S': start_time}},
                UpdateExpression="set #comment = :comment",
                ExpressionAttributeNames={'#comment': 'comment'},
                ExpressionAttributeValues={':comment': {'S': comment}}
            )
            logger.info(f"Updated DynamoDB record for job: {job_name}")

        except Exception as e:
            logger.error(f"Error updating DynamoDB: {str(e)}")

    return {'statusCode': 200, 'body': 'Success'}

(2) BedrockAgenからのレスポンスについて

Bedrockエージェントをinvoke_agentで呼び出したときのレスポンスはcompletionフィールドに含まれ、以下のような構造になります。
そのため、Lambda関数ではチャンクごとにbytesをデコードし、結合する処理を実装しています。

{
  "sessionId": "<会話のセッションID>",
  "completion": [
    {
      "chunk": {
        "bytes": "44K8J+QtC4gQmVkcm9jayDlpKfjgZXjgaTjgYQ="
      }
    },
    {
      "chunk": {
        "bytes": "5pel5pys44KS5L2T44KT44GZ44CC"
      }
    }
  ],
  "status": "Success"
}

フィールドの説明

フィールド名 内容
sessionId 会話のセッションID、複数回の対話を管理するために使用される
completion ストリーミング応答のチャンクデータ(複数含まれる)
chunk.bytes Base64エンコードされたテキストデータ
status リクエストの処理状態

(3) Lamnda関数に付与する許可ポリシー

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "bedrock:InvokeAgent",
            "Resource": "arn:aws:bedrock:us-east-1:<AWSアカウントID>:agent-alias/<BedrockエージェントID>/<BedrockエージェントエイリアスID>"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams"
            ],
            "Resource": "arn:aws:dynamodb:us-east-1:<AWSアカウントID>:table/<DynamoDBテーブル名>/stream/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:UpdateItem"
            ],
            "Resource": "arn:aws:dynamodb:us-east-1:<AWSアカウントID>:table/<DynamoDBテーブル名>"
        }
    ]
}

(4) Lamnda関数のトリガー設定

Lambda関数のトリガーにDynamoDBを選択し、DynamoDBテーブル名を設定します。

Bedrockエージェントの構築

(1) エージェントの作成

Bedrockエージェントは、発表者の音声文字起こしの内容を受け取り、それに対して適切なコメントを生成する役割を持ちます。エージェントの設定時に、AIがどのようなコメントを返すべきかを明確に指示することが重要です。

  • エージェントの名前: 任意
  • エージェントリソースロール: 新しいサービスロールを作成して使用
  • モデルの選択: 自然言語生成モデルを選択
  • エージェント向けの指示: 詳細は次項で説明
  • その他の設定: デフォルトのままでOK

(2) エージェント向けの指示(プロンプト設計)

エージェントが適切なコメントを生成するためには、以下のような指示を与えるのが有効です。

指示例:

あなたは聴衆の一員であり、発表内容をリアルタイムで聞いている立場です。
発表内容の一部が提供されます。
発表の流れを考慮しながら、ポジティブで魅力的なコメントを生成してください。

  • 発表内容を褒める形でコメントを作成する
  • 聴衆が興味を持つような言葉を選ぶ
  • 30文字以内で簡潔に表現する
  • 絵文字を適宜使用し、カジュアルな雰囲気にする
  • 予測や推測は出力には含めない

(3) エイリアスの作成

エージェントのデプロイ後、APIで利用できるようにするためにエイリアスを作成します。

  • エイリアス名: 任意
  • バージョン管理: 初期バージョンを公開
  • デプロイ設定: 自動デプロイを有効化

実行結果

LambdaがDynamoDBのストリームをトリガーし、Bedrockエージェントがコメントを生成する様子を確認できます。コメントはDynamoDBテーブルに追加されています。

まとめ

本記事では、DynamoDB Streamを活用してLambdaをトリガーし、Bedrockエージェントでリアルタイムに発表内容へコメントを付与するシステムを構築しました。

今後の拡張として、

  • コメントのパーソナライズ
  • マルチモーダル対応(画像・音声解析)
  • WebSocketを活用したリアルタイムフィードバック

などの可能性も考えられます。
実装の際には、エージェントのプロンプト設計が鍵となるため、試行錯誤しながら最適な設定を見つけてください。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?