Introduction
RAG is a hot topic these days, and a vector database is an essential component of any RAG system.
Because vector databases are expensive, I suspect many of you have had RAG PoCs stall, or have felt you couldn't try RAG casually.
On AWS, the vector database options used to be the following:
- Kendra
- OpenSearch
- Aurora Postgres
Kendra goes without saying, and even OpenSearch is not cheap enough to spin up casually.
Running Aurora with the minimum ACU set to 0 was the cheapest option for a PoC.
Now, with AWS's newly announced Amazon S3 Vectors, building a RAG application has become far more approachable.
In terms of cost and performance, the rough picture is as follows.
You need to choose among these options based on your use case.
At the moment, I have the following use cases in mind:
| Vector database | Use case |
| --- | --- |
| S3 Vectors | PoCs; small-scale RAG applications; when cost is the top priority |
| Aurora | Small- to medium-scale RAG applications; when data must stay inside a VPC |
| OpenSearch | Medium-scale RAG applications |
| Kendra | Large-scale RAG applications |
In this article, I'll make a very inexpensive RAG application deployable with a single click from Service Catalog!
One-Click RAG App
Users can create a new RAG chat simply by uploading the files they want to use as the RAG source.
When creating an application, you enter a name and description and upload the source files.
Creating the application launches a Service Catalog product, which provisions a new knowledge base and an S3 Vectors bucket.
(At the time of writing, S3 Vectors is in preview, so everything is built in N. Virginia (us-east-1); and because CloudFormation does not yet support it, those resources are created with a custom resource.)
Service Catalog product template
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Minimum Custom RAG with S3 vectors'
Parameters:
ApplicationName:
Type: String
Description: 'Name of the app'
Boto3LayerArn:
Type: String
Default: 'arn:aws:lambda:us-east-1:123456789123:layer:boto3-139:1'
Resources:
# S3 Vector and Knowledge Base Creation Role
S3VectorKnowledgeBaseRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: S3VectorKnowledgeBasePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3vectors:*
- bedrock:*
- bedrock-agent:*
- iam:PassRole
- s3:*
Resource: '*'
# S3 Vector and Knowledge Base Creation Function
S3VectorKnowledgeBaseFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${ApplicationName}-s3vector-kb-creator'
Runtime: python3.13
Handler: index.handler
Role: !GetAtt S3VectorKnowledgeBaseRole.Arn
Timeout: 600
Layers:
- !Ref Boto3LayerArn
Code:
ZipFile: !Sub |
import boto3
import cfnresponse
import json
import time
class VectorBucket:
def __init__(self, bucket_name):
self.bucket_name = bucket_name
self.client = boto3.client('s3vectors')
def _create_vector_bucket(self):
try:
response = self.client.create_vector_bucket(
vectorBucketName=self.bucket_name,
encryptionConfiguration={
'sseType': 'AES256'
}
)
              except Exception as e:
                  if 'ConflictException' in str(e):
                      # Bucket already exists; fall through and look it up below
                      print("Vector bucket already exists, proceeding to list buckets.")
                  else:
                      print(f"Error creating vector bucket: {e}")
                      return None
try:
response = self.client.list_vector_buckets(
prefix=self.bucket_name
)
return response['vectorBuckets'][0]
except Exception as e:
print(f"Error listing vector buckets: {e}")
return None
def _create_vector_index(self):
try:
response = self.client.create_index(
vectorBucketName=self.bucket_name,
indexName=f"{self.bucket_name}-index",
dataType='float32',
dimension=1024,
distanceMetric='cosine',
metadataConfiguration={
'nonFilterableMetadataKeys': [
'AMAZON_BEDROCK_TEXT',
]
}
)
except Exception as e:
if 'ConflictException' in str(e):
print("Index already exists, proceeding to list indexes.")
pass
else:
print(f"Error creating vector index: {e}")
return None
try:
response = self.client.list_indexes(
vectorBucketName=self.bucket_name,
prefix=f"{self.bucket_name}-index"
)
return response['indexes'][0]
except Exception as e:
print(f"Error creating vector index: {e}")
return None
def _delete_vector_bucket(self):
try:
response = self.client.delete_vector_bucket(
vectorBucketName=self.bucket_name
)
print(f"Deleted vector bucket: {self.bucket_name}")
except Exception as e:
print(f"Error deleting vector bucket: {e}")
return None
return response
def _delete_vector_index(self):
try:
response = self.client.delete_index(
vectorBucketName=self.bucket_name,
indexName=f"{self.bucket_name}-index"
)
print(f"Deleted vector index: {self.bucket_name}-index")
except Exception as e:
print(f"Error deleting vector index: {e}")
return None
return response
def handler(event, context):
try:
print(f"Event: {json.dumps(event)}")
if event['RequestType'] == 'Create':
# Step 1: Create S3 Vector bucket
vector_bucket_name = event['ResourceProperties']['VectorBucketName']
vector_bucket = VectorBucket(vector_bucket_name)
print(f"Creating S3 Vector bucket: {vector_bucket_name}")
create_bucket_response = vector_bucket._create_vector_bucket()
vector_bucket_arn = create_bucket_response.get('vectorBucketArn', '')
print(f"S3 Vector bucket created: {vector_bucket_arn}")
# Wait for bucket to be ready
time.sleep(3)
# Step 2: Create Vector Index
index_name = event['ResourceProperties']['IndexName']
print(f"Creating Vector Index: {index_name}")
index_response = vector_bucket._create_vector_index()
index_arn = index_response.get('indexArn', '')
print(f"Vector Index created: {index_arn}")
# Wait for index to be ready
time.sleep(3)
# Step 3: Create Knowledge Base
bedrock_client = boto3.client('bedrock-agent')
kb_name = event['ResourceProperties']['KnowledgeBaseName']
kb_role_arn = event['ResourceProperties']['KnowledgeBaseRoleArn']
embedding_model_arn = event['ResourceProperties']['EmbeddingModelArn']
print(f"Creating Knowledge Base: {kb_name}")
kb_response = bedrock_client.create_knowledge_base(
name=kb_name,
description='RAG Knowledge Base with S3 Vector storage',
roleArn=kb_role_arn,
knowledgeBaseConfiguration={
'type': 'VECTOR',
'vectorKnowledgeBaseConfiguration': {
'embeddingModelArn': embedding_model_arn,
'embeddingModelConfiguration': {
'bedrockEmbeddingModelConfiguration': {
'dimensions': 1024,
'embeddingDataType': 'FLOAT32'
}
}
}
},
storageConfiguration={
'type': 'S3_VECTORS',
's3VectorsConfiguration': {
'vectorBucketArn': vector_bucket_arn,
'indexArn': index_arn
}
},
tags={
'StackName': '${AWS::StackName}'
}
)
print(f"Knowledge Base creation response: {kb_response}")
knowledge_base_id = kb_response['knowledgeBase']['knowledgeBaseId']
knowledge_base_arn = kb_response['knowledgeBase']['knowledgeBaseArn']
print(f"Knowledge Base created: {knowledge_base_id}")
# Step 4: Create Data Source
document_bucket_arn = event['ResourceProperties']['DocumentBucketArn']
print(f"Creating Data Source for bucket: {document_bucket_arn}")
ds_response = bedrock_client.create_data_source(
knowledgeBaseId=knowledge_base_id,
name=f"{kb_name}-data-source",
description='S3 data source for documents',
dataSourceConfiguration={
'type': 'S3',
's3Configuration': {
'bucketArn': document_bucket_arn,
'inclusionPrefixes': ['documents/'] # Optional: specify folder
}
}
)
data_source_id = ds_response['dataSource']['dataSourceId']
print(f"Data Source created: {data_source_id}")
# Physical Resource ID for tracking
physical_id = f"{vector_bucket_name}|{index_name}|{knowledge_base_id}|{data_source_id}"
# Return data for CloudFormation
response_data = {
'VectorBucketName': vector_bucket_name,
'VectorBucketArn': vector_bucket_arn,
'IndexName': index_name,
'IndexArn': index_arn,
'KnowledgeBaseId': knowledge_base_id,
'KnowledgeBaseArn': knowledge_base_arn,
'DataSourceId': data_source_id,
'Message': 'S3 Vector bucket, index, and Knowledge Base created successfully'
}
cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data, physical_id)
elif event['RequestType'] == 'Delete':
# Parse physical resource ID
physical_id = event.get('PhysicalResourceId', '')
print(f"Physical Resource ID: {physical_id}")
if '|' in physical_id:
parts = physical_id.split('|')
vector_bucket_name = parts[0] if len(parts) > 0 else event['ResourceProperties']['VectorBucketName']
index_name = parts[1] if len(parts) > 1 else event['ResourceProperties']['IndexName']
knowledge_base_id = parts[2] if len(parts) > 2 else None
data_source_id = parts[3] if len(parts) > 3 else None
else:
vector_bucket_name = event['ResourceProperties']['VectorBucketName']
index_name = event['ResourceProperties']['IndexName']
knowledge_base_id = None
data_source_id = None
# Cleanup resources in reverse order
try:
bedrock_client = boto3.client('bedrock-agent')
vector_bucket = VectorBucket(vector_bucket_name)
# Delete Data Source first
if knowledge_base_id and data_source_id:
try:
bedrock_client.delete_data_source(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id
)
print(f"Deleted data source: {data_source_id}")
time.sleep(5)
except Exception as e:
print(f"Error deleting data source: {e}")
# Delete Knowledge Base
if knowledge_base_id:
try:
bedrock_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
print(f"Deleted knowledge base: {knowledge_base_id}")
time.sleep(10)
except Exception as e:
print(f"Error deleting knowledge base: {e}")
# Delete Vector Index
try:
vector_bucket._delete_vector_index()
print(f"Deleted vector index: {index_name}")
time.sleep(5)
except Exception as e:
print(f"Error deleting vector index: {e}")
# Delete S3 Vector bucket
try:
vector_bucket._delete_vector_bucket()
print(f"Deleted S3 Vector bucket: {vector_bucket_name}")
except Exception as e:
print(f"Error deleting S3 Vector bucket: {e}")
except Exception as delete_error:
print(f"Delete error (ignored): {str(delete_error)}")
pass
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
elif event['RequestType'] == 'Update':
# For updates, return existing values
physical_id = event.get('PhysicalResourceId', '')
if '|' in physical_id:
parts = physical_id.split('|')
response_data = {
'VectorBucketName': parts[0] if len(parts) > 0 else '',
'IndexName': parts[1] if len(parts) > 1 else '',
'KnowledgeBaseId': parts[2] if len(parts) > 2 else '',
'DataSourceId': parts[3] if len(parts) > 3 else '',
'Message': 'Update completed'
}
else:
response_data = {'Message': 'Update completed'}
cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data, physical_id)
else:
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
except Exception as e:
print(f"Error: {str(e)}")
import traceback
traceback.print_exc()
cfnresponse.send(event, context, cfnresponse.FAILED, {
'Error': str(e)
})
# S3 Vector and Knowledge Base Custom Resource
S3VectorKnowledgeBase:
Type: AWS::CloudFormation::CustomResource
Properties:
ServiceToken: !GetAtt S3VectorKnowledgeBaseFunction.Arn
VectorBucketName: !Sub '${ApplicationName}-vector-bucket-${AWS::AccountId}'
IndexName: !Sub '${ApplicationName}-vector-index'
KnowledgeBaseName: !Sub '${AWS::StackName}-knowledge-base'
KnowledgeBaseRoleArn: !GetAtt BedrockKnowledgeBaseRole.Arn
EmbeddingModelArn: !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/amazon.titan-embed-text-v2:0'
DocumentBucketArn: !GetAtt DocumentBucket.Arn
# Document Storage Bucket
DocumentBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub '${ApplicationName}-documents-${AWS::AccountId}'
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
# Lambda Execution Role
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: BedrockAndS3Access
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- bedrock:*
- bedrock-agent:*
- bedrock-agent-runtime:*
- s3:GetObject
- s3:PutObject
- s3:ListBucket
Resource: '*'
# Document Processor Lambda Function
DocumentProcessorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${ApplicationName}-document-processor'
Runtime: python3.13
Handler: index.handler
Role: !GetAtt LambdaExecutionRole.Arn
Timeout: 300
Code:
ZipFile: |
import json
import boto3
import os
def handler(event, context):
print(f"Received event: {json.dumps(event)}")
try:
bedrock_agent = boto3.client('bedrock-agent')
                  # Handle both S3 event notifications and direct invocations
                  if 'Records' in event:
                      # Invoked from an S3 event
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print(f"Processing file: {bucket}/{key}")
# Knowledge Base sync job start
knowledge_base_id = os.environ.get('KNOWLEDGE_BASE_ID')
data_source_id = os.environ.get('DATA_SOURCE_ID')
if knowledge_base_id and data_source_id:
response = bedrock_agent.start_ingestion_job(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id
)
print(f"Started ingestion job: {response.get('ingestionJob', {}).get('ingestionJobId')}")
return {
'statusCode': 200,
'body': json.dumps('Document processing initiated')
}
except Exception as e:
print(f"Error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
# Bedrock Knowledge Base Role
BedrockKnowledgeBaseRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: bedrock.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: BedrockKnowledgeBasePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3vectors:*
- s3:GetObject
- s3:ListBucket
Resource: '*'
- Effect: Allow
Action:
- bedrock:InvokeModel
Resource: '*'
# Lambda Environment Variables Update (Custom Resource)
LambdaEnvironmentUpdater:
Type: AWS::CloudFormation::CustomResource
DependsOn:
- DocumentProcessorFunction
- S3VectorKnowledgeBase
Properties:
ServiceToken: !GetAtt LambdaEnvironmentUpdateFunction.Arn
FunctionName: !Ref DocumentProcessorFunction
KnowledgeBaseId: !GetAtt S3VectorKnowledgeBase.KnowledgeBaseId
DataSourceId: !GetAtt S3VectorKnowledgeBase.DataSourceId
LambdaEnvironmentUpdateFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${ApplicationName}-env-updater'
Runtime: python3.13
Handler: index.handler
Role: !GetAtt LambdaEnvironmentUpdateRole.Arn
Code:
ZipFile: |
import boto3
import cfnresponse
def handler(event, context):
try:
if event['RequestType'] in ['Create', 'Update']:
lambda_client = boto3.client('lambda')
function_name = event['ResourceProperties']['FunctionName']
knowledge_base_id = event['ResourceProperties']['KnowledgeBaseId']
data_source_id = event['ResourceProperties']['DataSourceId']
# Update Lambda environment variables
lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={
'Variables': {
'KNOWLEDGE_BASE_ID': knowledge_base_id,
'DATA_SOURCE_ID': data_source_id
}
}
)
                  # Delete requires no changes; always signal SUCCESS so stack deletion is not blocked
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
except Exception as e:
print(f"Error: {str(e)}")
cfnresponse.send(event, context, cfnresponse.FAILED, {})
LambdaEnvironmentUpdateRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: LambdaUpdatePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- lambda:UpdateFunctionConfiguration
- lambda:GetFunction
Resource: '*'
# S3 Bucket Notification (Custom Resource to avoid circular dependency)
S3NotificationConfiguration:
Type: AWS::CloudFormation::CustomResource
DependsOn:
- DocumentBucket
- DocumentProcessorFunction
- LambdaEnvironmentUpdater
Properties:
ServiceToken: !GetAtt S3NotificationFunction.Arn
BucketName: !Ref DocumentBucket
LambdaFunctionArn: !GetAtt DocumentProcessorFunction.Arn
S3NotificationFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${ApplicationName}-s3-notification'
Runtime: python3.13
Handler: index.handler
Timeout: 30
Role: !GetAtt S3NotificationRole.Arn
Code:
ZipFile: |
import boto3
import cfnresponse
import json
def handler(event, context):
try:
s3_client = boto3.client('s3')
lambda_client = boto3.client('lambda')
bucket_name = event['ResourceProperties']['BucketName']
lambda_arn = event['ResourceProperties']['LambdaFunctionArn']
if event['RequestType'] == 'Create':
# Add Lambda permission for S3
try:
lambda_client.add_permission(
FunctionName=lambda_arn,
StatementId='s3-trigger-permission',
Action='lambda:InvokeFunction',
Principal='s3.amazonaws.com',
SourceArn=f'arn:aws:s3:::{bucket_name}'
)
except lambda_client.exceptions.ResourceConflictException:
pass # Permission already exists
# Configure S3 notification
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={
'LambdaFunctionConfigurations': [
{
'Id': 'DocumentProcessorTrigger',
'LambdaFunctionArn': lambda_arn,
'Events': ['s3:ObjectCreated:*']
}
]
}
)
elif event['RequestType'] == 'Delete':
# Remove S3 notification
try:
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={}
)
                      except Exception:
pass
# Remove Lambda permission
try:
lambda_client.remove_permission(
FunctionName=lambda_arn,
StatementId='s3-trigger-permission'
)
                      except Exception:
pass
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
except Exception as e:
print(f"Error: {str(e)}")
cfnresponse.send(event, context, cfnresponse.FAILED, {})
S3NotificationRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: S3NotificationPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:PutBucketNotification
- s3:GetBucketNotification
- lambda:AddPermission
- lambda:RemovePermission
Resource: '*'
Outputs:
StackName:
Description: Stack Name
Value: !Ref AWS::StackName
VectorBucketName:
Description: Vector Bucket Name
    Value: !GetAtt S3VectorKnowledgeBase.VectorBucketName
KnowledgeBaseId:
Description: Knowledge Base Id
Value: !GetAtt S3VectorKnowledgeBase.KnowledgeBaseId
DataSourceId:
Description: Data Source Id
Value: !GetAtt S3VectorKnowledgeBase.DataSourceId
Once the resources are created, the application becomes selectable and you can start chatting.
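If you want to exercise the ingestion pipeline without going through the UI, a minimal sketch like the following should work: uploading a file under the `documents/` prefix (the data source's inclusion prefix) fires the S3 notification, which invokes the document-processor Lambda and starts an ingestion job. The bucket name below is a placeholder for your actual DocumentBucket name:

```python
import boto3

s3 = boto3.client('s3')

# Uploading under 'documents/' triggers the notification configured
# on the DocumentBucket, which kicks off a Knowledge Base sync.
s3.upload_file(
    Filename='faq.pdf',
    Bucket='myapp-documents-123456789012',  # placeholder bucket name
    Key='documents/faq.pdf',
)
```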
Architecture diagram
A lot has been omitted, but the overall architecture looks roughly like this.
By storing the knowledge base IDs each user can select in DynamoDB, a single application can connect to multiple knowledge bases.
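As a rough sketch of the chat side, the flow is: look up the user's selected knowledge base ID in DynamoDB, then call the Bedrock RetrieveAndGenerate API against it. The table name, key attributes, and model ARN below are illustrative assumptions, not the actual implementation:

```python
import boto3

dynamodb = boto3.resource('dynamodb')
agent_runtime = boto3.client('bedrock-agent-runtime')

def chat(user_id: str, question: str) -> str:
    # Fetch the knowledge base ID the user has selected
    # (table and attribute names are hypothetical).
    table = dynamodb.Table('rag-app-user-settings')
    item = table.get_item(Key={'userId': user_id})['Item']
    kb_id = item['knowledgeBaseId']

    # Retrieve from the knowledge base and generate an answer in one call.
    response = agent_runtime.retrieve_and_generate(
        input={'text': question},
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/'
                            'anthropic.claude-3-haiku-20240307-v1:0',
            },
        },
    )
    return response['output']['text']
```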
Conclusion
I suspect many people find RAG chat useful but too expensive, or assume it would be a pain to build.
With this setup, I think even those people can get started easily and cheaply, so I hope you find it a useful reference.
Next, I plan to implement features such as sharing chats with other logged-in users.
We're looking for people to work with us!
We are currently hiring for a variety of positions.
Casual interviews are also available, so we look forward to hearing from you!
For details on open positions, please check our careers site.