6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【LangChain編】FAQがAIの力でパワーアップ!誰でもナレッジ情報を自由自在に活用できる仕組みをAmazon Bedrock(RAG)+ LangChain + GASで構築

Last updated at Posted at 2024-07-23

はじめに

こんにちは!CBcloudのRyoです:grinning:

私の所属するCBcloudでは、物流ラストワンマイルの配送プラットフォーム「ピックゴー」を運用しており、荷物を送りたい方、荷物を届けてくれる方を24時間365日、サービスを通してつなげています!

今回は以前より投稿しているRAGを使ったAI構築のLangchain編となります
Bedrockやナレッジベース・エージェントの説明は前の記事にありますので気になる方はそちらを参照ください

完成品編

Bedrock編

Agents番外編

前回のおさらい

作ったもの

アーキテクチャ

前回の記事ではBedrockで設定したClaude3にナレッジベースを追加し、更にそれをエージェントの詳細プロンプトを使ってナレッジベースの使用可否や回答の出力方法を調整しました

今回はこれらの動きをコードベースで作ることができるPythonのLangChainと、それを使ってBedrcok APIをサーバーレスで叩けるLambdaを作成していきます

Lambdaについて

いわずと知れたサーバーレスのコンピューティングサービスですが、今回気をつけたいのがLambdaのデプロイパッケージの制限は解凍後サイズ 250MBまでという部分です
LangChainは約50~100 MBとそこそこ大きいのでレイヤーとして使う際は注意が必要です

LangChainについて

PythonのLLM専用オープンソースフレームワークです 今回の使い方として関連前回の記事に書いたエージェント機能をコードで実装できるといった感じです

Chainという意味はプロンプトをつなげるという意味もあるので、AIに渡すプロンプトを精査したり繋げたりしてモデルの正確性や関連性を向上させるのにも使われます

完成コード

import os
import boto3
import json
import uuid
from langchain.prompts import (
    ChatPromptTemplate
)
from langchain_aws import ChatBedrock
from langchain.memory import ConversationBufferMemory
from langchain.schema import (
    messages_from_dict,
    messages_to_dict,
    AIMessage,
    HumanMessage,
)

from langchain_community.retrievers import AmazonKnowledgeBasesRetriever
from langchain_core.output_parsers import StrOutputParser

s3 = boto3.resource('s3')
MEMORY_BUCKET = os.environ.get('MEMORY_BUCKET')
KNOWLEDGE_BASE_ID = os.environ.get('KNOWLEDGE_BASE_ID')
memory_bucket = s3.Bucket(MEMORY_BUCKET)


def save_memory(memory, session_id):
    object_key_name = '{}.json'.format(session_id)
    obj = memory_bucket.Object(object_key_name)
    save = obj.put(Body=json.dumps(messages_to_dict(memory.chat_memory.messages)))

def load_memory(session_id):
    object_key_name = '{}.json'.format(session_id)
    obj = memory_bucket.Object(object_key_name)

    try:
        response = obj.get()
        body = response['Body'].read()
        json_data = json.loads(body.decode('utf-8'))

        memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
        memory.chat_memory.messages = messages_from_dict(json_data)

    except:
        memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")

    return memory


def chat(message, session_id, param_template):
    memory = load_memory(session_id)
    messages = memory.chat_memory.messages

    retriever = AmazonKnowledgeBasesRetriever(
        knowledge_base_id=KNOWLEDGE_BASE_ID,
        retrieval_config={
            "vectorSearchConfiguration": {
                "numberOfResults": 20
            }
        },
    )

    LLM = ChatBedrock(
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        region_name="us-east-1"
    )

    context_question = """

    質問: {question}
    関連情報: {context}

    回答: """

    template = param_template + context_question

    prompt = ChatPromptTemplate.from_template(template)
    output_parser = StrOutputParser()
    all_context_info = []
    use_context_info = []

    def passthrough(data):
        return data

    def get_context(data):
        nonlocal all_context_info
        nonlocal use_context_info
        context = retriever.invoke(data['question'])

        filtered_context = []

        for item in context:
            score = item.metadata.get('score')
            source_metadata = item.metadata.get('source_metadata')
            source_uri = source_metadata.get('x-amz-bedrock-kb-source-uri') if source_metadata else None

            if score is not None and source_uri is not None:
                all_context_info.append({"name": source_uri, "score": score})
                if score >= 0.57:
                    use_context_info.append({"name": source_uri, "score": score})
                    filtered_context.append(item)

        # フィルタリングされた結果が多すぎる場合に上限を設定
        max_contexts = 10  # 取得するコンテキスト情報の上限
        print('origin',len(filtered_context))
        if len(filtered_context) > max_contexts:
            filtered_context = sorted(filtered_context, key=lambda x: x.metadata['score'], reverse=True)[:max_contexts]
            print('filter',len(filtered_context))

        if not filtered_context:
            return "現時点ではこの質問に対する適切な情報を提供することができませんでした"

        return filtered_context

    chain = (
        {"context": get_context, "question": passthrough}
        | prompt
        | LLM
        | output_parser
    )

    human_input = HumanMessage(content=message)
    resp = chain.invoke(
        {
            "history": messages,
            "human_input": human_input.content,
            "question": message,
        }
    )

    response = resp

    memory.chat_memory.messages.append(human_input)
    memory.chat_memory.messages.append(AIMessage(content=response))

    save_memory(memory, session_id)

    return response,all_context_info,use_context_info

