0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

カスタムコネクタを使用して Amazon MSK から Amazon Bedrock ナレッジベースにデータをストリーミングして取り込む

0
Last updated at Posted at 2026-03-22

はじめに

AWS Blog "Stream ingest data from Kafka to Amazon Bedrock Knowledge Bases using custom connectors" では、Amazon Bedrock のカスタムコネクタ(カスタムデータソース)を使用して Amazon MSK から Amazon Bedrock ナレッジベースにデータをストリーミングして取り込む方法を紹介しています。本記事では、この記事の内容の検証と改良点を紹介します。

この AWS Blog 記事は 2025 年 4 月に公開されました。検証を行った 2026 年 2 月時点では、記事の手順にある Apache Kafka や Java + Kafka CLI クライアントで使われているミドルウェアのバージョンがかなり古く、手順どおりに実行するとエラーが発生しました。

そこで、本記事では AWS 推奨の Apache Kafka や、2026 年 2 月に公開された Amazon MSK CreateTopic API を使用するなど、より最新のバージョンを使用した手順を紹介します。

参考情報

サンプル実装の概要

AWS Blog で紹介されているサンプル実装の概要

AWS Blog で紹介されているサンプル実装の概要は、Apache Kafka(Amazon MSK)からのストリーミングデータを Lambda でポーリングし、そのデータを Amazon Bedrock Knowledge Bases にリアルタイムで取り込み、RAG で即座に検索・回答可能にするものです。ベクトルストアには、OpenSearch Serverless を使用しています。
投入するデータのサンプルとして、架空の株価データを使用しています。株価動向の把握に関心のあるユーザーの利用をユースケースとしています。

従来の RAG アーキテクチャでは、ドキュメントを S3 にアップロードし、Bedrock Knowledge Base のデータソース同期(バッチ処理)でベクトルストアに反映するのが一般的です。この方式では、データの規模によっては同期完了まで数分~数十分のラグが発生します。サンプル実装では IngestKnowledgeBaseDocuments API を使ったインライン取り込みによって、データ発生から数秒以内にナレッジベースへ反映するリアルタイムパイプラインを実現しています。

AWS Blog のサンプル実装では、Amazon SageMaker Studio のノートブックを使用して Kafka トピックの作成や Lambda 関数に MSK をイベントソースとして設定する手順を紹介しています。以下のような手順で環境構築を行っています。

元記事での環境構築手順

  1. 手動で Bedrock Knowledge Base を作成(ベクトルストアに OpenSearch Serverless を使用)
  2. CloudFormation テンプレートを使用してスタックを実行
  3. SageMaker Studio のノートブックを使用して、Apache Kafka クライアントで Kafka トピックの作成や Lambda 関数に MSK をイベントソースとして設定
  4. SageMaker Studio のノートブックを使用して、テストデータを Amazon MSK に投入

上記のとおり、Kafka トピックの作成は SageMaker Studio 内のターミナルを使い、Java + Kafka CLI クライアントで行っています。この方法では、手動コマンド実行が必要になり、ノートブックのセル実行だけではトピック作成・確認が完結しません。また、使われている Apache Kafka クライアントのバージョンが古いため、そのままでは動作しません。

本記事で作成するサンプル実装の概要

