1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

S3にファイルをアップロードからBedrock Knowledge Baseの同期まで自動化する

Posted at

Amazon BedrockのKnowledge Baseは、RAG(Retrieval-Augmented Generation)を簡単に実装できる強力なサービスです。しかし、新しいドキュメントを追加する度に手動で同期処理を実行するのは面倒ですよね。

今回は、S3にファイルをアップロードするだけで自動的にKnowledge Baseに同期される仕組みを構築してみました。

システム概要

aws_architecture_drawio.io-Bedrock_sync.drawio.png

処理フロー

  1. S3バケットにPDFや文書ファイルをアップロード
  2. S3イベント通知がファイルアップロードを検知
  3. イベントトリガーでLambda関数が自動実行
  4. Bedrock APIを呼び出してKnowledge Baseの同期を開始
  5. RAGで検索・回答が可能になる

実装手順

0. 前提条件

実装するためには以下の作成・設定が必要です

  • Bedrock Knowledge Baseが作成済み
  • S3バケットが作成済み
  • 適切なIAM権限の設定

1. Lambda関数の作成

S3イベントを受信してBedrock APIを呼び出すLambda関数を作成します

1-1.Lambdaコンソールから「関数を作成」をクリック
1-2. 基本的な情報を入力し「関数の作成」をクリック
  • 関数名:任意の名前
  • ランタイム:Python 3.12
  • 実行ロール:S3オブジェクトの読み取り専用アクセス権限を追加する
    スクリーンショット 2025-06-10 152131.png
1-3. コードを実装

以下のコードを張り付け、Deployする

import json
import boto3
import os
from urllib.parse import unquote_plus


def lambda_handler(event, context):

    # 環境変数から情報を取得
    knowledge_base_id = os.getenv("KNOWLEDGE_BASE_ID")
    data_source_id = os.getenv("DATA_SOURCE_ID")
    s3_bucket_name = os.getenv("S3_BUCKET_NAME")

    # 必須環境変数が設定されているか確認
    if not knowledge_base_id or not data_source_id:
        error_msg = "Missing required environment variables"
        return {"statusCode": 400, "body": json.dumps(error_msg)}

    # Bedrockクライアントを初期化
    bedrock_client = boto3.client("bedrock-agent")

    processed_files = 0

    try:
        # S3イベントを処理
        for i, record in enumerate(event.get("Records", [])):
            bucket = record["s3"]["bucket"]["name"]
            key = unquote_plus(record["s3"]["object"]["key"])
            event_name = record["eventName"]

            # バケット名が一致しない場合はスキップ
            if s3_bucket_name and bucket != s3_bucket_name:
                continue

            # サポートされているファイル形式でない場合はスキップ
            if not is_supported_file(key):
                continue

            # ファイルがアップロードされた場合
            if event_name.startswith("ObjectCreated"):
                try:
                    response = bedrock_client.start_ingestion_job(
                        knowledgeBaseId=knowledge_base_id,
                        dataSourceId=data_source_id,
                        description=f"S3バケットからのアップロード: {key}",
                    )

                    ingestion_job_id = response["ingestionJob"]["ingestionJobId"]
                    processed_files += 1

                except Exception as bedrock_error:
                    continue
            else:
                continue

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": "S3 events processed successfully",
                    "processed_files": processed_files,
                    "total_records": len(event.get("Records", [])),
                }
            ),
        }

    except Exception as e:
        return {"statusCode": 500, "body": json.dumps(f"Error: {str(e)}")}


# サポートされているファイル形式を確認
def is_supported_file(key):
    supported_extensions = [".pdf", ".txt", ".md", ".docx", ".html", ".htm", ".csv"]
    is_supported = any(key.lower().endswith(ext) for ext in supported_extensions)
    return is_supported
1-4. 基本設定のタイムアウトを5分に設定

スクリーンショット 2025-06-10 154022.png

2. 環境変数の設定

Lambda関数に以下の環境変数を設定します(XXXXXを適切な値に設定してください)

  • KNOWLEDGE_BASE_ID = XXXXXXXXXX
  • DATA_SOURCE_ID = XXXXXXXXXX
  • S3_BUCKET_NAME = XXXXXXXXXXXX
    スクリーンショット 2025-06-10 154326.png

3. IAM権限の設定

Lambdaの実行ロールに以下の権限があるかを確認する

  • AmzonBedrockFullAccess
  • S3バケット読み取り権限

4. イベント通知の設定

S3の「プロパティ」-「イベント通知」からイベント通知を作成をクリックする
以下の内容でイベント通知を作成する

  • イベント名:任意の名前
  • サフィックス:追加するファイルの拡張子を指定する
    • .pdf .txt .docx
  • イベントタイプ:「すべてのオブジェクト作成イベント」にチェックする

5. テスト

Lambda関数が正常に動かくか確認しましょう
以下のJSONでテストします

{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": {
          "name": "<実際のバケット名>"
        },
        "object": {
          "key": "test-document.pdf"
        }
      }
    }
  ]
}

この結果が帰ってくれば成功です!

{
  "statusCode": 200,
  "body": "{\"message\": \"S3 events processed successfully\", \"processed_files\": 1, \"total_records\": 1}"
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?