def lambda_handler(event, context):
    body = json.loads(event['body'])
    prompt = body.get('prompt', '')
    sessionId = body.get('sessionId', '')
    template = body.get('param_template', '')

    response, all_context_info, use_context_info = chat(prompt, sessionId, template)
    print("Response:", response)
    print("All Context Info:", all_context_info)
    print("Use Context Info:", use_context_info)
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': response,
            'allContextInfo': all_context_info,
            'useContextInfo': use_context_info
        }, ensure_ascii=False)
    }

requirements.txt
boto3==1.34.114
python-dotenv
langchain
langchain_aws
langchain_community
pinecone-client
orjson

概要

やっていることですが以下の流れになります

  1. Lambdaがプロンプト、セッションID、テンプレートプロンプトの3つの引数を受け取る
  2. セッションIDを使ってS3に保存されている過去の会話履歴を取得する
  3. AmazonBedrock・モデル・プロンプトを設定する
  4. ナレッジベースからドキュメントを取得し、情報を繋げてAIに問い合わせる
  5. AIから帰ってきた回答を会話履歴としてS3に保存する
  6. 最後に回答と使ったドキュメント+関連度スコアを返す

まずはコードを一つ一つ見ていきましょう

1. Lambdaがプロンプト、セッションID、テンプレートプロンプトの3つの引数を受け取る

def lambda_handler(event, context):
    body = json.loads(event['body'])
    prompt = body.get('prompt', '')
    sessionId = body.get('sessionId', '')
    template = body.get('param_template', '')

    response, all_context_info, use_context_info = chat(prompt, sessionId, template)

各オブジェクトの説明です

prompt = AIに問い合わせる質問

sessionId = 会話履歴を保存するファイル名に使う ファイルはS3に保存される

template = 詳細プロンプトにあたる部分 AIにロール(役割)を設定することで出力条件を調節する

templateを引数として渡しているのはAPIを叩く側で後でロールを柔軟に変えられるようにする為です
中身はこんな感じで、当てたい役割に応じて変えています

あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。
以下の関連情報に基づいて、質問に簡潔に回答してください。
質問に対して与えられた関連情報のみを使用して簡潔に回答してください。
「関連情報から」や「おそらく」という言葉は使わないでください。

2. セッションIDを使ってS3に保存されている過去の会話履歴を取得する

from langchain.schema import (
    messages_from_dict,
    messages_to_dict,
    AIMessage,
    HumanMessage,
)
from langchain.memory import ConversationBufferMemory

memory_bucket = s3.Bucket(MEMORY_BUCKET)

def load_memory(session_id):
    object_key_name = '{}.json'.format(session_id)
    obj = memory_bucket.Object(object_key_name)

    try:
        response = obj.get()
        body = response['Body'].read()
        json_data = json.loads(body.decode('utf-8'))

        memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
        memory.chat_memory.messages = messages_from_dict(json_data)

    except:
        memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")

    return memory


def chat(message, session_id, param_template):
    memory = load_memory(session_id)
    messages = memory.chat_memory.messages

会話の履歴はJSONで以下のような形でS3に保存されます

[
	{
		"type": "human",
		"data": {
			"content": "タイプ人間のテキスト ここに質問が表示される",
			"additional_kwargs": {},
			"response_metadata": {},
			"type": "human",
			"name": null,
			"id": null,
			"example": false
		}
	},
	{
		"type": "ai",
		"data": {
			"content": "タイプAIのテキスト ここに回答が表示される",
			"additional_kwargs": {},
			"response_metadata": {},
			"type": "ai",
			"name": null,
			"id": null,
			"example": false,
			"tool_calls": [],
			"invalid_tool_calls": [],
			"usage_metadata": null
		}
	}
]
memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")