本記事では、ベクトルストアに S3 Vectors を使用することで検証コストを抑える構成としました。そのほかに、以下の変更を行いました。

  • Apache Kafka バージョンを 2.8.0 から AWS 推奨の 3.9 に更新
    • AWS 推奨バージョンへの追従であり、MSK Topic Management API(CreateTopic 等)の利用に必要な 3.6 以上の要件を満たす。なお、MSK Topic Management API は Provisioned クラスター限定で、MSK Serverless では利用不可
  • Kafka トピックの作成を Java + Apache Kafka クライアントから MSK CreateTopic API(boto3)に変更
    • 手動コマンド実行が不要になり、ノートブックのセル実行だけでトピック作成・確認が完結
  • Lambda 予約同時実行数の変更: 50 → 1 に削減
    • 1 トピック・1 パーティションのサンプル構成では同時実行 1 で十分であり、不要なコスト発生を防止。本番環境ではパーティション数や負荷に応じて調整が必要
  • Lambda boto3 レイヤーの導入
    • 元記事では Lambda コード内で pip install boto3 を実行してコールドスタートのたびにパッケージをインストールしている。これを Lambda Layer に置き換え、Lambda 関数のコールドスタート時間を短縮
  • MSK CloudWatch Logs の追加
    • MSK ブローカーログを CloudWatch Logs に出力する設定を追加し、トラブルシューティングを容易に
  • IAM ポリシーの拡充
    • SageMaker 実行ロールに MSK Topic Management API 用のパーミッション(kafka:CreateTopic、kafka:ListTopics、kafka:DeleteTopic 等)を追加
  • CloudFormation テンプレートにパラメータを追加
    • KnowledgeBaseName、DataSourceName、Boto3LayerArn を追加し、スタック作成時にカスタマイズ可能に

構成図

本記事の全体構成を以下に示します。元記事の手順に従い VPC を作成し、その中に MSK と Lambda を配置しています。この構成を試す際は、サンドボックス用のアカウントを使用することを推奨します。

この構成では、Amazon MSK と NAT Gateway で時間単位の費用が生じます。検証後は、後述の後片付けを行ってリソースを削除してください。

構成図

主要コンポーネントの役割は以下のとおりです。

  • Amazon MSK
    • kafka.t3.small × 2 ブローカー(2 AZ)、Kafka 3.9.x
    • ストリーミングデータの受信基盤
  • AWS Lambda
    • Python 3.13
    • MSK Event Source Mapping 経由で自動起動
    • Kafka メッセージをデコードし、Bedrock Knowledge Base にインジェスト
  • Amazon Bedrock Knowledge Base
    • カスタムデータソース + Titan Text Embeddings V2 でテキストをベクトル化
  • Amazon S3 Vectors
    • ベクトルデータの格納先
    • 従量課金(最低利用料金なし)で検証コストが低額
  • SageMaker Studio
    • VpcOnly モードでプライベートサブネットに配置
    • ノートブックから MSK への直接接続とセットアップを担当

環境構築

本記事で使用するコードを GitHub に公開しています。

以降、すべての AWS リソースを us-east-1 リージョンで作成することを前提とします。他のリージョンを使用する場合は、記事内のリージョン指定を使用するリージョン名に読み替えてください。

Knowledge Bases の作成

ナレッジベースの新規作成手順に沿ってナレッジベースを作成します。ベクトルストアを使うため、Knowledge Base with vector store を選択します。

Knowledge Basesの作成

ナレッジベース名を BedrockStreamIngestKnowledgeBase とします。任意の名前でも構いませんが、その場合は後述の CloudFormation テンプレートを実行する際に、KnowledgeBaseName パラメータを指定する必要があります。

Knowledge Basesの名称設定

データソースタイプは、Custom を選択します。

データソースタイプの選択

データソース名を BedrockStreamIngestKBCustomDS とします。ここも、任意の名前を使う場合は後述の CloudFormation テンプレートを実行する際に、DataSourceName パラメータを指定する必要があります。

データソース名の設定

埋め込みモデルは、Titan Text Embeddings V2 を選択します。

埋め込みモデルの選択

ベクトルストアの作成は、Quick Create a new vector store、ベクトルストアタイプは Amazon S3 Vectors を選択します。

ベクトルストアの作成

このようにしてナレッジベースとデータソースを作成します。

boto3 Lambda Layer の作成

以下のコマンドでリポジトリをクローンします。

git clone --branch v20260217 https://github.com/revsystem/sample-stream-ingest-amazon-bedrock-knowledge-base.git

必要に応じて、プロファイルとリージョンを指定し、スクリプトを実行します。

cd sample-stream-ingest-amazon-bedrock-knowledge-base/scripts
./build-boto3-layer.sh --profile {YOUR_PROFILE} --region {YOUR_REGION}

実行すると以下のようなログが表示されます。Boto3LayerArn は CloudFormation テンプレートを実行する際に使用します。

