はじめに
Amazon Bedrock Agents で、外部の API を実行する機能があります。インターネット上の SaaS や、VPC 内の REST API は実行できることは明示的にわかります。一方、オンプレミスに API がある場合、これを Agents が実行できるか自信がなかったため検証してみました。
結果、オンプレミスの API も実行ができました。 以下の 2 つの条件を満たすと疎通ができました。
- Agents の Action Group に紐づける Lambda 関数を VPC の中にデプロイする
- VPC は Site-to-Site VPN や Direct Connect を利用して、オンプレミスと疎通をしている
検証に行った手順を以下に紹介します。
オンプレミスに Web アプリケーションを準備
まずは事前準備として、オンプレミスに Web アプリケーションを準備します。内容はシンプルで、Request で受け取ったタイムゾーンの現在時刻を返却するものです。Bedrock Agents で、ユーザーから「日本の現在時刻をおしえて?」といった質問に回答できるようにするための API です。
from flask import Flask, jsonify, request
from datetime import datetime
import pytz
app = Flask(__name__)
TIMEZONE_MAP = {
'jst': 'Asia/Tokyo',
'est': 'America/New_York',
'gmt': 'GMT'
}
@app.route('/')
def get_time():
# Get timezone from query parameter, default to JST
tz_code = request.args.get('tz', 'jst').lower()
if tz_code not in TIMEZONE_MAP:
return jsonify({
'error': f'Invalid timezone code: {tz_code}',
'available_codes': list(TIMEZONE_MAP.keys())
}), 400
try:
timezone = pytz.timezone(TIMEZONE_MAP[tz_code])
current_time = datetime.now(timezone)
return jsonify({
'timezone': tz_code.upper(),
'time': current_time.strftime('%Y-%m-%d %H:%M:%S %Z')
})
except pytz.exceptions.UnknownTimeZoneError:
return jsonify({
'error': 'Internal timezone error',
'available_codes': list(TIMEZONE_MAP.keys())
}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
systemctl で管理し、gunicorn 経由で 8000 ポートでアクセスできるようにします。
[Unit]
Description=JST Time API Service
After=network.target
[Service]
User=ec2-user
WorkingDirectory=/home/ec2-user/time_api
ExecStart=/home/ec2-user/.local/bin/gunicorn --bind 0.0.0.0:8000 app:app
Restart=always
[Install]
WantedBy=multi-user.target
無事にアクセスが出来た。
$ curl http://192.168.0.5:8000
{"time":"2025-02-23 12:54:56 JST","timezone":"JST"}
$ curl http://192.168.0.5:8000?tz=est
{"time":"2025-02-22 22:55:19 EST","timezone":"EST"}
Lambda 関数の作成
次に、Bedrock Agents の API 実行で利用するための Lambda 関数を作成します。Python コードから、上記のオンプレミスの Web アプリケーションに curl でアクセスします。192.168.0.5 は、Private IP となっていて、オンプレミスにプライベートネットワークで接続しています。
import json
import requests
import logging
from typing import Dict, Any
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def parse_timezone(event: Dict[str, Any]) -> str:
"""Extract timezone from Bedrock Agent event"""
try:
# パラメータリストからタイムゾーンを取得
for param in event.get('parameters', []):
if param.get('name') == 'timezone':
return param.get('value', 'jst').lower()
return 'jst' # デフォルトはJST
except Exception as e:
logger.error(f"Error parsing timezone: {str(e)}")
return 'jst' # デフォルトはJST
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""Lambda function that calls a timezone API and returns the response
Parameters
----------
event: dict
Bedrock Agent Function event format
context: object
Lambda Context runtime methods and attributes
Returns
------
dict
Response format for Bedrock Agent
"""
# 入力イベントをログに出力
logger.info("Input Event:")
logger.info(json.dumps(event, indent=2, ensure_ascii=False))
# イベントから必要な情報を取得
function = event['function']
session_attributes = event['sessionAttributes']
prompt_session_attributes = event['promptSessionAttributes']
try:
# タイムゾーンを取得
timezone = parse_timezone(event)
logger.info(f"Requested Timezone: {timezone}")
# REST APIを呼び出し(タイムゾーンパラメータ付き)
response = requests.get(f"http://192.168.0.5:8000/?tz={timezone}")
if response.status_code == 200:
api_response = response.json()
logger.info("API Response:")
logger.info(json.dumps(api_response, indent=2, ensure_ascii=False))
# レスポンスボディの作成
body = {
'timezone': api_response['timezone'],
'current_time': api_response['time']
}
response_body = {
'TEXT': {
'body': json.dumps(body, ensure_ascii=False)
}
}
# アクションレスポンスの作成
action_response = {
'actionGroup': event['actionGroup'],
'function': function,
'functionResponse': {
'responseBody': response_body
}
}
# 最終的なレスポンスの作成
final_response = {
'messageVersion': '1.0',
'response': action_response,
'sessionAttributes': session_attributes,
'promptSessionAttributes': prompt_session_attributes
}
# 最終レスポンスをログに出力
logger.info("Final Response:")
logger.info(json.dumps(final_response, indent=2, ensure_ascii=False))
return final_response
else:
error_response = response.json()
logger.error("Error Response:")
logger.error(json.dumps(error_response, indent=2, ensure_ascii=False))
# エラーレスポンスの作成
body = {
'error': error_response.get('error', 'Unknown error'),
'available_timezones': error_response.get('available_codes', [])
}
response_body = {
'TEXT': {
'body': json.dumps(body, ensure_ascii=False)
}
}
action_response = {
'actionGroup': event['actionGroup'],
'function': function,
'functionResponse': {
'responseBody': response_body
}
}
final_response = {
'messageVersion': '1.0',
'response': action_response,
'sessionAttributes': session_attributes,
'promptSessionAttributes': prompt_session_attributes
}
# エラー時の最終レスポンスをログに出力
logger.error("Final Error Response:")
logger.error(json.dumps(final_response, indent=2, ensure_ascii=False))
return final_response
except requests.RequestException as e:
error_msg = f"Error calling API: {str(e)}"
logger.error(error_msg)
# エラーレスポンスの作成
body = {
'error': 'Error calling timezone service',
'details': str(e)
}
response_body = {
'TEXT': {
'body': json.dumps(body, ensure_ascii=False)
}
}
action_response = {
'actionGroup': event['actionGroup'],
'function': function,
'functionResponse': {
'responseBody': response_body
}
}
final_response = {
'messageVersion': '1.0',
'response': action_response,
'sessionAttributes': session_attributes,
'promptSessionAttributes': prompt_session_attributes
}
# 例外時の最終レスポンスをログに出力
logger.error("Final Exception Response:")
logger.error(json.dumps(final_response, indent=2, ensure_ascii=False))
return final_response
SAM CLI から Lambda を実行して、無事に正常終了しています。
> sam remote invoke HelloWorldFunction --event '{"messageVersion":"1.0","function":"get-current-date-basedon-userinputed-timezone","parameters":[{"name":"timezone","type":"string","value":"gmt"}],"inp
utText":"GMTの時刻を教えてください","sessionId":"372952752580877","agent":{"name":"access-to-onpremis","version":"DRAFT","id":"PAMAPAQH3X","alias":"TSTALIASID"},"actionGroup":"GetCurrentDate","session
Attributes":{"lastTimezone":"jst"},"promptSessionAttributes":{"previousRequest":"日本の時刻"}}' --region ap-northeast-1
省略
{"messageVersion": "1.0", "response": {"actionGroup": "GetCurrentDate", "function": "get-current-date-basedon-userinputed-timezone", "functionResponse": {"responseBody": {"TEXT": {"body": "{\"timezone\": \"GMT\", \"current_time\": \"2025-02-23 08:46:51 GMT\"}"}}}}, "sessionAttributes": {"lastTimezone": "jst"}, "promptSessionAttributes": {"previousRequest": "\u65e5\u672c\u306e\u6642\u523b"}}⏎
Bedrock Agents の作成
つぎに、Bedrock Agents 側の設定です。Create Agent で作成をしていきます。
適当に Agent の名前を入れます。
モデルや Agent の説明を入れます。
Action Group の追加をします。
以下のパラメーターを指定します
- 上記の手順で作成した Lambda 関数を紐づける
- Parameter に timezone 指定。この timezone には、JST or EST or GMT しか与えられない制限を LLM に認識してもらうために、Description に制限事項を記載している
Lambda の権限設定
このドキュメントにあるように Resource Based Policy で、Agent が Lambda 関数を呼び出せるように設定します。
Add permissions を入れます。
次のように設定します
- Source ARN : Bedrock Agent の ARN を指定
動作確認 in AWS マネジメントコンソール
AWS マネジメントコンソールで、チャット形式で Agent を実行できます。
「JST の時刻をおしえてください。」と投げてみると、想定通り、Agent が Lambda 関数を実行して、オンプレミスの API を実行し、現在の日本時刻を取得できていることがわかります。
動作確認 in SDK
プログラム経由からも確認できます。invoke_agent
で Agent を呼び出します。
import boto3
import logging
from botocore.exceptions import ClientError
# ロガーの設定
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class BedrockAgentClient:
def __init__(self, region_name="ap-northeast-1"):
self.agents_runtime_client = boto3.client(
service_name="bedrock-agent-runtime",
region_name=region_name
)
def invoke_agent(self, agent_id, agent_alias_id, session_id, prompt):
try:
response = self.agents_runtime_client.invoke_agent(
agentId=agent_id,
agentAliasId=agent_alias_id,
sessionId=session_id,
inputText=prompt,
)
completion = ""
# ストリーミングレスポンスを処理
for event in response["completion"]:
if "chunk" in event and "bytes" in event["chunk"]:
chunk_bytes = event["chunk"]["bytes"]
if isinstance(chunk_bytes, (str, bytes)):
if isinstance(chunk_bytes, bytes):
completion += chunk_bytes.decode("utf-8")
else:
completion += chunk_bytes
except ClientError as e:
logger.error(f"Couldn't invoke agent. Error: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error while invoking agent: {str(e)}")
raise
return completion
def main():
# クライアントの初期化
agent_client = BedrockAgentClient()
agent_id = "PAMAPAQH3X"
agent_alias_id = "JQ19PTI4CE"
session_id = "test-session-001"
prompt = "日本の時刻をおしえてください"
try:
# エージェントの呼び出し
response = agent_client.invoke_agent(
agent_id=agent_id,
agent_alias_id=agent_alias_id,
session_id=session_id,
prompt=prompt
)
print("応答:", response)
except Exception as e:
print(f"エラーが発生しました: {str(e)}")
if __name__ == "__main__":
main()
Tips: Lambda 関数が受け取る Event
Lambda 関数が input として受け取る Event はこんな感じです。parameters
に、オンプレミスの API を実行するべきパラメーターが含まれており、これを元に Lambda はオンプレミスの API を実行するようにコードを実装しています。
{
"messageVersion": "1.0",
"function": "get-current-date-basedon-userinputed-timezone",
"parameters": [
{
"name": "timezone",
"type": "string",
"value": "jst"
}
],
"sessionId": "372952752580343",
"agent": {
"name": "access-to-onpremis",
"version": "DRAFT",
"id": "PAMAPAQH3X",
"alias": "TSTALIASID"
},
"actionGroup": "GetCurrentDate",
"sessionAttributes": {},
"promptSessionAttributes": {},
"inputText": "JSTの時刻をおしえてください"
}
Tips : Agent が Action Group を実行する際に、確認を行う
セキュリティ面での Tips です。Agent が外部の API を実行するときに、実行の直前に人間による確認をしたいときがあります。例えば、機密情報を外部に出すことはないか、といった確認です。Bedrock Agents には、ユーザーに確認を促すための API が提供されています。
Action Group を選択します。
Enable of confirmation of action group function を Enabled に変更します。
すると、AWS マネジメントコンソール上で実行すると、以下の画像のようにユーザーへアクションを実行しても良いか確認がされます。get-current-date-basedon-userinputed-timezone
という Function を、timezone : jst
で実行してもいいか?というのが見えます。もしここで、意図しない関数や、パラメーターが LLM のハルシネーションによって設定されていると、Deny で防ぐことができます。
Confirm を押すと、実行されました
Tips : SDK で Action Group の実行を確認する
上記の Action Group の実行の前に確認することを SDK 上でもコントロールできます。
Python コードでユーザーの承認を行う場合はこんな感じで実行できます。表示の文章などは、好きにアプリケーション側で実装できるので、より分かりやすい日本語にすることもできます。
> python3 app.py
プロンプト: 日本時刻をおしえてください
=== アクション承認が必要です ===
アクション詳細:
- グループ: GetCurrentDate
- 関数: get-current-date-basedon-userinputed-timezone
- パラメータ:
* timezone: jst (string)
このアクションを承認しますか? (y/n):
ソースコードの全文はこちらです。ちょっと長いのですが、_handle_action_approval
関数あたりが重要な処理です。
import boto3
import logging
import json
from botocore.exceptions import ClientError
# ロガーの設定
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class BedrockAgentClient:
def __init__(self, region_name="ap-northeast-1"):
self.agents_runtime_client = boto3.client(
service_name="bedrock-agent-runtime",
region_name=region_name
)
def _process_chunk(self, chunk_bytes):
"""チャンクデータを処理し、テキストを返す"""
if isinstance(chunk_bytes, bytes):
return chunk_bytes.decode("utf-8")
return chunk_bytes
def _handle_action_approval(self, func_input, auto_approve):
"""アクション承認の処理"""
action_group = func_input["actionGroup"]
function_name = func_input["function"]
parameters = func_input["parameters"]
print(f"\nアクション詳細:")
print(f"- グループ: {action_group}")
print(f"- 関数: {function_name}")
print("- パラメータ:")
for param in parameters:
print(f" * {param['name']}: {param['value']} ({param['type']})")
if auto_approve:
print("自動承認します...")
confirmation_state = "CONFIRM"
else:
user_input = input("\nこのアクションを承認しますか? (y/n): ")
confirmation_state = "CONFIRM" if user_input.lower() == 'y' else "DENY"
return action_group, function_name, confirmation_state
def _create_session_state(self, invocation_id, action_group, function_name, confirmation_state):
"""セッション状態を作成"""
session_state = {
"invocationId": invocation_id,
"returnControlInvocationResults": [
{
"functionResult": {
"actionGroup": action_group,
"function": function_name,
"responseBody": {
"TEXT": {
"body": json.dumps({
"confirmationState": confirmation_state
})
}
}
}
}
]
}
print("\nPrint session_state:")
print(json.dumps(session_state, indent=2, ensure_ascii=False))
return session_state
def _process_completion_events(self, events):
"""completionイベントの処理"""
completion = ""
for event in events:
if "chunk" in event and "bytes" in event["chunk"]:
completion += self._process_chunk(event["chunk"]["bytes"])
return completion
def _handle_return_control(self, event, agent_id, agent_alias_id, session_id, auto_approve):
"""returnControlイベントの処理"""
print("\n=== アクション承認が必要です ===")
control_data = event["returnControl"]
invocation_id = control_data["invocationId"]
for input_data in control_data.get("invocationInputs", []):
if "functionInvocationInput" in input_data:
action_group, function_name, confirmation_state = self._handle_action_approval(
input_data["functionInvocationInput"],
auto_approve
)
session_state = self._create_session_state(
invocation_id,
action_group,
function_name,
confirmation_state
)
new_response = self.agents_runtime_client.invoke_agent(
agentId=agent_id,
agentAliasId=agent_alias_id,
sessionId=session_id,
inputText="",
sessionState=session_state
)
if "completion" in new_response:
return self._process_completion_events(new_response["completion"])
return ""
def invoke_agent(self, agent_id, agent_alias_id, session_id, prompt, auto_approve=True):
"""
Sends a prompt for the agent to process and respond to.
:param agent_id: The unique identifier of the agent to use.
:param agent_alias_id: The alias of the agent to use.
:param session_id: The unique identifier of the session.
:param prompt: The prompt that you want Claude to complete.
:param auto_approve: 自動的にアクションを承認するかどうか
:return: Inference response from the model.
"""
try:
response = self.agents_runtime_client.invoke_agent(
agentId=agent_id,
agentAliasId=agent_alias_id,
sessionId=session_id,
inputText=prompt,
enableTrace=True
)
if "completion" not in response:
return ""
completion = ""
for event in response["completion"]:
if "returnControl" in event:
completion = self._handle_return_control(
event,
agent_id,
agent_alias_id,
session_id,
auto_approve
)
elif "chunk" in event and "bytes" in event["chunk"]:
completion += self._process_chunk(event["chunk"]["bytes"])
return completion
except ClientError as e:
logger.error(f"Couldn't invoke agent. Error: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error while invoking agent: {str(e)}")
raise
def main():
# クライアントの初期化
agent_client = BedrockAgentClient()
# テスト用のパラメータ
agent_id = "PAMAPAQH3X"
agent_alias_id = "QPKHPSPHNH"
session_id = "test-session-001"
prompt = "日本時刻をおしえてください"
try:
print(f"プロンプト: {prompt}")
# エージェントの呼び出し(auto_approve=Falseで手動承認モードに設定可能)
response = agent_client.invoke_agent(
agent_id=agent_id,
agent_alias_id=agent_alias_id,
session_id=session_id,
prompt=prompt,
auto_approve=False # Falseにすると手動承認モードになります
)
print("\n=== 最終レスポンス ===")
print(response)
except Exception as e:
print(f"エラーが発生しました: {str(e)}")
if __name__ == "__main__":
main()
検証を通じてわかったこと
- Agent 周りで意識した方がいい Service Quota
- 1 個の Agent あたり、11 個の Action Group を有効化で設定可能 (Enabled action groups per agent)
- 1 個の Action Group Function あたり、5 個の Parameter を設定可能 (Parameters per function)
参考 URL