memory.chat_memory.messages = messages_from_dict(json_data)

ConversationBufferMemoryでメモリインスタンスを作成し、その中にS3から取得した会話履歴を入れています

3. AmazonBedrock・モデル・プロンプトを設定する

retriever = AmazonKnowledgeBasesRetriever(
    knowledge_base_id="KNOWLEDGE_BASE_ID",
    retrieval_config={
        "vectorSearchConfiguration": {
            "numberOfResults": 20
        }
    },
)

LLM = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1"
)

context_question = """

質問: {question}
関連情報: {context}

回答: """

template = param_template + context_question

prompt = ChatPromptTemplate.from_template(template)

AmazonKnowledgeBasesRetrieverを使いAmazonBedrockに作成したナレッジベースIDを設定して初期化しておきます
この時numberOfResultsを設定することで、RGAで使うドキュメント数を制限できます(この場合このretrieverを使って検索を行った場合、スコアが高い順で20個まで取得する)

その後LLMとしてモデルも設定しておきます

プロンプトは先ほどのテンプレート+質問と関連情報と回答の出力条件を繋ぎ合わせておきます
これにより現在のプロンプトは以下となります

あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。
以下の関連情報に基づいて、質問に簡潔に回答してください。
質問に対して与えられた関連情報のみを使用して簡潔に回答してください。
「関連情報から」や「おそらく」という言葉は使わないでください。`

質問: {question}
関連情報: {context}

回答: """

4. ナレッジベースからドキュメントを取得し、情報を繋げてAIに問い合わせる

output_parser = StrOutputParser()
all_context_info = []
use_context_info = []

def passthrough(data):
    return data

def get_context(data):
    nonlocal all_context_info
    nonlocal use_context_info
    context = retriever.invoke(data['question'])

    filtered_context = []

    for item in context:
        score = item.metadata.get('score')
        source_metadata = item.metadata.get('source_metadata')
        source_uri = source_metadata.get('x-amz-bedrock-kb-source-uri') if source_metadata else None

        if score is not None and source_uri is not None:
            all_context_info.append({"name": source_uri, "score": score})
            if score >= 0.57:
                use_context_info.append({"name": source_uri, "score": score})
                filtered_context.append(item)

    # フィルタリングされた結果が多すぎる場合に上限を設定
    max_contexts = 10  # 取得するコンテキスト情報の上限
    print('origin',len(filtered_context))
    if len(filtered_context) > max_contexts:
        filtered_context = sorted(filtered_context, key=lambda x: x.metadata['score'], reverse=True)[:max_contexts]
        print('filter',len(filtered_context))

    if not filtered_context:
        return "現時点ではこの質問に対する適切な情報を提供することができませんでした"

    return filtered_context


chain = (
    {"context": get_context, "question": passthrough}
    | prompt
    | LLM
    | output_parser
)


human_input = HumanMessage(content=message)
resp = chain.
(
    {
        "history": messages,
        "human_input": human_input.content,
        "question": message,
    }
)

response = resp

chainとchain.invoke部分に触れながら解説していきます

chain

{"context": get_context, "question": passthrough}

context = retriever.invoke(data['question'])

retrieverを使って検索を行います
検索結果はドキュメントとスコアを配列で返し、スコアはどれだけ関連していたかの指標になります
ループで回しながら使ったドキュメント等を空の配列に入れるループを実装していますが、私の感覚によるスコアの閾値を設定してドキュメントの数を制限しています(if score >= 0.57)

更にそこからmax_contexts = 5を設定してスコアが高い順から5つだけを返しています
これは最終的なインプットするプロンプトのトークン数を制限するためにやっていますが、数値周りは要検証ですね

prompt

上記の変数がプロンプトにチェインされるので以下のようになります

あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。
以下の関連情報に基づいて、質問に簡潔に回答してください。
質問に対して与えられた関連情報のみを使用して簡潔に回答してください。
「関連情報から」や「おそらく」という言葉は使わないでください。`

質問: PickGoとはなんですか
関連情報: {
  "document": {
    "topic": "ピックゴーについて",
    "sections": [
      {
        "question": ピックゴーとは何ですか?",
        "answer": "PickGoは、個人や企業が即日配送サービスを利用できるプラットフォームです。ドライバーと荷主をマッチングすることで、迅速かつ効率的な配送を実現しています![Something went wrong]()
![Something went wrong]()
。"
      }
    ]
  }
}


