0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Bedrock Agents でオンプレミスの API と連携する

Posted at

はじめに

Amazon Bedrock Agents で、外部の API を実行する機能があります。インターネット上の SaaS や、VPC 内の REST API は実行できることは明示的にわかります。一方、オンプレミスに API がある場合、これを Agents が実行できるか自信がなかったため検証してみました。

結果、オンプレミスの API も実行ができました。 以下の 2 つの条件を満たすと疎通ができました。

  • Agents の Action Group に紐づける Lambda 関数を VPC の中にデプロイする
  • VPC は Site-to-Site VPN や Direct Connect を利用して、オンプレミスと疎通をしている

検証に行った手順を以下に紹介します。

オンプレミスに Web アプリケーションを準備

まずは事前準備として、オンプレミスに Web アプリケーションを準備します。内容はシンプルで、Request で受け取ったタイムゾーンの現在時刻を返却するものです。Bedrock Agents で、ユーザーから「日本の現在時刻をおしえて?」といった質問に回答できるようにするための API です。

from flask import Flask, jsonify, request
from datetime import datetime
import pytz

app = Flask(__name__)

TIMEZONE_MAP = {
    'jst': 'Asia/Tokyo',
    'est': 'America/New_York',
    'gmt': 'GMT'
}

@app.route('/')
def get_time():
    # Get timezone from query parameter, default to JST
    tz_code = request.args.get('tz', 'jst').lower()
    
    if tz_code not in TIMEZONE_MAP:
        return jsonify({
            'error': f'Invalid timezone code: {tz_code}',
            'available_codes': list(TIMEZONE_MAP.keys())
        }), 400

    try:
        timezone = pytz.timezone(TIMEZONE_MAP[tz_code])
        current_time = datetime.now(timezone)
        
        return jsonify({
            'timezone': tz_code.upper(),
            'time': current_time.strftime('%Y-%m-%d %H:%M:%S %Z')
        })
    except pytz.exceptions.UnknownTimeZoneError:
        return jsonify({
            'error': 'Internal timezone error',
            'available_codes': list(TIMEZONE_MAP.keys())
        }), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

systemctl で管理し、gunicorn 経由で 8000 ポートでアクセスできるようにします。

[Unit]
Description=JST Time API Service
After=network.target

[Service]
User=ec2-user
WorkingDirectory=/home/ec2-user/time_api
ExecStart=/home/ec2-user/.local/bin/gunicorn --bind 0.0.0.0:8000 app:app
Restart=always

[Install]
WantedBy=multi-user.target

無事にアクセスが出来た。

$ curl http://192.168.0.5:8000
{"time":"2025-02-23 12:54:56 JST","timezone":"JST"}

$ curl http://192.168.0.5:8000?tz=est
{"time":"2025-02-22 22:55:19 EST","timezone":"EST"}

Lambda 関数の作成

次に、Bedrock Agents の API 実行で利用するための Lambda 関数を作成します。Python コードから、上記のオンプレミスの Web アプリケーションに curl でアクセスします。192.168.0.5 は、Private IP となっていて、オンプレミスにプライベートネットワークで接続しています。

