はじめに
AWS Blog "Stream ingest data from Kafka to Amazon Bedrock Knowledge Bases using custom connectors" では、Amazon Bedrock のカスタムコネクタ(カスタムデータソース)を使用して Amazon MSK から Amazon Bedrock ナレッジベースにデータをストリーミングして取り込む方法を紹介しています。本記事では、この記事の内容の検証と改良点を紹介します。
この AWS Blog 記事は 2025 年 4 月に公開されました。検証を行った 2026 年 2 月時点では、記事の手順にある Apache Kafka や Java + Kafka CLI クライアントで使われているミドルウェアのバージョンがかなり古く、手順どおりに実行するとエラーが発生しました。
そこで、本記事では AWS 推奨の Apache Kafka や、2026 年 2 月に公開された Amazon MSK CreateTopic API を使用するなど、より最新のバージョンを使用した手順を紹介します。
参考情報
サンプル実装の概要
AWS Blog で紹介されているサンプル実装の概要
AWS Blog で紹介されているサンプル実装の概要は、Apache Kafka(Amazon MSK)からのストリーミングデータを Lambda でポーリングし、そのデータを Amazon Bedrock Knowledge Bases にリアルタイムで取り込み、RAG で即座に検索・回答可能にするものです。ベクトルストアには、OpenSearch Serverless を使用しています。
投入するデータのサンプルとして、架空の株価データを使用しています。株価動向の把握に関心のあるユーザーの利用をユースケースとしています。
従来の RAG アーキテクチャでは、ドキュメントを S3 にアップロードし、Bedrock Knowledge Base のデータソース同期(バッチ処理)でベクトルストアに反映するのが一般的です。この方式では、データの規模によっては同期完了まで数分~数十分のラグが発生します。サンプル実装では IngestKnowledgeBaseDocuments API を使ったインライン取り込みによって、データ発生から数秒以内にナレッジベースへ反映するリアルタイムパイプラインを実現しています。
AWS Blog のサンプル実装では、Amazon SageMaker Studio のノートブックを使用して Kafka トピックの作成や Lambda 関数に MSK をイベントソースとして設定する手順を紹介しています。以下のような手順で環境構築を行っています。
元記事での環境構築手順
- 手動で Bedrock Knowledge Base を作成(ベクトルストアに OpenSearch Serverless を使用)
- CloudFormation テンプレートを使用してスタックを実行
- SageMaker Studio のノートブックを使用して、Apache Kafka クライアントで Kafka トピックの作成や Lambda 関数に MSK をイベントソースとして設定
- SageMaker Studio のノートブックを使用して、テストデータを Amazon MSK に投入
上記のとおり、Kafka トピックの作成は SageMaker Studio 内のターミナルを使い、Java + Kafka CLI クライアントで行っています。この方法では、手動コマンド実行が必要になり、ノートブックのセル実行だけではトピック作成・確認が完結しません。また、使われている Apache Kafka クライアントのバージョンが古いため、そのままでは動作しません。
本記事で作成するサンプル実装の概要
本記事では、ベクトルストアに S3 Vectors を使用することで検証コストを抑える構成としました。そのほかに、以下の変更を行いました。
- Apache Kafka バージョンを 2.8.0 から AWS 推奨の 3.9 に更新
- AWS 推奨バージョンへの追従であり、MSK Topic Management API(CreateTopic 等)の利用に必要な 3.6 以上の要件を満たす。なお、MSK Topic Management API は Provisioned クラスター限定で、MSK Serverless では利用不可
- Kafka トピックの作成を Java + Apache Kafka クライアントから MSK CreateTopic API(boto3)に変更
- 手動コマンド実行が不要になり、ノートブックのセル実行だけでトピック作成・確認が完結
- Lambda 予約同時実行数の変更: 50 → 1 に削減
- 1 トピック・1 パーティションのサンプル構成では同時実行 1 で十分であり、不要なコスト発生を防止。本番環境ではパーティション数や負荷に応じて調整が必要
- Lambda boto3 レイヤーの導入
- 元記事では Lambda コード内で pip install boto3 を実行してコールドスタートのたびにパッケージをインストールしている。これを Lambda Layer に置き換え、Lambda 関数のコールドスタート時間を短縮
- MSK CloudWatch Logs の追加
- MSK ブローカーログを CloudWatch Logs に出力する設定を追加し、トラブルシューティングを容易に
- IAM ポリシーの拡充
- SageMaker 実行ロールに MSK Topic Management API 用のパーミッション(kafka:CreateTopic、kafka:ListTopics、kafka:DeleteTopic 等)を追加
- CloudFormation テンプレートにパラメータを追加
- KnowledgeBaseName、DataSourceName、Boto3LayerArn を追加し、スタック作成時にカスタマイズ可能に
構成図
本記事の全体構成を以下に示します。元記事の手順に従い VPC を作成し、その中に MSK と Lambda を配置しています。この構成を試す際は、サンドボックス用のアカウントを使用することを推奨します。
この構成では、Amazon MSK と NAT Gateway で時間単位の費用が生じます。検証後は、後述の後片付けを行ってリソースを削除してください。
主要コンポーネントの役割は以下のとおりです。
- Amazon MSK
- kafka.t3.small × 2 ブローカー(2 AZ)、Kafka 3.9.x
- ストリーミングデータの受信基盤
- AWS Lambda
- Python 3.13
- MSK Event Source Mapping 経由で自動起動
- Kafka メッセージをデコードし、Bedrock Knowledge Base にインジェスト
- Amazon Bedrock Knowledge Base
- カスタムデータソース + Titan Text Embeddings V2 でテキストをベクトル化
- Amazon S3 Vectors
- ベクトルデータの格納先
- 従量課金(最低利用料金なし)で検証コストが低額
- SageMaker Studio
- VpcOnly モードでプライベートサブネットに配置
- ノートブックから MSK への直接接続とセットアップを担当
環境構築
本記事で使用するコードを GitHub に公開しています。
以降、すべての AWS リソースを us-east-1 リージョンで作成することを前提とします。他のリージョンを使用する場合は、記事内のリージョン指定を使用するリージョン名に読み替えてください。
Knowledge Bases の作成
ナレッジベースの新規作成手順に沿ってナレッジベースを作成します。ベクトルストアを使うため、Knowledge Base with vector store を選択します。
ナレッジベース名を BedrockStreamIngestKnowledgeBase とします。任意の名前でも構いませんが、その場合は後述の CloudFormation テンプレートを実行する際に、KnowledgeBaseName パラメータを指定する必要があります。
データソースタイプは、Custom を選択します。
データソース名を BedrockStreamIngestKBCustomDS とします。ここも、任意の名前を使う場合は後述の CloudFormation テンプレートを実行する際に、DataSourceName パラメータを指定する必要があります。
埋め込みモデルは、Titan Text Embeddings V2 を選択します。
ベクトルストアの作成は、Quick Create a new vector store、ベクトルストアタイプは Amazon S3 Vectors を選択します。
このようにしてナレッジベースとデータソースを作成します。
boto3 Lambda Layer の作成
以下のコマンドでリポジトリをクローンします。
git clone --branch v20260217 https://github.com/revsystem/sample-stream-ingest-amazon-bedrock-knowledge-base.git
必要に応じて、プロファイルとリージョンを指定し、スクリプトを実行します。
cd sample-stream-ingest-amazon-bedrock-knowledge-base/scripts
./build-boto3-layer.sh --profile {YOUR_PROFILE} --region {YOUR_REGION}
実行すると以下のようなログが表示されます。Boto3LayerArn は CloudFormation テンプレートを実行する際に使用します。
==> Using AWS profile: {YOUR_PROFILE}
==> Using AWS region: {YOUR_REGION}
==> Creating build directory: {YOUR_DIRECTORY}/sample-stream-ingest-amazon-bedrock-knowledge-base/build/layer
==> Installing boto3>=1.42.46 into python/
==> Creating zip (root must contain python/)
==> Publishing layer: aws lambda publish-layer-version --layer-name boto3-layer --zip-file fileb://boto3-layer.zip --compatible-runtimes python3.13
Boto3LayerArn for CloudFormation parameter:
arn:aws:lambda:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:layer:boto3-layer:1
Example create-stack:
ParameterKey=Boto3LayerArn,ParameterValue=arn:aws:lambda:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:layer:boto3-layer:1
CloudFormation テンプレートを使用してスタックを作成
さきほどクローンしたリポジトリの templates ディレクトリに移動します。ここにある bedrock-kb-stream-ingest.yml を使ってスタックを作成します。
cd sample-stream-ingest-amazon-bedrock-knowledge-base/templates
元記事を踏襲し、CloudFormation テンプレートを使用してスタックを作成します。Create stack をクリックします。
bedrock-kb-stream-ingest.yml を使ってスタックを作成します。Prepare template では、Choose an existing template を選択します。Specify template は、Upload a template file を選択し、bedrock-kb-stream-ingest.yml をアップロードします。
先ほど作成した Boto3LayerArn、ナレッジベースとデータソースの名前を設定します。スタック名は任意ですが、ここでは BedrockStreamIngest とします。
その他の項目はデフォルトのままで問題ありません。Create stack をクリックします。リソースの作成完了まで、数十分ほどかかります。
CloudFormation テンプレートを使用してスタックを作成(AWS CLI)
AWS CLI を使用してスタックを作成する場合は、以下のコマンドを実行します。CloudFormation の画面と同様に、スタック名、Boto3LayerArn、ナレッジベースとデータソースの名前を設定します。
aws cloudformation deploy \
--template-file templates/bedrock-kb-stream-ingest.yml \
--stack-name BedrockStreamIngest \
--parameter-overrides \
KnowledgeBaseName=BedrockStreamIngestKnowledgeBase \
DataSourceName=BedrockStreamIngestKBCustomDS \
Boto3LayerArn=arn:aws:lambda:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:layer:boto3-layer:1 \
--capabilities CAPABILITY_IAM \
--profile {YOUR_PROFILE} \
--region {YOUR_REGION}
SageMaker Studio から Jupyter Notebook を起動
SageMaker Studio から Profile を選択します。https://us-east-1.console.aws.amazon.com/sagemaker/home?region=us-east-1#/studio-landing にアクセスすると、以下のような画面が表示されます。
デフォルトで表示されているプロファイルBedrockStreamIngestSageMakerUserProfileを選択します。
JupyterLab をクリックし、メニューに進みます。
Quick start をクリックし、スペースを起動します。
ステータスが Running になったら、Open をクリックして、Notebook を起動します。
notebooks ディレクトリにあるファイルをアップロードします。アップロードは、下記スクリーンショットの Upload Files ボタンをクリックします。
cd sample-stream-ingest-amazon-bedrock-knowledge-base/notebooks
Notebook を実行
1.Setup.ipynb と 2.StreamIngest.ipynb を順に実行していきます。
1.Setup.ipynb
1.Setup.ipynb では Kafka トピックの作成や、Lambda 関数に MSK をイベントソースとして設定します。
1.Setup.ipynb をダブルクリックして開きます。初回実行時は以下のダイアログが表示されます。Python 3(ipykernel) を選択します。
上から順にセルを実行していきます。2番目のセルを実行すると以下のようにエラーメッセージが表示されます。これは、SageMaker Studio の既存パッケージと一部競合が発生しているためです。コメントに書いているとおり、"Successfully installed boto3-... botocore-..." というメッセージが表示されていれば正常です。
# Set the stack name and Kafka topic name のセルには、スタック名と Kafka トピック名を設定します。スタック名は、CloudFormation テンプレートを実行する際に設定したスタック名を入力します。Kafka トピック名は、任意の名前を入力します。
# Create Kafka topic using MSK CreateTopic API のセルを実行すると、Amazon MSK に Kafka トピックを作成します。トピックの作成には数分かかるため、セルの実行結果には以下のように CREATING と表示されます。
Topic ARN: arn:aws:kafka:{YOUR_REGION}:{YOUR_ACCOUNT_ID}:topic/BedrockStreamIngestMSKCluster/4bec31dd-0396-4f5c-93d7-524abbc52d44-22/streamtopic
Topic Name: streamtopic
Status: CREATING
https://us-east-1.console.aws.amazon.com/msk/home?region=us-east-1#/clusters にアクセスし、クラスターの一覧から BedrockStreamIngestMSKCluster を選択します。その後、Topics タブをクリックし、streamtopic というトピックが表示されていることを確認します。
この後は、最後のセルまで順に実行していきます。
2.StreamIngest.ipynb
2.StreamIngest.ipynb では、Kafka トピックにデータを投入します。
2.StreamIngest.ipynb をダブルクリックして開きます。初回実行時は以下のダイアログが表示されます。Python 3(ipykernel) を選択します。
上から順にセルを実行していきます。
最後のセルを実行すると、以下のようなログが表示されます。これで、TestData.csv のデータが Kafka トピックに投入されました。
データの確認
Bedrock Knowledge Base に戻りデータソース BedrockStreamIngestKBCustomDS の Documents をみると、以下のように投入したデータがインデックスに登録されていることを確認できます。S3 をデータソースとしてバッチ同期する従来方式のような同期処理は不要です。
今回は投入したデータが 17 件と少ないためデータソースへの登録が一瞬で終わっていますが、データ件数が多い場合、順に登録が進む様子を確認できます。
このように、検索と回答生成も正常に動作していることを確認できます。
注意点
今回は使用したサンプルデータの件数が少ないため、Bedrock の API 呼び出し制限をあまり考慮していません。大量のデータを投入する場合は、API レート制限を考慮して実装する必要があります。
後片付け
Notebook の 3.Cleanup.ipynb を実行し、Lambda のイベントソース設定を削除します。次に、JupyterLab のスペースを停止し、スペースを削除します。スペースの削除を忘れると、スタックの削除時に SageMaker User Profile の削除に失敗します。その場合は、スペースを削除後にスタックの削除を再実行します。
CloudFormation のスタックを削除します。削除には数十分かかります。
Bedrock Knowledge BaseとS3 Vectorsは手動で削除します。
まとめ
本記事では、Amazon Bedrock のカスタムデータソース(カスタムコネクタ)と Amazon MSK を活用し、ストリーミングデータをリアルタイムで Bedrock Knowledge Base に取り込む最新手法について解説しました。従来のバッチ処理(S3 同期)に比べ、即時性の高い検索・回答生成が可能となる点が大きな特徴です。また、ベクトルストアに S3 Vectors を利用することで、検証コストと運用負担を大幅に削減しています。
この構成の主な特徴・利点は以下のとおりです。
- ストリーミングデータを数秒以内にナレッジベースへリアルタイム取り込み
- データ同期の待ち時間がなく、即座に検索・回答生成が可能
- ベクトルストアに S3 Vectors を選択することで、最小限のコストで検証が可能
- Kafka トピック管理・データ投入をノートブック上のセル実行のみで完結できるシンプルな運用
- Lambda Layer による boto3 導入/高速コールドスタート、CloudWatch Logs による運用性向上
このように、最新の AWS サービスと推奨アーキテクチャを組み合わせることで、低コストかつ即応性の高い RAG システムを実現可能です。





