回答: """

LLM

設定したLLM(Claude 3)を入れます

output_parser

これはAIからの応答を文字列として処理するために使用しています
この場合は、AIモデルが返す出力をテキストとして処理し、文字列として解析します

chain.invoke

history

historyなのでS3から取得した過去の会話履歴メッセージを入れます

human_input

human_input = HumanMessage(content=message)

人間の発言を表すためのクラス(HumanMessage)を設定し、質問を入れます

question

最後にquestionフィールドにメッセージ(ここでいう質問)を入れて設定完了です

5. AIから帰ってきた回答を会話履歴としてS3に保存する

def save_memory(memory, session_id):
    object_key_name = '{}.json'.format(session_id)
    obj = memory_bucket.Object(object_key_name)
    obj.put(Body=json.dumps(messages_to_dict(memory.chat_memory.messages)))


memory.chat_memory.messages.append(human_input)
memory.chat_memory.messages.append(AIMessage(content=response))

save_memory(memory, session_id)

セッションID通りにS3に保存します
これ以降同じセッションIDの場合は会話履歴を使用することができます

Lambdaにアップロードする

諸々コードとテストを終えたらLambdaにアップロードします
今回はデプロイパッケージをS3にアップロードしてCloudFormationを使いLambdaに展開させるSAMを使います

macOSなのでbrewでCLIをインストールします

$ brew tap aws/tap
$ brew install aws-sam-cli
$ sam --version
SAM CLI, version ~

sam initでパッケージファイルを作り、Lambda用pythonファイルとrequirements.txt、各yamlを用意します

packaged.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  MyLambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: python3.11
      CodeUri: {レイヤーのURI}
      MemorySize: 128
      Timeout: 10
    Metadata:
      SamResourceId: MyLambdaFunction

template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  MyLambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: python3.11
      CodeUri: .
      MemorySize: 128
      Timeout: 10

最後にビルドしてSAMのデプロイを開始します

sam build
sam package --output-template-file packaged.yaml --s3-bucket {ここに使うレイヤー名}
sam deploy --template-file packaged.yaml --stack-name {スタック名} --capabilities CAPABILITY_IAM

環境変数を設定し、REST APIを公開する

無事Lambdaに上がったら必要なポリシーを割り当てたロールを設定し、API Gatewayを使ってトークン認証ありのREST APIとして公開します

これでBedrockのAIを外から叩く準備ができました!

実際に叩いてみる

では今回はGASからLambdaを叩いてみましょう

const template = `あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。以下の関連情報に基づいて、質問に簡潔に回答してください。質問に対して与えられた関連情報のみを使用して簡潔に回答してください。「関連情報から」や「おそらく」という言葉は使わないでください。`

function fetchLangChain(prompt, sessionId, template) {
  var scriptProperties = PropertiesService.getScriptProperties();
  var url = scriptProperties.getProperty('langChainLmabdaUrl');
  var apiKey = scriptProperties.getProperty('apiKey');

  return new Promise((resolve, reject) => {
    var options = {
      'method': 'POST',
      'headers': {
        'x-api-key': apiKey,
        'Content-Type': 'application/json'
      },
      'payload': JSON.stringify({ prompt: prompt, sessionId: sessionId, param_template: template }),
      'muteHttpExceptions': true
    };

    try {
      var response = UrlFetchApp.fetch(url, options);
      var responseCode = response.getResponseCode();
      var responseBody = response.getContentText();
      
      Logger.log('Response Code: ' + responseCode);
      Logger.log('Response Body: ' + responseBody);
      
      if (responseCode === 200) {
        var jsonResponse = JSON.parse(responseBody);
        resolve(jsonResponse); // 修正点: JSON形式のレスポンスボディ全体を返す
      } else {
        reject(new Error('Error response: ' + responseBody));
      }
    } catch (e) {
      Logger.log('Error: ' + e.message);
      if (e.response) {
        Logger.log('Error Response: ' + e.response.getContentText());
      }
      reject(e);
    }
  });
}


各パラメータとURLを設定し、APIをリクエストすることで回答とドキュメント+スコアの配列が結果が返ってきました

終わりに

以上までがLangcahinのLambdaアップロードまでの流れになります
おおまかな流れでしたが、私はAWS初心者だったのでポリシーやAPI周りで結構つまづきました

ただポリシーはエラーの通りに権限を付与してあげたり、公開APIもCloudWatchを活用して冷静にみていけば問題を解決できることが多いです
またChatGPTはじめAWSのドキュメントやコミュニティのサポートも活用すると、学習がよりスムーズに進むと感じました!

次はいよいよ実際に使ってみた後の評価や直近のアップデート内容、開発途中につまづいた所などをまとめた完結編になります

果たしてこのAIは使えたのかどうか・・・

記事ができあがり次第投稿する予定です!

6
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?