はじめに
前回、前々回と Amazon Bedrock AgentCore について買いてきました。
④ Gateway と ⑤ Identity 編について今回も触りながら学んでいこうと思います。
④ Gateway
Gateway を使用すると、開発者は API、Lambda 関数、および既存のサービスを MCP 互換ツールに変換し、わずか数行のコードで Gateway エンドポイントを介してエージェントが利用できるようにすることができます。
特徴
- ターゲットとして OpenAPI、Smithy、AWS Lambda をサポート
- 今回の検証では AWS Lambda を用います
- OpenAPI、Smithy をターゲットとする場合の作成もドキュメントの Quick Start に記載されているので興味のある方はそちらを参考に
- Salesforce、Slack、Jira、Asana、Zendesk などの人気ツールとの Integrations も提供
Gateway の作成とターゲット(Lambda)のアタッチ
my_bedrock_agentcore_gateway.py
# GatewayClient のインポート
from bedrock_agentcore_starter_toolkit.operations.gateway.client import GatewayClient
import logging
# GatewayClient のインスタンスを作成
client = GatewayClient(region_name="us-west-2")
client.logger.setLevel(logging.DEBUG)
# Cognito オーソライザーの作成
cognito_response = client.create_oauth_authorizer_with_cognito("TestGateway")
# ゲートウェイの作成
gateway = client.create_mcp_gateway(authorizer_config=cognito_response["authorizer_config"])
# Lambda ターゲットの作成
lambda_target = client.create_mcp_gateway_target(gateway=gateway, target_type="lambda")
my_bedrock_agentcore_gateway.py
以下の通り、Gateway, Lambda, Cognito が作成されていました。
なお、Lambda 内のコード(ツール)はデフォルトで以下の通り、天気と時間を Mock で返すだけのものがデプロイされていました。(今回はこのまま使います。)
lambda_function.py
import json
def lambda_handler(event, context):
# Extract tool name from context
tool_name = context.client_context.custom.get('bedrockAgentCoreToolName', 'unknown')
if 'get_weather' in tool_name:
return {
'statusCode': 200,
'body': json.dumps({
'location': event.get('location', 'Unknown'),
'temperature': '72°F',
'conditions': 'Sunny'
})
}
elif 'get_time' in tool_name:
return {
'statusCode': 200,
'body': json.dumps({
'timezone': event.get('timezone', 'UTC'),
'time': '2:30 PM'
})
}
else:
return {
'statusCode': 200,
'body': json.dumps({'message': 'Unknown tool'})
}
Agent の作成
Strands MCP Client を使って呼び出してみます。
ざっくりと、
- 認証
Cognito認証 → access_token 取得 → 初期化完了 - ツール操作
HTTPリクエスト(Bearer Token) → Gateway → Lambda → 結果返却 - Agent実行
プロンプト → LLM判断 → ツール実行 → 結果統合 → 回答
のようなフローとなっており、
例として
- 認証: Cognito → JWT Token
- 天気取得: GET weather(Tokyo) → 72°F, Sunny
- Agent: "東京の天気は?" → ツール呼び出し → "晴れで22℃です"
のような想定です。
次の値をご自身の環境にあった値へ置き換えます。
# クライアント初期化
client = BedrockAgentCoreGatewayClient(
gateway_url="https://testgatewayXXXXXXX-XXXXXXXXXX.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp",
client_id="XXXXXXXXXXXXXXXXXXXXXXXXXX",
client_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
cognito_domain="agentcore-XXXXXXXX.auth.us-west-2.amazoncognito.com"
)
gateway_url の確認
以下の通り、Amazon Bedrock AgentCore の画面から確認します。
client_id, client_secret の確認
以下の通り、Amazon Cognito の画面から確認します。
cognito_domain の確認
以下の通り、Amazon Cognito の画面から確認します。
my_bedrock_agentcore_gateway_client.py
import json
import logging
import requests
import base64
from typing import Dict, Any
from strands import Agent
from strands.models import BedrockModel
from strands.tools.mcp.mcp_client import MCPClient
from mcp.client.streamable_http import streamablehttp_client
class BedrockAgentCoreGatewayClient:
"""
AWS Bedrock AgentCore Gateway用のMCPクライアント
"""
def __init__(self, gateway_url: str, client_id: str, client_secret: str, cognito_domain: str):
self.gateway_url = gateway_url
self.client_id = client_id
self.client_secret = client_secret
self.cognito_domain = cognito_domain
self.access_token = None
# ログ設定
self.logger = logging.getLogger(__name__)
# 初期トークン取得
self._refresh_token()
def _refresh_token(self):
"""Cognitoからアクセストークンを取得"""
credentials = base64.b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
response = requests.post(
f"https://{self.cognito_domain}/oauth2/token",
headers={
"Authorization": f"Basic {credentials}",
"Content-Type": "application/x-www-form-urlencoded"
},
data={"grant_type": "client_credentials"}
)
if response.status_code != 200:
raise Exception(f"Failed to get access token: {response.text}")
self.access_token = response.json()["access_token"]
self.logger.info("Access token obtained successfully")
def _make_request(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""MCPリクエストを送信"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.access_token}"
}
response = requests.post(self.gateway_url, headers=headers, json=payload)
if response.status_code == 401:
# トークンが無効な場合は更新して再試行
self._refresh_token()
headers["Authorization"] = f"Bearer {self.access_token}"
response = requests.post(self.gateway_url, headers=headers, json=payload)
response.raise_for_status()
return response.json()
def list_tools(self) -> Dict[str, Any]:
"""利用可能なツールのリストを取得"""
payload = {
"jsonrpc": "2.0",
"id": "list-tools-request",
"method": "tools/list"
}
return self._make_request(payload)
def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""ツールを実行"""
payload = {
"jsonrpc": "2.0",
"id": "call-tool-request",
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
return self._make_request(payload)
def search_tools(self, query: str) -> Dict[str, Any]:
"""セマンティック検索を使用してツールを検索"""
return self.call_tool("x_amz_bedrock_agentcore_search", {"query": query})
def get_weather(self, location: str = "Unknown") -> Dict[str, Any]:
"""天気情報を取得"""
return self.call_tool("TestGatewayTargetb4830d2c___get_weather", {"location": location})
def get_time(self, timezone: str = "UTC") -> Dict[str, Any]:
"""時刻情報を取得"""
return self.call_tool("TestGatewayTargetb4830d2c___get_time", {"timezone": timezone})
def run_agent_with_strands(self, bedrock_model, prompt: str):
"""Strands Agentを使用してプロンプトを実行"""
mcp_client = MCPClient(lambda: streamablehttp_client(
self.gateway_url,
headers={"Authorization": f"Bearer {self.access_token}"}
))
with mcp_client:
tools = mcp_client.list_tools_sync()
self.logger.info(f"Found tools: {[tool.tool_name for tool in tools]}")
agent = Agent(model=bedrock_model, tools=tools)
return agent(prompt)
# 使用例
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# クライアント初期化
client = BedrockAgentCoreGatewayClient(
gateway_url="https://testgatewayXXXXXXX-XXXXXXXXXX.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp",
client_id="XXXXXXXXXXXXXXXXXXXXXXXXXX",
client_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
cognito_domain="agentcore-XXXXXXXX.auth.us-west-2.amazoncognito.com"
)
try:
# ツール一覧取得
tools = client.list_tools()
print("Available Tools:", json.dumps(tools, indent=2, ensure_ascii=False))
# 天気情報取得
weather_tokyo = client.get_weather("Tokyo")
print("Weather in Tokyo:", json.dumps(weather_tokyo, indent=2, ensure_ascii=False))
# 時刻情報取得
time_jst = client.get_time("JST")
print("Time (JST):", json.dumps(time_jst, indent=2, ensure_ascii=False))
# セマンティック検索
search_results = client.search_tools("weather information")
print("Search Results:", json.dumps(search_results, indent=2, ensure_ascii=False))
# Strands Agent実行
bedrock_model = BedrockModel(
region="us-west-2"
)
agent_result = client.run_agent_with_strands(
bedrock_model,
"東京の天気と時刻を教えてください。"
)
# print("Agent Result:", agent_result)
except Exception as e:
print(f"Error: {str(e)}")
my_bedrock_agentcore_gateway_client.py
以下の通り、ツールが呼び出され回答されていることが確認できました。
⑤ Identity
Identity は、AI エージェント専用のアイデンティティ・認証管理サービスであり、エージェントが安全に AWS や外部サービスにアクセスできるよう、認証情報とアクセス権限を一元管理します。
特徴
インバウンド認証とアウトバウンド認証がある。
インバウンド認証(Inbound Auth) | 認証方式 | アウトバウンド認証(Outbound Auth) |
---|---|---|
「誰がエージェントにアクセスできるか」 | 制御 | 「エージェントが何にアクセスできるか」 |
エージェントが外部サービスや AWS リソースにアクセスする際の認証 | 目的 | ユーザーや外部システムがエージェントを呼び出す際の認証 |
IAM SigV4 認証 JWT Bearer Token 認証(OAuth 2.0) |
認証方式 | IAM ロール OAuth 2.0(Google Drive、Slack など) API キー(外部 API 向け) |
ユーザーが Cognito でログインして、エージェント API を呼び出す | 例 | エージェントがユーザーの代理で Google Driveのファイルを取得 |
前提条件: Google クライアント ID とクライアントシークレットを取得する
次の方のページを参考に、Google クライアント ID と クライアントシークレット を取得します。
具体的には、
- Google Drive API を有効にする
- OAuth 同意画面を作成する
- エージェント用の新しい Web アプリケーション (例: 「My Agent 1」) を作成する
- 新しいウェブアプリケーション用の OAuth 2.0 認証情報を作成し、生成された Google クライアント ID と クライアントシークレット をメモする
- 新しいウェブアプリケーションの承認済みのリダイレクト URI に
https://bedrock-agentcore.us-west-1.amazonaws.com/identities/oauth2/callback
を追加する(自身のリージョンに変更して下さい)
OAuth 2.0 認証プロバイダの設定
AWS CLI コマンドを使用して、先ほど取得した Google クライアント ID と クライアントシークレット を使用して、新しい OAuth 2.0 認証情報プロバイダーを作成します。
aws bedrock-agentcore-control create-oauth2-credential-provider \
--region us-west-2 \
--name "google-provider" \
--credential-provider-vendor "GoogleOauth2" \
--oauth2-provider-config-input '{
"googleOauth2ProviderConfig": {
"clientId": "<Google クライアント ID>",
"clientSecret": "<Google クライアント シークレット>"
}
}'
AWS CLI コマンドが成功し、AWS コンソールから確認すると、Amazon Bedrock AgentCore > アイデンティティに作成されていることがわかります。
OAuth2 アクセストークンを使用して外部リソースを呼び出す
Amazon Bedrock AgentCore Identity で認証し、Google Drive にアクセスし、Google Drive のファイル情報を取得します。
次のドキュメントの Getting started を参考に試そうとしましたがうまく動かなかったので、ここだけは AI エージェントに力を借りて生成させました。
my_bedrock_agentcore_identity.py
import asyncio
import logging
from bedrock_agentcore.identity.auth import requires_access_token
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
SCOPES = ["https://www.googleapis.com/auth/drive.metadata.readonly"]
def main(access_token):
"""Drive v3 APIの基本的な使用方法を示します。
ユーザーがアクセス可能な最初の10ファイルの名前とIDを出力します。
Args:
access_token (str): 有効なGoogle OAuth2アクセストークン
Raises:
ValueError: アクセストークンが空またはNoneの場合
HttpError: Google Drive API呼び出しが失敗した場合
"""
if not access_token or not access_token.strip():
raise ValueError("Access token cannot be empty or None")
creds = Credentials(token=access_token, scopes=SCOPES)
try:
service = build("drive", "v3", credentials=creds)
results = service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
items = results.get("files", [])
if not items:
print("ファイルが見つかりません。")
return
print("ファイル:")
for item in items:
print(f"{item['name']} ({item['id']})")
except HttpError as error:
logging.error(f"Google Drive APIエラー: {error}")
print(f"エラーが発生しました: {error}")
raise
# このアノテーションは、エージェント開発者が外部アプリケーションからアクセストークンを取得するのに役立ちます
@requires_access_token(
provider_name="google-provider",
scopes=["https://www.googleapis.com/auth/drive.metadata.readonly"], # Google OAuth2スコープ
auth_flow="USER_FEDERATION", # 3LOフロー
on_auth_url=lambda x: print("この認証URLをブラウザにコピーして貼り付けてください", x), # 認証URLをコンソールに出力
force_authentication=True,
)
async def read_from_google_drive(*, access_token: str):
logging.info("アクセストークンを受信しました。API呼び出しを実行します")
# API呼び出しを実行...
main(access_token)
if __name__ == "__main__":
# ログ設定
logging.basicConfig(level=logging.INFO)
# テスト目的で、タイムアウトと適切な処理を追加
import platform
import sys
def setup_timeout():
"""プラットフォームに基づくタイムアウト処理の設定"""
if platform.system() != "Windows":
import signal
def timeout_handler(signum, frame):
_ = signum, frame # 未使用パラメータ警告を抑制
print("\nタイムアウトに達しました。正常に終了します。")
sys.exit(0)
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(60) # OAuthフローの60秒タイムアウト
setup_timeout()
try:
asyncio.run(read_from_google_drive(access_token=""))
except KeyboardInterrupt:
print("\nユーザーによって中断されました")
except ValueError as e:
print(f"バリデーションエラー: {e}")
except Exception as e:
logging.error(f"予期しないエラー: {e}")
print(f"エラーが発生しました: {e}")
my_bedrock_agentcore_identity
実行すると、生成される認証 URL をブラウザで開き認証情報を入力後成功すると、「Authorization Successful!」と表示後、API が呼び出され、Google Drive の中身が確認できました。