import json
import requests
import logging
from typing import Dict, Any

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def parse_timezone(event: Dict[str, Any]) -> str:
    """Extract timezone from Bedrock Agent event"""
    try:
        # パラメータリストからタイムゾーンを取得
        for param in event.get('parameters', []):
            if param.get('name') == 'timezone':
                return param.get('value', 'jst').lower()
        return 'jst'  # デフォルトはJST
    except Exception as e:
        logger.error(f"Error parsing timezone: {str(e)}")
        return 'jst'  # デフォルトはJST

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Lambda function that calls a timezone API and returns the response

    Parameters
    ----------
    event: dict
        Bedrock Agent Function event format

    context: object
        Lambda Context runtime methods and attributes

    Returns
    ------
    dict
        Response format for Bedrock Agent
    """
    # 入力イベントをログに出力
    logger.info("Input Event:")
    logger.info(json.dumps(event, indent=2, ensure_ascii=False))
    
    # イベントから必要な情報を取得
    function = event['function']
    session_attributes = event['sessionAttributes']
    prompt_session_attributes = event['promptSessionAttributes']
    
    try:
        # タイムゾーンを取得
        timezone = parse_timezone(event)
        logger.info(f"Requested Timezone: {timezone}")
        
        # REST APIを呼び出し(タイムゾーンパラメータ付き)
        response = requests.get(f"http://192.168.0.5:8000/?tz={timezone}")
        
        if response.status_code == 200:
            api_response = response.json()
            logger.info("API Response:")
            logger.info(json.dumps(api_response, indent=2, ensure_ascii=False))

            # レスポンスボディの作成
            body = {
                'timezone': api_response['timezone'],
                'current_time': api_response['time']
            }

            response_body = {
                'TEXT': {
                    'body': json.dumps(body, ensure_ascii=False)
                }
            }

            # アクションレスポンスの作成
            action_response = {
                'actionGroup': event['actionGroup'],
                'function': function,
                'functionResponse': {
                    'responseBody': response_body
                }
            }

            # 最終的なレスポンスの作成
            final_response = {
                'messageVersion': '1.0',
                'response': action_response,
                'sessionAttributes': session_attributes,
                'promptSessionAttributes': prompt_session_attributes
            }

            # 最終レスポンスをログに出力
            logger.info("Final Response:")
            logger.info(json.dumps(final_response, indent=2, ensure_ascii=False))

            return final_response
        else:
            error_response = response.json()
            logger.error("Error Response:")
            logger.error(json.dumps(error_response, indent=2, ensure_ascii=False))

            # エラーレスポンスの作成
            body = {
                'error': error_response.get('error', 'Unknown error'),
                'available_timezones': error_response.get('available_codes', [])
            }

            response_body = {
                'TEXT': {
                    'body': json.dumps(body, ensure_ascii=False)
                }
            }

            action_response = {
                'actionGroup': event['actionGroup'],
                'function': function,
                'functionResponse': {
                    'responseBody': response_body
                }
            }

            final_response = {
                'messageVersion': '1.0',
                'response': action_response,
                'sessionAttributes': session_attributes,
                'promptSessionAttributes': prompt_session_attributes
            }

            # エラー時の最終レスポンスをログに出力
            logger.error("Final Error Response:")
            logger.error(json.dumps(final_response, indent=2, ensure_ascii=False))

            return final_response
            
    except requests.RequestException as e:
        error_msg = f"Error calling API: {str(e)}"
        logger.error(error_msg)

        # エラーレスポンスの作成
        body = {
            'error': 'Error calling timezone service',
            'details': str(e)
        }

        response_body = {
            'TEXT': {
                'body': json.dumps(body, ensure_ascii=False)
            }
        }

        action_response = {
            'actionGroup': event['actionGroup'],
            'function': function,
            'functionResponse': {
                'responseBody': response_body
            }
        }

        final_response = {
            'messageVersion': '1.0',
            'response': action_response,
            'sessionAttributes': session_attributes,
            'promptSessionAttributes': prompt_session_attributes
        }

        # 例外時の最終レスポンスをログに出力
        logger.error("Final Exception Response:")
        logger.error(json.dumps(final_response, indent=2, ensure_ascii=False))

        return final_response

SAM CLI から Lambda を実行して、無事に正常終了しています。

> sam remote invoke HelloWorldFunction --event '{"messageVersion":"1.0","function":"get-current-date-basedon-userinputed-timezone","parameters":[{"name":"timezone","type":"string","value":"gmt"}],"inp
utText":"GMTの時刻を教えてください","sessionId":"372952752580877","agent":{"name":"access-to-onpremis","version":"DRAFT","id":"PAMAPAQH3X","alias":"TSTALIASID"},"actionGroup":"GetCurrentDate","session
Attributes":{"lastTimezone":"jst"},"promptSessionAttributes":{"previousRequest":"日本の時刻"}}' --region ap-northeast-1

省略

{"messageVersion": "1.0", "response": {"actionGroup": "GetCurrentDate", "function": "get-current-date-basedon-userinputed-timezone", "functionResponse": {"responseBody": {"TEXT": {"body": "{\"timezone\": \"GMT\", \"current_time\": \"2025-02-23 08:46:51 GMT\"}"}}}}, "sessionAttributes": {"lastTimezone": "jst"}, "promptSessionAttributes": {"previousRequest": "\u65e5\u672c\u306e\u6642\u523b"}}⏎ 

Bedrock Agents の作成

つぎに、Bedrock Agents 側の設定です。Create Agent で作成をしていきます。

image-20250223121457125.png

適当に Agent の名前を入れます。

image-20250223121520830.png

モデルや Agent の説明を入れます。

image-20250223125827290.png

Action Group の追加をします。

image-20250223121812103.png

以下のパラメーターを指定します

  • 上記の手順で作成した Lambda 関数を紐づける
  • Parameter に timezone 指定。この timezone には、JST or EST or GMT しか与えられない制限を LLM に認識してもらうために、Description に制限事項を記載している

image-20250223130229989.png

Lambda の権限設定

このドキュメントにあるように Resource Based Policy で、Agent が Lambda 関数を呼び出せるように設定します。

image-20250223131214461.png

Add permissions を入れます。

image-20250223131227028.png

次のように設定します

  • Source ARN : Bedrock Agent の ARN を指定

image-20250223131522045.png

動作確認 in AWS マネジメントコンソール

AWS マネジメントコンソールで、チャット形式で Agent を実行できます。

「JST の時刻をおしえてください。」と投げてみると、想定通り、Agent が Lambda 関数を実行して、オンプレミスの API を実行し、現在の日本時刻を取得できていることがわかります。

image-20250223155956193.png

動作確認 in SDK

プログラム経由からも確認できます。invoke_agent で Agent を呼び出します。

import boto3
import logging
from botocore.exceptions import ClientError

# ロガーの設定
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class BedrockAgentClient:
    def __init__(self, region_name="ap-northeast-1"):
        self.agents_runtime_client = boto3.client(
            service_name="bedrock-agent-runtime",
            region_name=region_name
        )

    def invoke_agent(self, agent_id, agent_alias_id, session_id, prompt):
        try:
            response = self.agents_runtime_client.invoke_agent(
                agentId=agent_id,
                agentAliasId=agent_alias_id,
                sessionId=session_id,
                inputText=prompt,
            )

            completion = ""

            # ストリーミングレスポンスを処理
            for event in response["completion"]:
                if "chunk" in event and "bytes" in event["chunk"]:
                    chunk_bytes = event["chunk"]["bytes"]
                    if isinstance(chunk_bytes, (str, bytes)):
                        if isinstance(chunk_bytes, bytes):
                            completion += chunk_bytes.decode("utf-8")
                        else:
                            completion += chunk_bytes

        except ClientError as e:
            logger.error(f"Couldn't invoke agent. Error: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error while invoking agent: {str(e)}")
            raise

        return completion


def main():
    # クライアントの初期化
    agent_client = BedrockAgentClient()

    agent_id = "PAMAPAQH3X"
    agent_alias_id = "JQ19PTI4CE"
    session_id = "test-session-001"
    prompt = "日本の時刻をおしえてください"

    try:
        # エージェントの呼び出し
        response = agent_client.invoke_agent(
            agent_id=agent_id,
            agent_alias_id=agent_alias_id,
            session_id=session_id,
            prompt=prompt
        )
        print("応答:", response)

    except Exception as e:
        print(f"エラーが発生しました: {str(e)}")


if __name__ == "__main__":
    main()

Tips: Lambda 関数が受け取る Event

Lambda 関数が input として受け取る Event はこんな感じです。parameters に、オンプレミスの API を実行するべきパラメーターが含まれており、これを元に Lambda はオンプレミスの API を実行するようにコードを実装しています。

{
    "messageVersion": "1.0",
    "function": "get-current-date-basedon-userinputed-timezone",
    "parameters": [
        {
            "name": "timezone",
            "type": "string",
            "value": "jst"
        }
    ],
    "sessionId": "372952752580343",
    "agent": {
        "name": "access-to-onpremis",
        "version": "DRAFT",
        "id": "PAMAPAQH3X",
        "alias": "TSTALIASID"
    },
    "actionGroup": "GetCurrentDate",
    "sessionAttributes": {},
    "promptSessionAttributes": {},
    "inputText": "JSTの時刻をおしえてください"
}

Tips : Agent が Action Group を実行する際に、確認を行う

セキュリティ面での Tips です。Agent が外部の API を実行するときに、実行の直前に人間による確認をしたいときがあります。例えば、機密情報を外部に出すことはないか、といった確認です。Bedrock Agents には、ユーザーに確認を促すための API が提供されています。

Action Group を選択します。

image-20250223155535895.png

Enable of confirmation of action group function を Enabled に変更します。

image-20250223155555285.png

すると、AWS マネジメントコンソール上で実行すると、以下の画像のようにユーザーへアクションを実行しても良いか確認がされます。get-current-date-basedon-userinputed-timezone という Function を、timezone : jst で実行してもいいか?というのが見えます。もしここで、意図しない関数や、パラメーターが LLM のハルシネーションによって設定されていると、Deny で防ぐことができます。

image-20250223155653437.png

Confirm を押すと、実行されました

image-20250223155807696.png

Tips : SDK で Action Group の実行を確認する

上記の Action Group の実行の前に確認することを SDK 上でもコントロールできます。

Python コードでユーザーの承認を行う場合はこんな感じで実行できます。表示の文章などは、好きにアプリケーション側で実装できるので、より分かりやすい日本語にすることもできます。

> python3 app.py
プロンプト: 日本時刻をおしえてください

=== アクション承認が必要です ===

アクション詳細:
- グループ: GetCurrentDate
- 関数: get-current-date-basedon-userinputed-timezone
- パラメータ:
  * timezone: jst (string)

このアクションを承認しますか? (y/n): 

ソースコードの全文はこちらです。ちょっと長いのですが、_handle_action_approval 関数あたりが重要な処理です。

import boto3
import logging
import json
from botocore.exceptions import ClientError

# ロガーの設定
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

class BedrockAgentClient:
    def __init__(self, region_name="ap-northeast-1"):
        self.agents_runtime_client = boto3.client(
            service_name="bedrock-agent-runtime",
            region_name=region_name
        )

    def _process_chunk(self, chunk_bytes):
        """チャンクデータを処理し、テキストを返す"""
        if isinstance(chunk_bytes, bytes):
            return chunk_bytes.decode("utf-8")
        return chunk_bytes

    def _handle_action_approval(self, func_input, auto_approve):
        """アクション承認の処理"""
        action_group = func_input["actionGroup"]
        function_name = func_input["function"]
        parameters = func_input["parameters"]

        print(f"\nアクション詳細:")
        print(f"- グループ: {action_group}")
        print(f"- 関数: {function_name}")
        print("- パラメータ:")
        for param in parameters:
            print(f"  * {param['name']}: {param['value']} ({param['type']})")

        if auto_approve:
            print("自動承認します...")
            confirmation_state = "CONFIRM"
        else:
            user_input = input("\nこのアクションを承認しますか? (y/n): ")
            confirmation_state = "CONFIRM" if user_input.lower() == 'y' else "DENY"

        return action_group, function_name, confirmation_state

    def _create_session_state(self, invocation_id, action_group, function_name, confirmation_state):
        """セッション状態を作成"""
        session_state = {
            "invocationId": invocation_id,
            "returnControlInvocationResults": [
                {
                    "functionResult": {
                        "actionGroup": action_group,
                        "function": function_name,
                        "responseBody": {
                            "TEXT": {
                                "body": json.dumps({
                                    "confirmationState": confirmation_state
                                })
                            }
                        }
                    }
                }
            ]
        }
        print("\nPrint session_state:")
        print(json.dumps(session_state, indent=2, ensure_ascii=False))
        return session_state

    def _process_completion_events(self, events):
        """completionイベントの処理"""
        completion = ""
        for event in events:
            if "chunk" in event and "bytes" in event["chunk"]:
                completion += self._process_chunk(event["chunk"]["bytes"])
        return completion

    def _handle_return_control(self, event, agent_id, agent_alias_id, session_id, auto_approve):
        """returnControlイベントの処理"""
        print("\n=== アクション承認が必要です ===")
        control_data = event["returnControl"]
        invocation_id = control_data["invocationId"]

        for input_data in control_data.get("invocationInputs", []):
            if "functionInvocationInput" in input_data:
                action_group, function_name, confirmation_state = self._handle_action_approval(
                    input_data["functionInvocationInput"],
                    auto_approve
                )

                session_state = self._create_session_state(
                    invocation_id,
                    action_group,
                    function_name,
                    confirmation_state
                )

                new_response = self.agents_runtime_client.invoke_agent(
                    agentId=agent_id,
                    agentAliasId=agent_alias_id,
                    sessionId=session_id,
                    inputText="",
                    sessionState=session_state
                )

                if "completion" in new_response:
                    return self._process_completion_events(new_response["completion"])
        return ""

    def invoke_agent(self, agent_id, agent_alias_id, session_id, prompt, auto_approve=True):
        """
        Sends a prompt for the agent to process and respond to.

        :param agent_id: The unique identifier of the agent to use.
        :param agent_alias_id: The alias of the agent to use.
        :param session_id: The unique identifier of the session.
        :param prompt: The prompt that you want Claude to complete.
        :param auto_approve: 自動的にアクションを承認するかどうか
        :return: Inference response from the model.
        """
        try:
            response = self.agents_runtime_client.invoke_agent(
                agentId=agent_id,
                agentAliasId=agent_alias_id,
                sessionId=session_id,
                inputText=prompt,
                enableTrace=True
            )

            if "completion" not in response:
                return ""

            completion = ""
            for event in response["completion"]:
                if "returnControl" in event:
                    completion = self._handle_return_control(
                        event,
                        agent_id,
                        agent_alias_id,
                        session_id,
                        auto_approve
                    )
                elif "chunk" in event and "bytes" in event["chunk"]:
                    completion += self._process_chunk(event["chunk"]["bytes"])

            return completion

        except ClientError as e:
            logger.error(f"Couldn't invoke agent. Error: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error while invoking agent: {str(e)}")
            raise


def main():
    # クライアントの初期化
    agent_client = BedrockAgentClient()

    # テスト用のパラメータ
    agent_id = "PAMAPAQH3X"
    agent_alias_id = "QPKHPSPHNH"
    session_id = "test-session-001"
    prompt = "日本時刻をおしえてください"

    try:
        print(f"プロンプト: {prompt}")
        # エージェントの呼び出し(auto_approve=Falseで手動承認モードに設定可能)
        response = agent_client.invoke_agent(
            agent_id=agent_id,
            agent_alias_id=agent_alias_id,
            session_id=session_id,
            prompt=prompt,
            auto_approve=False  # Falseにすると手動承認モードになります
        )
        print("\n=== 最終レスポンス ===")
        print(response)

    except Exception as e:
        print(f"エラーが発生しました: {str(e)}")


if __name__ == "__main__":
    main()

検証を通じてわかったこと

  • Agent 周りで意識した方がいい Service Quota
    • 1 個の Agent あたり、11 個の Action Group を有効化で設定可能 (Enabled action groups per agent)
    • 1 個の Action Group Function あたり、5 個の Parameter を設定可能 (Parameters per function)

参考 URL

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?