はじめに
DynamoDBに、キャプチャした音声データから文字起こしをした内容が格納されています。
この文字起こしをした内容に対して、生成AIからコメントをもらうという仕組みを構築します。
次のユースケースを想定しています。
①発表者の音声をキャプチャし、数分間隔毎に文字起こしを実施
②文字起こしした内容に基づいて、生成AIからコメントをもらう
③これにより発表をしている途中に、適宜、生成AIがコメントしてくれる
本記事のポイント:
- DynamoDB Streamを使いLambdaを実行
- Lambda関数は、生成AIにコメントを求める
- 生成AIは、AmazonBedrockエージェントを利用する
こちらも参考にして下さい
アーキテクチャ
処理の流れは以下の通りです。
1.DynamoDBテーブルに項目が追加されると、Lambdaをトリガー
2.Lambda関数はデータを取得し、BedrockAgentを実行
3.BedrockAgentで回答(コメント)を生成
4.Lambda関数で回答をDynamoDBテーブルに追加
DynamoDBの構築
(1) DynamoDBテーブルの項目
項目 | 型 | 説明 |
---|---|---|
jobName | string | Transcribeのジョブ名(パーティションキー) |
start_time | string | 文字起こし開始時刻(ソートキー) |
end_time | string | 文字起こし終了時刻 |
transcript | string | 文字起こしした文章 |
(2) DynamoDB Streamの設定
DynamoDBテーブルのStreamを有効にすることで、DynamoDBのレコードが変更されるたびにストリームが発生し、そのデータをLambdaで処理できるようになります。
以下の設定で、ストリームをオンにします。
- 表示タイプ:新しいイメージ
(3) DynamoDB Streamの構造
DynamoDBストリームは、テーブルの変更をイベントとして記録し、Lambdaなどのサービスで処理できるようにする機能です。ストリームには INSERT(追加)、MODIFY(更新)、REMOVE(削除) の3種類のイベントが含まれます。
{
"Records": [
{
"eventID": "一意の識別子",
"eventName": "INSERT | MODIFY | REMOVE",
"eventVersion": "バージョン情報",
"eventSource": "aws:dynamodb",
"awsRegion": "リージョン",
"dynamodb": {
"ApproximateCreationDateTime": "タイムスタンプ",
"Keys": {
"パーティションキー": { "型": "値" },
"ソートキー": { "型": "値" }
},
"NewImage": {
"カラム名1": { "型": "値" },
"カラム名2": { "型": "値" }
},
"OldImage": {
"カラム名1": { "型": "値" },
"カラム名2": { "型": "値" }
},
"SequenceNumber": "変更のシーケンス番号",
"SizeBytes": "データサイズ",
"StreamViewType": "NEW_IMAGE | OLD_IMAGE | NEW_AND_OLD_IMAGES | KEYS_ONLY"
},
"eventSourceARN": "ストリームのARN"
}
]
}
各項目の説明
項目名 | 説明 |
---|---|
eventID | 各イベントの一意の識別子 |
eventName | 変更の種類(INSERT, MODIFY, REMOVE) |
eventVersion | イベントのバージョン情報 |
eventSource | イベントの発生元(固定値:aws:dynamodb ) |
awsRegion | DynamoDBテーブルがあるAWSリージョン |
dynamodb | DynamoDBストリームの詳細情報 |
ApproximateCreationDateTime | イベントの発生時刻(秒単位) |
Keys | 変更対象のレコードのキー情報(パーティションキーとソートキー) |
NewImage |
StreamViewType が NEW_IMAGE または NEW_AND_OLD_IMAGES の場合、新しいデータのスナップショット |
OldImage |
StreamViewType が OLD_IMAGE または NEW_AND_OLD_IMAGES の場合、変更前のデータのスナップショット |
SequenceNumber | ストリーム内の順序を示す番号 |
SizeBytes | 変更データのサイズ(バイト) |
StreamViewType | ストリームのデータ保持タイプ (NEW_IMAGE / OLD_IMAGE / NEW_AND_OLD_IMAGES / KEYS_ONLY ) |
eventSourceARN | DynamoDBストリームのARN |
Lambdaの構築
(1) Lambda関数の作成
- INSERTイベントに対して処理を実施
- StreamからNewImageを取得
- BedrockAgent(※後述)を呼び出し
- BedrockAgentからのレスポンスをDynamoDBテーブルに格納
import boto3
import json
import logging
import os
from uuid import uuid4
# ログの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 環境変数から設定を取得
table_name = os.getenv("DYNAMODB_TABLE_NAME", "DefaultTableName")
agent_id = os.getenv("BEDROCK_AGENT_ID", "DefaultAgentID")
agent_alias_id = os.getenv("BEDROCK_AGENT_ALIAS_ID", "DefaultAgentAliasID")
session_id = str(uuid4())
# クライアントの初期化
bedrock = boto3.client('bedrock-agent-runtime', 'us-east-1')
dynamodb = boto3.client('dynamodb')
def lambda_handler(event, context):
logger.info(f"Received event: {json.dumps(event, ensure_ascii=False, indent=2)}")
for record in event.get('Records', []):
# INSERT イベントのみ処理
event_name = record.get('eventName', '')
if event_name != 'INSERT':
logger.info(f"Skipping event: {event_name}")
continue
# DynamoDBデータの取得
dynamo_data = record.get('dynamodb', {})
new_image = dynamo_data.get('NewImage')
if not new_image:
logger.warning(f"Skipping record without 'NewImage': {json.dumps(record, ensure_ascii=False)}")
continue # NewImageがない場合はスキップ
# データ取得
job_name = new_image.get('job_name', {}).get('S', '')
start_time = new_image.get('start_time', {}).get('S', '')
transcript = new_image.get('transcript', {}).get('S', '')
if not job_name or not transcript:
logger.warning(f"Skipping incomplete record: {json.dumps(new_image, ensure_ascii=False)}")
continue
logger.info(f"Processing job: {job_name}, Start Time: {start_time}, Transcript: {transcript}")
# Bedrockエージェント呼び出し
try:
response_bedrock = bedrock.invoke_agent(
agentId=agent_id,
agentAliasId=agent_alias_id,
sessionId=session_id,
inputText=transcript
)
# Bedrockエージェントのレスポンス処理
comment = ""
for res in response_bedrock.get("completion", []):
chunk = res.get("chunk", {})
if "bytes" in chunk:
comment += chunk["bytes"].decode("utf-8")
logger.info(f"Received comment: {comment}")
except Exception as e:
logger.error(f"Error calling Bedrock Agent: {str(e)}")
comment = "Error generating comment"
# DynamoDB にコメントを追加
try:
dynamodb.update_item(
TableName=table_name,
Key={'job_name': {'S': job_name},'start_time': {'S': start_time}},
UpdateExpression="set #comment = :comment",
ExpressionAttributeNames={'#comment': 'comment'},
ExpressionAttributeValues={':comment': {'S': comment}}
)
logger.info(f"Updated DynamoDB record for job: {job_name}")
except Exception as e:
logger.error(f"Error updating DynamoDB: {str(e)}")
return {'statusCode': 200, 'body': 'Success'}
(2) BedrockAgenからのレスポンスについて
Bedrockエージェントをinvoke_agentで呼び出したときのレスポンスはcompletionフィールドに含まれ、以下のような構造になります。
そのため、Lambda関数ではチャンクごとにbytesをデコードし、結合する処理を実装しています。
{
"sessionId": "<会話のセッションID>",
"completion": [
{
"chunk": {
"bytes": "44K8J+QtC4gQmVkcm9jayDlpKfjgZXjgaTjgYQ="
}
},
{
"chunk": {
"bytes": "5pel5pys44KS5L2T44KT44GZ44CC"
}
}
],
"status": "Success"
}
フィールドの説明
フィールド名 | 内容 |
---|---|
sessionId | 会話のセッションID、複数回の対話を管理するために使用される |
completion | ストリーミング応答のチャンクデータ(複数含まれる) |
chunk.bytes | Base64エンコードされたテキストデータ |
status | リクエストの処理状態 |
(3) Lamnda関数に付与する許可ポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "bedrock:InvokeAgent",
"Resource": "arn:aws:bedrock:us-east-1:<AWSアカウントID>:agent-alias/<BedrockエージェントID>/<BedrockエージェントエイリアスID>"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Resource": "arn:aws:dynamodb:us-east-1:<AWSアカウントID>:table/<DynamoDBテーブル名>/stream/*"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:UpdateItem"
],
"Resource": "arn:aws:dynamodb:us-east-1:<AWSアカウントID>:table/<DynamoDBテーブル名>"
}
]
}
(4) Lamnda関数のトリガー設定
Lambda関数のトリガーにDynamoDBを選択し、DynamoDBテーブル名を設定します。
Bedrockエージェントの構築
(1) エージェントの作成
Bedrockエージェントは、発表者の音声文字起こしの内容を受け取り、それに対して適切なコメントを生成する役割を持ちます。エージェントの設定時に、AIがどのようなコメントを返すべきかを明確に指示することが重要です。
- エージェントの名前: 任意
- エージェントリソースロール: 新しいサービスロールを作成して使用
- モデルの選択: 自然言語生成モデルを選択
- エージェント向けの指示: 詳細は次項で説明
- その他の設定: デフォルトのままでOK
(2) エージェント向けの指示(プロンプト設計)
エージェントが適切なコメントを生成するためには、以下のような指示を与えるのが有効です。
指示例:
あなたは聴衆の一員であり、発表内容をリアルタイムで聞いている立場です。
発表内容の一部が提供されます。
発表の流れを考慮しながら、ポジティブで魅力的なコメントを生成してください。
- 発表内容を褒める形でコメントを作成する
- 聴衆が興味を持つような言葉を選ぶ
- 30文字以内で簡潔に表現する
- 絵文字を適宜使用し、カジュアルな雰囲気にする
- 予測や推測は出力には含めない
(3) エイリアスの作成
エージェントのデプロイ後、APIで利用できるようにするためにエイリアスを作成します。
- エイリアス名: 任意
- バージョン管理: 初期バージョンを公開
- デプロイ設定: 自動デプロイを有効化
実行結果
LambdaがDynamoDBのストリームをトリガーし、Bedrockエージェントがコメントを生成する様子を確認できます。コメントはDynamoDBテーブルに追加されています。
まとめ
本記事では、DynamoDB Streamを活用してLambdaをトリガーし、Bedrockエージェントでリアルタイムに発表内容へコメントを付与するシステムを構築しました。
今後の拡張として、
- コメントのパーソナライズ
- マルチモーダル対応(画像・音声解析)
- WebSocketを活用したリアルタイムフィードバック
などの可能性も考えられます。
実装の際には、エージェントのプロンプト設計が鍵となるため、試行錯誤しながら最適な設定を見つけてください。