==> Using AWS profile: {YOUR_PROFILE}
==> Using AWS region: {YOUR_REGION}
==> Creating build directory: {YOUR_DIRECTORY}/sample-stream-ingest-amazon-bedrock-knowledge-base/build/layer
==> Installing boto3>=1.42.46 into python/
==> Creating zip (root must contain python/)
==> Publishing layer: aws lambda publish-layer-version --layer-name boto3-layer --zip-file fileb://boto3-layer.zip --compatible-runtimes python3.13

Boto3LayerArn for CloudFormation parameter:
  arn:aws:lambda:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:layer:boto3-layer:1

Example create-stack:
  ParameterKey=Boto3LayerArn,ParameterValue=arn:aws:lambda:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:layer:boto3-layer:1

CloudFormation テンプレートを使用してスタックを作成

さきほどクローンしたリポジトリの templates ディレクトリに移動します。ここにある bedrock-kb-stream-ingest.yml を使ってスタックを作成します。

cd sample-stream-ingest-amazon-bedrock-knowledge-base/templates

元記事を踏襲し、CloudFormation テンプレートを使用してスタックを作成します。Create stack をクリックします。

CloudFormationのスタック作成

bedrock-kb-stream-ingest.yml を使ってスタックを作成します。Prepare template では、Choose an existing template を選択します。Specify template は、Upload a template file を選択し、bedrock-kb-stream-ingest.yml をアップロードします。

CloudFormationのテンプレートを使用してスタックを作成

先ほど作成した Boto3LayerArn、ナレッジベースとデータソースの名前を設定します。スタック名は任意ですが、ここでは BedrockStreamIngest とします。

CloudFormationのテンプレートのパラメータ設定

その他の項目はデフォルトのままで問題ありません。Create stack をクリックします。リソースの作成完了まで、数十分ほどかかります。

CloudFormation テンプレートを使用してスタックを作成(AWS CLI)

AWS CLI を使用してスタックを作成する場合は、以下のコマンドを実行します。CloudFormation の画面と同様に、スタック名、Boto3LayerArn、ナレッジベースとデータソースの名前を設定します。

aws cloudformation deploy \
  --template-file templates/bedrock-kb-stream-ingest.yml \
  --stack-name BedrockStreamIngest \
  --parameter-overrides \
    KnowledgeBaseName=BedrockStreamIngestKnowledgeBase \
    DataSourceName=BedrockStreamIngestKBCustomDS \
    Boto3LayerArn=arn:aws:lambda:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:layer:boto3-layer:1 \
  --capabilities CAPABILITY_IAM \
  --profile {YOUR_PROFILE} \
  --region {YOUR_REGION}

SageMaker Studio から Jupyter Notebook を起動

SageMaker Studio から Profile を選択します。https://us-east-1.console.aws.amazon.com/sagemaker/home?region=us-east-1#/studio-landing にアクセスすると、以下のような画面が表示されます。

デフォルトで表示されているプロファイルBedrockStreamIngestSageMakerUserProfileを選択します。

Get Started with SageMaker Studio

JupyterLab をクリックし、メニューに進みます。

JupyterLabをクリック

Quick start をクリックし、スペースを起動します。

Quick start で Space を起動

ステータスが Running になったら、Open をクリックして、Notebook を起動します。

Open Notebook

notebooks ディレクトリにあるファイルをアップロードします。アップロードは、下記スクリーンショットの Upload Files ボタンをクリックします。

cd sample-stream-ingest-amazon-bedrock-knowledge-base/notebooks

ファイルアップロード

Notebook を実行

1.Setup.ipynb と 2.StreamIngest.ipynb を順に実行していきます。

1.Setup.ipynb

1.Setup.ipynb では Kafka トピックの作成や、Lambda 関数に MSK をイベントソースとして設定します。

1.Setup.ipynb をダブルクリックして開きます。初回実行時は以下のダイアログが表示されます。Python 3(ipykernel) を選択します。

Select Kernel

上から順にセルを実行していきます。2番目のセルを実行すると以下のようにエラーメッセージが表示されます。これは、SageMaker Studio の既存パッケージと一部競合が発生しているためです。コメントに書いているとおり、"Successfully installed boto3-... botocore-..." というメッセージが表示されていれば正常です。

