皆様こんにちは!私はSREチームとしてAWS周りの設計、構築、運用などを実施しています。
弊社でも生成AIの流れに乗って、RAGチャットを作る機会が多いのですが、BoxやSharepointなど多くのソースをS3に格納することでナレッジベースを実現しています。
それぞれのソースファイルのURLをチャットに載せたいという要件がありましたので、実装方法をご紹介したいと思います!
手探りで実装した部分も多いので、もっといい方法あるよ!という方は是非教えてください!
構成
基本的なチャット部分は以下のリポジトリで構築したものを利用します。
以下の図でオレンジ色の部分は上記リポジトリのCDKをデプロイすることで作成されます。
今回は青い部分を追加して、冒頭述べた通り、チャットの回答にソースURLを加えます。
構築
ナレッジベースへのクエリ
デフォルトだと、ナレッジベースへのクエリはできないので、ソースコードに変更を加えて、クエリできるようにします。
以下参考にしてみてください!
このアプリケーションはStrands Agentを使用しています。
以下のツールをStrands Agentで利用可能にすることで、任意のナレッジベースへのクエリが可能になります。
- ナレッジベースへのクエリを実行するツール
- ナレッジベースの一覧を取得するツール
ナレッジベースの説明欄を取得することでクエリを実行するナレッジベースを判断するため、ナレッジベースの説明欄を丁寧に書くとエージェントがクエリ先のナレッジベースを正しく選択する精度が上がります。
早速ツールを記述しましょう!
(新規作成)api/services/custom_tools.py
from strands import tool
import boto3
import logging
import time
@tool
def search_vector_db(query: str, kdb_id: str) -> str:
"""
A tool to query a knowledge base
Args:
query (str): User's question
kdb_id (str): Knowledge base's ID
Returns:
str: Summary of the results
"""
client = boto3.client('bedrock-agent-runtime')
max_retries = 3
retry_wait_seconds = 10
for attempt in range(max_retries + 1):
try:
retrieve_response = client.retrieve(
knowledgeBaseId=kdb_id,
retrievalQuery={
'text': query
},
retrievalConfiguration={
"vectorSearchConfiguration": {
"numberOfResults": 3
}
}
)
results = []
for result in retrieve_response.get('retrievalResults', []):
content = result.get('content', {}).get('text', '')
source_url = result.get('metadata', {}).get('source_url', '')
if content:
results.append(f"Content: {content}\nSource: {source_url}")
return "\n".join(results) if results else "No relevant information found in the knowledge base."
except Exception as e:
error_str = str(e)
# Check if it's the specific Aurora DB auto-pause error
is_aurora_pause_error = "Aurora DB instance" in error_str and "resuming after being auto-paused" in error_str
if is_aurora_pause_error and attempt < max_retries:
logging.warning(f"Aurora DB resuming (attempt {attempt + 1}/{max_retries}). Waiting {retry_wait_seconds} seconds before retry...")
time.sleep(retry_wait_seconds)
continue
# If all retries exhausted or not the aurora pause error, log and return error
logging.error(f"Error in search_vector_db: {str(e)}", exc_info=True)
return "Error occurred while searching the knowledge base."
@tool
def list_vector_dbs() -> str:
"""
A tool to get knowledge base list available.
Returns:
str: Knowledge base's ID list
"""
client = boto3.client('bedrock-agent')
try:
response = client.list_knowledge_bases()
kdb_ids = [f"{kdb['knowledgeBaseId']}: {kdb.get('description', 'None')}" for kdb in response.get('knowledgeBaseSummaries', [])]
return "\n".join(kdb_ids) if kdb_ids else "No knowledge bases available."
except Exception as e:
logging.error(f"Error in list_vector_dbs: {str(e)}", exc_info=True)
return "Error occurred while listing knowledge bases."
PoC環境ではAurora Serverlessの最小ACUを0にしてコスト削減をすることが多いため、Auroraのコールドスタート対策で、リトライ処理を入れています。
また、metadataからsource_urlを取得しています。
次にエージェントがツールを利用できるようにしましょう!
api/services/streaming_service.py
(省略)
from services.custom_tools import * ←追記
(省略)
tools = [
current_time,
calculator,
sleep,
search_vector_db, ←追記
list_vector_dbs, ←追記
session_upload_tool,
]
(省略)
IAMの権限が必要なので、つけてあげましょう!
cdk/lib/strands-chat-stack.ts
(省略)
handler.role?.addToPrincipalPolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: [
'bedrock:InvokeModelWithResponseStream',
'bedrock:InvokeModel',
'bedrock:retrieve*', ←追記
'bedrock:List*', ←追記
],
resources: ['*'],
})
);
(省略)
次に、エージェントのシステムプロンプトを編集して、追加したツールの使い方を教えてあげましょう!
api/utils.py
(省略)
def generate_session_system_prompt(session_workspace_dir: str) -> str:
"""Generate system prompt with session-specific workspace directory
Args:
session_workspace_dir: Session-specific workspace directory
Returns:
System prompt with session workspace directory
"""
return f"""## Basic Output Policy
- When structuring text, please output in markdown format. However, there's no need to forcibly create chapters in markdown for simple plain text responses.
- Output links as [link_title](link_url) and images as .
- When using tools, explain in text how you will use them while calling them.
## About File Output
- You are running on AWS Lambda. Therefore, when writing files, always write under `{session_workspace_dir}`.
- Similarly, when a workspace is needed, use the `{session_workspace_dir}` directory. Do not ask users about their current workspace. It is always `{session_workspace_dir}`.
- Also, users cannot directly access files written under `{session_workspace_dir}`. Therefore, when providing these files to users, *always use the `upload_file_to_s3_and_retrieve_s3_url` tool to upload to S3 and retrieve the S3 URL*. Include the retrieved S3 URL in the final output in the format .
## About Tool Usage
- `search_vector_db`: Use this to search for information in the vector database. Provide the database name and query as arguments.←追記
- `list_vector_dbs`: Use this to list available vector databases.←追記
- `session_upload_tool`: Use this to upload files to S3 and retrieve the S3 URL. Provide the local file path as an argument.
- `calculator`: Use this to perform calculations. Provide the expression to calculate as an argument.
- `current_time`: Use this to get the current date and time.
- `sleep`: Use this to pause for a specified number of seconds. Provide the number of seconds as an argument.
## Source URL←追記
- When you retrieve information from the vector database using `search_vector_db`, if the search results include source URLs, please include those URLs in your response to the user.
- When providing source URLs, please format them as follows in markdown: [source_url](source_url). For example, if the source URL is https://example.com/info, include it in your response as [source_url](https://example.com/info).
"""
(省略)
これでエージェントがクエリを実行できるようになりました!
メタデータの追加方法
ナレッジベースにメタデータを追加するには、データソースのS3バケットのオブジェクトと同じ階層に「オブジェクト名.metadata.json」というファイルを作成してあげる必要があります。
また、ベクトルデータベースにAuroraを採用している場合は、Auroraにメタデータ用のカラムを追加してあげる必要があります。
ナレッジベースの作成に関してはたくさん記事もありますし、こだわりがなければクイック作成オプションをご利用いただければいいかと思います!
ちなみに、デフォルトVPCを削除しているとクイック作成オプションうまく行かないと思いますが、クイック作成のCfnテンプレートを入手して、VPCとサブネットをパラメータ化してService Catalogに置いておくと、カスタムVPCでもクイック作成オプションのようなことができます!
1.Auroraにカラムを追加する
Auroraに以下のクエリを実行します。
スキーマ、テーブル名は適宜変更してください。
ALTER TABLE bedrock_integration.bedrock_kb
ADD COLUMN IF NOT EXISTS source_url VARCHAR(256);
2.metadata.jsonファイルを作成して、アップロードする
以下のjsonファイルをS3に配置したソースファイルと同じ場所に「ソースファイル名.metadata.json」というファイル名で格納します。
{
"metadataAttributes": {
"source_url": "https://example.com/yamadasystem.txt"
}
}
3.同期する
ここまで出来たらいつも通り、ナレッジベースで同期します!
同期をするとメタデータが追加されていることがわかります!
試してみる
さっそくどんな感じで出力されるか試してみましょう!
登録したツールを使ってくれていますね!
ソースURLも出してくれてます!
URLの出し方についてはシステムプロンプトを調整することでよりキレイに出すことができると思います!
日本の山田太郎にあたる各国のものってなんだろうか?
metadata.jsonファイルの作成を自動化する
弊社では、S3へのファイルのアップロードはエンジニアじゃない方も行います。
メタデータファイルの作成とアップロードは面倒ですし、エラーの原因にもなるので、S3にファイルアップロード時にタグを追加してもらうことで、自動的にメタデータファイルを作成するようにします。
1.Lambda関数の作成
以下のコードで関数を作成します。
S3イベント通知のフィルターに除外が使えないので、Lambda側でmetadata.jsonは除外します。
SAMテンプレート
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Parameters:
BucketName:
Type: String
Description: The name of the S3 bucket to monitor for new objects.
Resources:
# Generate metadata when a new object is created in the S3 bucket
MetadataGeneratorFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: MetadataGenerator
CodeUri: metadata_generator_function/
Runtime: python3.13
Handler: app.lambda_handler
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
- s3:GetObjectTagging
Resource:
- !Sub arn:aws:s3:::${BucketName}/*
# Allow the S3 bucket to invoke the Lambda function
MetadataGeneratorFunctionPermission:
Type: AWS::Lambda::Permission
Properties:
Action: 'lambda:InvokeFunction'
FunctionName: !Ref MetadataGeneratorFunction
Principal: 's3.amazonaws.com'
SourceAccount: !Ref 'AWS::AccountId'
SourceArn: !Sub arn:aws:s3:::${BucketName}
metadata_generator_function/app.py
import json
import boto3
import os
import urllib.parse
import re
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')
def lambda_handler(event, context):
logger.info(f"Received event: {json.dumps(event)}")
# Get the bucket name and object key from the event
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
# Skip processing if the file is a metadata file
if key.endswith('.metadata.json'):
logger.info(f"Skipping metadata file: {key}")
return {
'statusCode': 200,
'body': json.dumps('Skipped metadata file')
}
try:
# Get source_url tag key
obj_tag_list = s3_client.get_object_tagging(
Bucket=bucket,
Key=key
).get('TagSet', [])
source_url = next((tag['Value'] for tag in obj_tag_list if tag['Key'] == 'source_url'), None)
if source_url is None:
logging.info(f"No source_url tag found for {key}")
return {
'statusCode': 200,
'body': json.dumps('No source_url tag found, skipping metadata generation')
}
# Prepare metadata content
metadata_content = {
"metadataAttributes": {
"source_url": source_url
}
}
# Define the metadata file key
metadata_key = f"{key}.metadata.json"
# Upload the metadata file to S3
s3_client.put_object(
Bucket=bucket,
Key=metadata_key,
Body=json.dumps(metadata_content, indent=4),
ContentType='application/json'
)
return {
'statusCode': 200,
'body': json.dumps('Metadata processing complete')
}
except Exception as e:
print(f"Error processing {key}: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
2.イベント通知の作成
S3のプロパティからイベント通知を作成します。
以下のような設定で作成します。
3.オブジェクト作成時にタグ付けをする
ファイルアップロード時に以下のようにタグ付けをします。
ファイルが自動的に作成されることを確認してください!
あとは先ほどの手順通り同期すればOKです!
さいごに
今回はRAGチャットにソースURLを出力する方法を試してみました!
一気にアップロードしたい場合など、CSV台帳化してそこから生成する方法などもあるかと思ってますが、どっちの方がいいかは検討中です!
弊社では一緒に働く仲間を募集中です!
現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!
募集内容等詳細は、是非採用サイトをご確認ください。







