はじめに
こんにちは!CBcloudのRyoです
私の所属するCBcloudでは、物流ラストワンマイルの配送プラットフォーム「ピックゴー」を運用しており、荷物を送りたい方、荷物を届けてくれる方を24時間365日、サービスを通してつなげています!
今回は以前より投稿しているRAGを使ったAI構築のLangchain編となります
Bedrockやナレッジベース・エージェントの説明は前の記事にありますので気になる方はそちらを参照ください
完成品編
Bedrock編
Agents番外編
前回のおさらい
アーキテクチャ
前回の記事ではBedrockで設定したClaude3にナレッジベースを追加し、更にそれをエージェントの詳細プロンプトを使ってナレッジベースの使用可否や回答の出力方法を調整しました
今回はこれらの動きをコードベースで作ることができるPythonのLangChainと、それを使ってBedrcok APIをサーバーレスで叩けるLambdaを作成していきます
Lambdaについて
いわずと知れたサーバーレスのコンピューティングサービスですが、今回気をつけたいのがLambdaのデプロイパッケージの制限は解凍後サイズ 250MBまでという部分です
LangChainは約50~100 MBとそこそこ大きいのでレイヤーとして使う際は注意が必要です
LangChainについて
PythonのLLM専用オープンソースフレームワークです 今回の使い方として関連前回の記事に書いたエージェント機能をコードで実装できるといった感じです
Chainという意味はプロンプトをつなげるという意味もあるので、AIに渡すプロンプトを精査したり繋げたりしてモデルの正確性や関連性を向上させるのにも使われます
完成コード
import os
import boto3
import json
import uuid
from langchain.prompts import (
ChatPromptTemplate
)
from langchain_aws import ChatBedrock
from langchain.memory import ConversationBufferMemory
from langchain.schema import (
messages_from_dict,
messages_to_dict,
AIMessage,
HumanMessage,
)
from langchain_community.retrievers import AmazonKnowledgeBasesRetriever
from langchain_core.output_parsers import StrOutputParser
s3 = boto3.resource('s3')
MEMORY_BUCKET = os.environ.get('MEMORY_BUCKET')
KNOWLEDGE_BASE_ID = os.environ.get('KNOWLEDGE_BASE_ID')
memory_bucket = s3.Bucket(MEMORY_BUCKET)
def save_memory(memory, session_id):
object_key_name = '{}.json'.format(session_id)
obj = memory_bucket.Object(object_key_name)
save = obj.put(Body=json.dumps(messages_to_dict(memory.chat_memory.messages)))
def load_memory(session_id):
object_key_name = '{}.json'.format(session_id)
obj = memory_bucket.Object(object_key_name)
try:
response = obj.get()
body = response['Body'].read()
json_data = json.loads(body.decode('utf-8'))
memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
memory.chat_memory.messages = messages_from_dict(json_data)
except:
memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
return memory
def chat(message, session_id, param_template):
memory = load_memory(session_id)
messages = memory.chat_memory.messages
retriever = AmazonKnowledgeBasesRetriever(
knowledge_base_id=KNOWLEDGE_BASE_ID,
retrieval_config={
"vectorSearchConfiguration": {
"numberOfResults": 20
}
},
)
LLM = ChatBedrock(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
region_name="us-east-1"
)
context_question = """
質問: {question}
関連情報: {context}
回答: """
template = param_template + context_question
prompt = ChatPromptTemplate.from_template(template)
output_parser = StrOutputParser()
all_context_info = []
use_context_info = []
def passthrough(data):
return data
def get_context(data):
nonlocal all_context_info
nonlocal use_context_info
context = retriever.invoke(data['question'])
filtered_context = []
for item in context:
score = item.metadata.get('score')
source_metadata = item.metadata.get('source_metadata')
source_uri = source_metadata.get('x-amz-bedrock-kb-source-uri') if source_metadata else None
if score is not None and source_uri is not None:
all_context_info.append({"name": source_uri, "score": score})
if score >= 0.57:
use_context_info.append({"name": source_uri, "score": score})
filtered_context.append(item)
# フィルタリングされた結果が多すぎる場合に上限を設定
max_contexts = 10 # 取得するコンテキスト情報の上限
print('origin',len(filtered_context))
if len(filtered_context) > max_contexts:
filtered_context = sorted(filtered_context, key=lambda x: x.metadata['score'], reverse=True)[:max_contexts]
print('filter',len(filtered_context))
if not filtered_context:
return "現時点ではこの質問に対する適切な情報を提供することができませんでした"
return filtered_context
chain = (
{"context": get_context, "question": passthrough}
| prompt
| LLM
| output_parser
)
human_input = HumanMessage(content=message)
resp = chain.invoke(
{
"history": messages,
"human_input": human_input.content,
"question": message,
}
)
response = resp
memory.chat_memory.messages.append(human_input)
memory.chat_memory.messages.append(AIMessage(content=response))
save_memory(memory, session_id)
return response,all_context_info,use_context_info
def lambda_handler(event, context):
body = json.loads(event['body'])
prompt = body.get('prompt', '')
sessionId = body.get('sessionId', '')
template = body.get('param_template', '')
response, all_context_info, use_context_info = chat(prompt, sessionId, template)
print("Response:", response)
print("All Context Info:", all_context_info)
print("Use Context Info:", use_context_info)
return {
'statusCode': 200,
'body': json.dumps({
'message': response,
'allContextInfo': all_context_info,
'useContextInfo': use_context_info
}, ensure_ascii=False)
}
boto3==1.34.114
python-dotenv
langchain
langchain_aws
langchain_community
pinecone-client
orjson
概要
やっていることですが以下の流れになります
- Lambdaがプロンプト、セッションID、テンプレートプロンプトの3つの引数を受け取る
- セッションIDを使ってS3に保存されている過去の会話履歴を取得する
- AmazonBedrock・モデル・プロンプトを設定する
- ナレッジベースからドキュメントを取得し、情報を繋げてAIに問い合わせる
- AIから帰ってきた回答を会話履歴としてS3に保存する
- 最後に回答と使ったドキュメント+関連度スコアを返す
まずはコードを一つ一つ見ていきましょう
1. Lambdaがプロンプト、セッションID、テンプレートプロンプトの3つの引数を受け取る
def lambda_handler(event, context):
body = json.loads(event['body'])
prompt = body.get('prompt', '')
sessionId = body.get('sessionId', '')
template = body.get('param_template', '')
response, all_context_info, use_context_info = chat(prompt, sessionId, template)
各オブジェクトの説明です
prompt = AIに問い合わせる質問
sessionId = 会話履歴を保存するファイル名に使う ファイルはS3に保存される
template = 詳細プロンプトにあたる部分 AIにロール(役割)を設定することで出力条件を調節する
templateを引数として渡しているのはAPIを叩く側で後でロールを柔軟に変えられるようにする為です
中身はこんな感じで、当てたい役割に応じて変えています
あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。
以下の関連情報に基づいて、質問に簡潔に回答してください。
質問に対して与えられた関連情報のみを使用して簡潔に回答してください。
「関連情報から」や「おそらく」という言葉は使わないでください。
2. セッションIDを使ってS3に保存されている過去の会話履歴を取得する
from langchain.schema import (
messages_from_dict,
messages_to_dict,
AIMessage,
HumanMessage,
)
from langchain.memory import ConversationBufferMemory
memory_bucket = s3.Bucket(MEMORY_BUCKET)
def load_memory(session_id):
object_key_name = '{}.json'.format(session_id)
obj = memory_bucket.Object(object_key_name)
try:
response = obj.get()
body = response['Body'].read()
json_data = json.loads(body.decode('utf-8'))
memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
memory.chat_memory.messages = messages_from_dict(json_data)
except:
memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
return memory
def chat(message, session_id, param_template):
memory = load_memory(session_id)
messages = memory.chat_memory.messages
会話の履歴はJSONで以下のような形でS3に保存されます
[
{
"type": "human",
"data": {
"content": "タイプ人間のテキスト ここに質問が表示される",
"additional_kwargs": {},
"response_metadata": {},
"type": "human",
"name": null,
"id": null,
"example": false
}
},
{
"type": "ai",
"data": {
"content": "タイプAIのテキスト ここに回答が表示される",
"additional_kwargs": {},
"response_metadata": {},
"type": "ai",
"name": null,
"id": null,
"example": false,
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": null
}
}
]
memory = ConversationBufferMemory(return_messages=False, human_prefix="H", assistant_prefix="A")
memory.chat_memory.messages = messages_from_dict(json_data)
ConversationBufferMemoryでメモリインスタンスを作成し、その中にS3から取得した会話履歴を入れています
3. AmazonBedrock・モデル・プロンプトを設定する
retriever = AmazonKnowledgeBasesRetriever(
knowledge_base_id="KNOWLEDGE_BASE_ID",
retrieval_config={
"vectorSearchConfiguration": {
"numberOfResults": 20
}
},
)
LLM = ChatBedrock(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
region_name="us-east-1"
)
context_question = """
質問: {question}
関連情報: {context}
回答: """
template = param_template + context_question
prompt = ChatPromptTemplate.from_template(template)
AmazonKnowledgeBasesRetrieverを使いAmazonBedrockに作成したナレッジベースIDを設定して初期化しておきます
この時numberOfResultsを設定することで、RGAで使うドキュメント数を制限できます(この場合このretrieverを使って検索を行った場合、スコアが高い順で20個まで取得する)
その後LLMとしてモデルも設定しておきます
プロンプトは先ほどのテンプレート+質問と関連情報と回答の出力条件を繋ぎ合わせておきます
これにより現在のプロンプトは以下となります
あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。
以下の関連情報に基づいて、質問に簡潔に回答してください。
質問に対して与えられた関連情報のみを使用して簡潔に回答してください。
「関連情報から」や「おそらく」という言葉は使わないでください。`
質問: {question}
関連情報: {context}
回答: """
4. ナレッジベースからドキュメントを取得し、情報を繋げてAIに問い合わせる
output_parser = StrOutputParser()
all_context_info = []
use_context_info = []
def passthrough(data):
return data
def get_context(data):
nonlocal all_context_info
nonlocal use_context_info
context = retriever.invoke(data['question'])
filtered_context = []
for item in context:
score = item.metadata.get('score')
source_metadata = item.metadata.get('source_metadata')
source_uri = source_metadata.get('x-amz-bedrock-kb-source-uri') if source_metadata else None
if score is not None and source_uri is not None:
all_context_info.append({"name": source_uri, "score": score})
if score >= 0.57:
use_context_info.append({"name": source_uri, "score": score})
filtered_context.append(item)
# フィルタリングされた結果が多すぎる場合に上限を設定
max_contexts = 10 # 取得するコンテキスト情報の上限
print('origin',len(filtered_context))
if len(filtered_context) > max_contexts:
filtered_context = sorted(filtered_context, key=lambda x: x.metadata['score'], reverse=True)[:max_contexts]
print('filter',len(filtered_context))
if not filtered_context:
return "現時点ではこの質問に対する適切な情報を提供することができませんでした"
return filtered_context
chain = (
{"context": get_context, "question": passthrough}
| prompt
| LLM
| output_parser
)
human_input = HumanMessage(content=message)
resp = chain.
(
{
"history": messages,
"human_input": human_input.content,
"question": message,
}
)
response = resp
chainとchain.invoke部分に触れながら解説していきます
chain
{"context": get_context, "question": passthrough}
context = retriever.invoke(data['question'])
retrieverを使って検索を行います
検索結果はドキュメントとスコアを配列で返し、スコアはどれだけ関連していたかの指標になります
ループで回しながら使ったドキュメント等を空の配列に入れるループを実装していますが、私の感覚によるスコアの閾値を設定してドキュメントの数を制限しています(if score >= 0.57)
更にそこからmax_contexts = 5を設定してスコアが高い順から5つだけを返しています
これは最終的なインプットするプロンプトのトークン数を制限するためにやっていますが、数値周りは要検証ですね
prompt
上記の変数がプロンプトにチェインされるので以下のようになります
あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。
以下の関連情報に基づいて、質問に簡潔に回答してください。
質問に対して与えられた関連情報のみを使用して簡潔に回答してください。
「関連情報から」や「おそらく」という言葉は使わないでください。`
質問: PickGoとはなんですか
関連情報: {
"document": {
"topic": "ピックゴーについて",
"sections": [
{
"question": ピックゴーとは何ですか?",
"answer": "PickGoは、個人や企業が即日配送サービスを利用できるプラットフォームです。ドライバーと荷主をマッチングすることで、迅速かつ効率的な配送を実現しています![Something went wrong]()
![Something went wrong]()
。"
}
]
}
}
回答: """
LLM
設定したLLM(Claude 3)を入れます
output_parser
これはAIからの応答を文字列として処理するために使用しています
この場合は、AIモデルが返す出力をテキストとして処理し、文字列として解析します
chain.invoke
history
historyなのでS3から取得した過去の会話履歴メッセージを入れます
human_input
human_input = HumanMessage(content=message)
人間の発言を表すためのクラス(HumanMessage)を設定し、質問を入れます
question
最後にquestionフィールドにメッセージ(ここでいう質問)を入れて設定完了です
5. AIから帰ってきた回答を会話履歴としてS3に保存する
def save_memory(memory, session_id):
object_key_name = '{}.json'.format(session_id)
obj = memory_bucket.Object(object_key_name)
obj.put(Body=json.dumps(messages_to_dict(memory.chat_memory.messages)))
memory.chat_memory.messages.append(human_input)
memory.chat_memory.messages.append(AIMessage(content=response))
save_memory(memory, session_id)
セッションID通りにS3に保存します
これ以降同じセッションIDの場合は会話履歴を使用することができます
Lambdaにアップロードする
諸々コードとテストを終えたらLambdaにアップロードします
今回はデプロイパッケージをS3にアップロードしてCloudFormationを使いLambdaに展開させるSAMを使います
macOSなのでbrewでCLIをインストールします
$ brew tap aws/tap
$ brew install aws-sam-cli
$ sam --version
SAM CLI, version ~
sam initでパッケージファイルを作り、Lambda用pythonファイルとrequirements.txt、各yamlを用意します
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
MyLambdaFunction:
Type: AWS::Serverless::Function
Properties:
Handler: lambda_function.lambda_handler
Runtime: python3.11
CodeUri: {レイヤーのURI}
MemorySize: 128
Timeout: 10
Metadata:
SamResourceId: MyLambdaFunction
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
MyLambdaFunction:
Type: AWS::Serverless::Function
Properties:
Handler: lambda_function.lambda_handler
Runtime: python3.11
CodeUri: .
MemorySize: 128
Timeout: 10
最後にビルドしてSAMのデプロイを開始します
sam build
sam package --output-template-file packaged.yaml --s3-bucket {ここに使うレイヤー名}
sam deploy --template-file packaged.yaml --stack-name {スタック名} --capabilities CAPABILITY_IAM
環境変数を設定し、REST APIを公開する
無事Lambdaに上がったら必要なポリシーを割り当てたロールを設定し、API Gatewayを使ってトークン認証ありのREST APIとして公開します
これでBedrockのAIを外から叩く準備ができました!
実際に叩いてみる
では今回はGASからLambdaを叩いてみましょう
const template = `あなたはPickGoという物流マッチングサービスを運用する会社の質問応答エージェントです。以下の関連情報に基づいて、質問に簡潔に回答してください。質問に対して与えられた関連情報のみを使用して簡潔に回答してください。「関連情報から」や「おそらく」という言葉は使わないでください。`
function fetchLangChain(prompt, sessionId, template) {
var scriptProperties = PropertiesService.getScriptProperties();
var url = scriptProperties.getProperty('langChainLmabdaUrl');
var apiKey = scriptProperties.getProperty('apiKey');
return new Promise((resolve, reject) => {
var options = {
'method': 'POST',
'headers': {
'x-api-key': apiKey,
'Content-Type': 'application/json'
},
'payload': JSON.stringify({ prompt: prompt, sessionId: sessionId, param_template: template }),
'muteHttpExceptions': true
};
try {
var response = UrlFetchApp.fetch(url, options);
var responseCode = response.getResponseCode();
var responseBody = response.getContentText();
Logger.log('Response Code: ' + responseCode);
Logger.log('Response Body: ' + responseBody);
if (responseCode === 200) {
var jsonResponse = JSON.parse(responseBody);
resolve(jsonResponse); // 修正点: JSON形式のレスポンスボディ全体を返す
} else {
reject(new Error('Error response: ' + responseBody));
}
} catch (e) {
Logger.log('Error: ' + e.message);
if (e.response) {
Logger.log('Error Response: ' + e.response.getContentText());
}
reject(e);
}
});
}
各パラメータとURLを設定し、APIをリクエストすることで回答とドキュメント+スコアの配列が結果が返ってきました
終わりに
以上までがLangcahinのLambdaアップロードまでの流れになります
おおまかな流れでしたが、私はAWS初心者だったのでポリシーやAPI周りで結構つまづきました
ただポリシーはエラーの通りに権限を付与してあげたり、公開APIもCloudWatchを活用して冷静にみていけば問題を解決できることが多いです
またChatGPTはじめAWSのドキュメントやコミュニティのサポートも活用すると、学習がよりスムーズに進むと感じました!
次はいよいよ実際に使ってみた後の評価や直近のアップデート内容、開発途中につまづいた所などをまとめた完結編になります
果たしてこのAIは使えたのかどうか・・・
記事ができあがり次第投稿する予定です!