0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Lambda】チャット会話履歴をDynamoDBに保存する

Posted at

やりたいこと

  • 生成AIを用いたチャットボットアプリにおいてチャット履歴を保存する
  • 会話履歴はDynamoDBに保存する
  • Lambda関数を用いてDynamoDBに接続する
  • LangChainは使わず、boto3だけでやってみる(やってみたかったから)
  • 以前に執筆したチャットボット作成してみたって記事に、チャット保存機能を足したかった
    ※以前の記事は以下です

インフラエンジニアがBedrockを呼び出すチャットアプリを作ってみた - Qiita

環境

実行環境についてまとめます。

  • Lambdaランタイム:Python 3.12
  • 使用リージョン:バージニア北部(us-east-1)

DynamoDBを作成する

これといって特別なことはしてないです。以下の設定項目を入力してテーブルを作成します。

  • テーブル名:BedrockConversationHistory
  • パーティションキー:SessionId(文字列)
  • ソートキー:Timestamp(数値)
  • テーブル設定:デフォルト設定
    1.png

Lambda関数の作成

Lambda用のIAMロール作成

Bedrock呼び出し用にIAMロールを作成する。今回はAmazonBedrockFullAccessを設定している。

また、LambdaからDynamoDBにアクセスできるよう、LambdaにアタッチされているIAMロールに AmazonDynamoDBFullAccess を追加します。
2.png

Lambda関数の作成

先ほど作成したIAMロールをアタッチしたLambda関数を作成します。

ランタイム:Python 3.12

アーキテクチャ:x86_64

タイムアウト:5分(デフォの3秒だと生成AIからの回答を待っている間にタイムアウトになっちゃうので今回は5分に値にしておく)

セッションIDやユーザIDはフロント側で実装する想定()として、とりあえず任意のセッションIDが来た際にDynamoDBに保存できるような関数を作成しました。

import json
import boto3
import time
import uuid
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('BedrockConversationHistory')

def lambda_handler(event, context):
    bedrock_runtime = boto3.client('bedrock-runtime')
    model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"

    user_message = event.get('userMessage')
    session_id = event.get('sessionId', str(uuid.uuid4()))

    # 会話履歴を取得
    history = get_conversation_history(session_id)
    
    # 会話を構築
    conversation = history + [
        {"role": "user", "content": [{"text": user_message}]}
    ]

    response_text = ""
    try:
        streaming_response = bedrock_runtime.converse_stream(
            modelId=model_id,
            messages=conversation,
            inferenceConfig={
                "maxTokens": 4096,
                "temperature": 0.5,
                "topP": 0.5,
            }
        )

        for chunk in streaming_response["stream"]:
            if "contentBlockDelta" in chunk:
                text = chunk["contentBlockDelta"]["delta"]["text"]
                response_text += text
        
        # 会話履歴を保存
        save_conversation(session_id, user_message, response_text)
    
    except (ClientError, Exception) as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': f"Can't invoke '{model_id}'. Reason: {str(e)}"})
        }
    
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type'
        },
        'body': json.dumps({'response': response_text, 'sessionId': session_id}, ensure_ascii=False)
    }

# 会話履歴の取得
def get_conversation_history(session_id):
    try:
        response = table.query(
            KeyConditionExpression=Key('SessionId').eq(session_id),
            ScanIndexForward=True,
            Limit=20
        )
        
        messages = []
        for item in response.get('Items', []):
            messages.append({
                "role": item['Role'],
                "content": [{"text": item['Message']}]
            })
        return messages
    
    except Exception as e:
        print(f"Error getting history: {e}")
        return []

# 会話の保存
def save_conversation(session_id, user_message, assistant_message):
    timestamp = int(time.time() * 1000)
    
    try:
        table.put_item(
            Item={
                'SessionId': session_id,
                'Timestamp': timestamp,
                'Role': 'user',
                'Message': user_message
            }
        )
        
        table.put_item(
            Item={
                'SessionId': session_id,
                'Timestamp': timestamp + 1,
                'Role': 'assistant',
                'Message': assistant_message
            }
        )
    
    except Exception as e:
        print(f"Error saving conversation: {e}")

テストコードを以下のように用意してテスト実施してみます。

{
  "userMessage": "空が青いのはなぜですか",
  "sessionId": "123"
}

問題なく回答が返ってきました(DynamoDBにも保存されてます)。
3.png

さて今度は先ほどのテストとは別のテストコードを用意します。

{
  "userMessage": "先ほどの質問を覚えていますか。",
  "sessionId": "123"
}

前回実施したテストの質問内容を覚えていてくれました。
4.png

おわりに

セッションIDをシステムにどう持たせるかは今後悩みそうな内容になっちゃいました。
クライアント側(Reactとか)でセッションIDを保持してサーバに送付するとか、認証側(LamdbaオーソライザーやCognito)でユーザIDを持たせてサーバ側と通信することでセッションIDみたいな値を管理すべきかとか。
インフラばっかやってるとここら辺の勘所が全然培われてないなと実感します。
LangChainが遂にメジャーバージョン(1.0.0)がリリースされたし、そっちも色々触ってみようかと

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?