pip boto3インストールエラー

# Set the stack name and Kafka topic name のセルには、スタック名と Kafka トピック名を設定します。スタック名は、CloudFormation テンプレートを実行する際に設定したスタック名を入力します。Kafka トピック名は、任意の名前を入力します。

# Create Kafka topic using MSK CreateTopic API のセルを実行すると、Amazon MSK に Kafka トピックを作成します。トピックの作成には数分かかるため、セルの実行結果には以下のように CREATING と表示されます。

Topic ARN: arn:aws:kafka:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:topic/BedrockStreamIngestMSKCluster/4bec31dd-0396-4f5c-93d7-524abbc52d44-22/streamtopic
Topic Name: streamtopic
Status: CREATING

https://us-east-1.console.aws.amazon.com/msk/home?region=us-east-1#/clusters にアクセスし、クラスターの一覧から BedrockStreamIngestMSKCluster を選択します。その後、Topics タブをクリックし、streamtopic というトピックが表示されていることを確認します。

この後は、最後のセルまで順に実行していきます。

2.StreamIngest.ipynb

2.StreamIngest.ipynb では、Kafka トピックにデータを投入します。

2.StreamIngest.ipynb をダブルクリックして開きます。初回実行時は以下のダイアログが表示されます。Python 3(ipykernel) を選択します。

Select Kernel

上から順にセルを実行していきます。

最後のセルを実行すると、以下のようなログが表示されます。これで、TestData.csv のデータが Kafka トピックに投入されました。

StreamIngest.ipynbの実行結果

データの確認

Bedrock Knowledge Base に戻りデータソース BedrockStreamIngestKBCustomDS の Documents をみると、以下のように投入したデータがインデックスに登録されていることを確認できます。S3 をデータソースとしてバッチ同期する従来方式のような同期処理は不要です。

今回は投入したデータが 17 件と少ないためデータソースへの登録が一瞬で終わっていますが、データ件数が多い場合、順に登録が進む様子を確認できます。

データソースの確認

このように、検索と回答生成も正常に動作していることを確認できます。

検索と回答生成

注意点

今回は使用したサンプルデータの件数が少ないため、Bedrock の API 呼び出し制限をあまり考慮していません。大量のデータを投入する場合は、API レート制限を考慮して実装する必要があります。

後片付け

Notebook の 3.Cleanup.ipynb を実行し、Lambda のイベントソース設定を削除します。次に、JupyterLab のスペースを停止し、スペースを削除します。スペースの削除を忘れると、スタックの削除時に SageMaker User Profile の削除に失敗します。その場合は、スペースを削除後にスタックの削除を再実行します。

JupyterLabのスペースの停止

CloudFormation のスタックを削除します。削除には数十分かかります。

スタックの削除

Bedrock Knowledge BaseとS3 Vectorsは手動で削除します。

まとめ

本記事では、Amazon Bedrock のカスタムデータソース(カスタムコネクタ)と Amazon MSK を活用し、ストリーミングデータをリアルタイムで Bedrock Knowledge Base に取り込む最新手法について解説しました。従来のバッチ処理(S3 同期)に比べ、即時性の高い検索・回答生成が可能となる点が大きな特徴です。また、ベクトルストアに S3 Vectors を利用することで、検証コストと運用負担を大幅に削減しています。

この構成の主な特徴・利点は以下のとおりです。

  • ストリーミングデータを数秒以内にナレッジベースへリアルタイム取り込み
  • データ同期の待ち時間がなく、即座に検索・回答生成が可能
  • ベクトルストアに S3 Vectors を選択することで、最小限のコストで検証が可能
  • Kafka トピック管理・データ投入をノートブック上のセル実行のみで完結できるシンプルな運用
  • Lambda Layer による boto3 導入/高速コールドスタート、CloudWatch Logs による運用性向上

このように、最新の AWS サービスと推奨アーキテクチャを組み合わせることで、低コストかつ即応性の高い RAG システムを実現可能です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?