Amazon BedrockのKnowledge Baseは、RAG(Retrieval-Augmented Generation)を簡単に実装できる強力なサービスです。しかし、新しいドキュメントを追加する度に手動で同期処理を実行するのは面倒ですよね。
今回は、S3にファイルをアップロードするだけで自動的にKnowledge Baseに同期される仕組みを構築してみました。
システム概要
処理フロー
- S3バケットにPDFや文書ファイルをアップロード
- S3イベント通知がファイルアップロードを検知
- イベントトリガーでLambda関数が自動実行
- Bedrock APIを呼び出してKnowledge Baseの同期を開始
- RAGで検索・回答が可能になる
実装手順
0. 前提条件
実装するためには以下の作成・設定が必要です
- Bedrock Knowledge Baseが作成済み
- S3バケットが作成済み
- 適切なIAM権限の設定
1. Lambda関数の作成
S3イベントを受信してBedrock APIを呼び出すLambda関数を作成します
1-1.Lambdaコンソールから「関数を作成」をクリック
1-2. 基本的な情報を入力し「関数の作成」をクリック
1-3. コードを実装
以下のコードを張り付け、Deployする
import json
import boto3
import os
from urllib.parse import unquote_plus
def lambda_handler(event, context):
# 環境変数から情報を取得
knowledge_base_id = os.getenv("KNOWLEDGE_BASE_ID")
data_source_id = os.getenv("DATA_SOURCE_ID")
s3_bucket_name = os.getenv("S3_BUCKET_NAME")
# 必須環境変数が設定されているか確認
if not knowledge_base_id or not data_source_id:
error_msg = "Missing required environment variables"
return {"statusCode": 400, "body": json.dumps(error_msg)}
# Bedrockクライアントを初期化
bedrock_client = boto3.client("bedrock-agent")
processed_files = 0
try:
# S3イベントを処理
for i, record in enumerate(event.get("Records", [])):
bucket = record["s3"]["bucket"]["name"]
key = unquote_plus(record["s3"]["object"]["key"])
event_name = record["eventName"]
# バケット名が一致しない場合はスキップ
if s3_bucket_name and bucket != s3_bucket_name:
continue
# サポートされているファイル形式でない場合はスキップ
if not is_supported_file(key):
continue
# ファイルがアップロードされた場合
if event_name.startswith("ObjectCreated"):
try:
response = bedrock_client.start_ingestion_job(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id,
description=f"S3バケットからのアップロード: {key}",
)
ingestion_job_id = response["ingestionJob"]["ingestionJobId"]
processed_files += 1
except Exception as bedrock_error:
continue
else:
continue
return {
"statusCode": 200,
"body": json.dumps(
{
"message": "S3 events processed successfully",
"processed_files": processed_files,
"total_records": len(event.get("Records", [])),
}
),
}
except Exception as e:
return {"statusCode": 500, "body": json.dumps(f"Error: {str(e)}")}
# サポートされているファイル形式を確認
def is_supported_file(key):
supported_extensions = [".pdf", ".txt", ".md", ".docx", ".html", ".htm", ".csv"]
is_supported = any(key.lower().endswith(ext) for ext in supported_extensions)
return is_supported
1-4. 基本設定のタイムアウトを5分に設定
2. 環境変数の設定
Lambda関数に以下の環境変数を設定します(XXXXXを適切な値に設定してください)
3. IAM権限の設定
Lambdaの実行ロールに以下の権限があるかを確認する
- AmzonBedrockFullAccess
- S3バケット読み取り権限
4. イベント通知の設定
S3の「プロパティ」-「イベント通知」からイベント通知を作成をクリックする
以下の内容でイベント通知を作成する
- イベント名:任意の名前
- サフィックス:追加するファイルの拡張子を指定する
- 例
.pdf.txt.docx
- 例
- イベントタイプ:「すべてのオブジェクト作成イベント」にチェックする
5. テスト
Lambda関数が正常に動かくか確認しましょう
以下のJSONでテストします
{
"Records": [
{
"eventName": "ObjectCreated:Put",
"s3": {
"bucket": {
"name": "<実際のバケット名>"
},
"object": {
"key": "test-document.pdf"
}
}
}
]
}
この結果が帰ってくれば成功です!
{
"statusCode": 200,
"body": "{\"message\": \"S3 events processed successfully\", \"processed_files\": 1, \"total_records\": 1}"
}



