1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon Bedrock AgentCore Runtime が双方向ストリーミングに対応したとのことで動かしてみた

Posted at

はじめに

AWS re:Invent 2025 で発表があった Amazon Bedrock AgentCore Runtime が双方向ストリーミングに対応したことについてシンプルに纏めます。

内容

この機能により、AI エージェントがユーザーの入力を受け取りながら同時に応答を生成する、リアルタイムの対話型アプリケーションの構築が可能になります。

双方向ストリーミングで実現すること

スクリーンショット 2025-12-18 23.39.34.png

これにより、音声エージェントでは自然な対話体験を、テキストベースの対話では応答性の向上を実現できます。

始め方

やりながら理解していきます。

1. 必要なパッケージをインストールする

bedrock-agentcore - AI エージェント構築のための Amazon Bedrock AgentCore SDK です。この中に Python WebSockets ライブラリの依存関係が含まれています。

mkdir agentcore-runtime-quickstart-websocket
cd agentcore-runtime-quickstart-websocket
python3 -m venv .venv
source .venv/bin/activate

pip install --upgrade pip

pip install bedrock-agentcore strands-agents

2. 双方向ストリーミングエージェントを作成する

@app.websocket デコレータ - ポート 8080/ws パスでの接続を自動的に処理します。

websocket_echo_agent.py
websocket_echo_agent.py
import asyncio
import json
from bedrock_agentcore import BedrockAgentCoreApp
from strands import Agent
from strands.models.bedrock import BedrockModel
from starlette.websockets import WebSocketDisconnect

# アプリケーションインスタンスの作成
app = BedrockAgentCoreApp()

# Bedrockモデルの設定: 適切なモデルIDとリージョン名を指定
model = BedrockModel(
    model_id="global.amazon.nova-2-lite-v1:0",
    region_name="us-west-2",
    max_tokens=5000
)

# システムプロンプトの定義
system_prompt="""あなたはパーソナルトレーナーです。
ユーザーの目標を達成するための最適なトレーニングプランを提案してください。
日本語で回答してください。"""

# エージェントの初期化
agent = Agent(
    model=model,
    system_prompt=system_prompt,
    callback_handler=None
)

# WebSocketハンドラーの定義
@app.websocket
async def websocket_handler(websocket, context):
    """
    WebSocketエンドポイント: ストリーミング応答と割り込み対応
    クライアントからのメッセージを受信し、ストリーミングで応答を返す
    途中で新しいメッセージが来た場合、現在の応答をキャンセルして新しい応答を開始
    """

    # WebSocket接続を受け入れ
    await websocket.accept()

    # 現在のストリーミングタスクを管理する変数ß
    current_task: asyncio.Task | None = None
    task_lock = asyncio.Lock()

    # ストリーミングレスポンス送信関数の定義
    async def stream_response(user_message: str):
        """
        エージェントからのストリーミングレスポンスを処理し、クライアントに送信
        Args:
            user_message: ユーザーからの入力メッセージ
        返却値なし
        例外:
            asyncio.CancelledError: タスクがキャンセルされた場合に発生
            その他の例外はストリーミング中のエラーとして処理        
        """

        full_response = ""
        chunk_count = 0
        print(f"[INFO] ストリーミング開始: ユーザーメッセージ='{user_message}'")
        
        try:
            # エージェントからのストリーミング応答を非同期で受信
            async for event in agent.stream_async(user_message):
                try:
                    if "data" in event:
                        chunk = event["data"]
                        if chunk:
                            full_response += chunk
                            chunk_count += 1
                            
                            await websocket.send_json({
                                "type": "chunk",
                                "data": chunk
                            })
                    
                    elif "error" in event:
                        error_msg = event["error"]
                        await websocket.send_json({"error": f"エージェントエラー: {error_msg}"})
                        print(f"[ERROR] エージェントエラー: {error_msg}")                      
                        return
                        
                except Exception as chunk_error:
                    print(f"[ERROR] チャンク処理エラー: {chunk_error}")
                    continue

            # ストリーミング完了
            await websocket.send_json({
                "type": "complete",
                "fullResponse": full_response
            })
            print(f"[INFO] ストリーミング完了: チャンク数={chunk_count}, 合計文字数={len(full_response)}")

        # 割り込みキャンセル処理
        except asyncio.CancelledError:
            try:
                await websocket.send_json({
                    "type": "interrupted",
                    "partialResponse": full_response
                })
            except Exception as ws_error:
                print(f"[ERROR] キャンセル通知の送信に失敗: {ws_error}")
            print(f"[WARN] 割り込みでキャンセル: チャンク数={chunk_count}, 部分レスポンス={len(full_response)}文字")
            raise
        
        # ストリーミング中のその他の例外処理
        except Exception as stream_error:
            try:
                await websocket.send_json({
                    "type": "error",
                    "error": f"ストリーミングエラー: {str(stream_error)}",
                    "partialResponse": full_response
                })
            except Exception as ws_error:
                print(f"[ERROR] エラー通知の送信に失敗: {ws_error}")
            print(f"[ERROR] ストリーミングエラー: {type(stream_error).__name__}: {stream_error}")
            raise

    # メイン受信ループ
    try:
        # クライアントからのメッセージを待機
        while True:
            try:
                # メッセージ受信
                raw_data = await websocket.receive_text()
                data = json.loads(raw_data)
                user_message = data.get("inputText", "")
            except json.JSONDecodeError as json_error:
                print(f"[ERROR] JSON解析エラー: {json_error}")
                try:
                    await websocket.send_json({"error": "不正なJSONフォーマットです"})
                except Exception:
                    pass
                continue
            except Exception as receive_error:
                print(f"[ERROR] メッセージ受信エラー: {receive_error}")
                break

            # 入力メッセージを検証
            if not user_message:
                try:
                    await websocket.send_json({"error": "inputTextが見つかりません"})
                except Exception as ws_error:
                    print(f"[ERROR] バリデーションエラー通知の送信に失敗: {ws_error}")
                    break
                continue

            print(f"[INFO] メッセージ受信: {user_message}")

            # Race Conditionを防ぐ安全なタスク管理
            async with task_lock:
                # 既存のストリーミングタスクがあればキャンセル
                if current_task and not current_task.done():
                    print("[WARN] 割り込み: 現在の回答をキャンセルします")
                    current_task.cancel()
                    try:
                        await current_task
                    except asyncio.CancelledError:
                        pass

                # 新しいストリーミングタスクを開始
                current_task = asyncio.create_task(stream_response(user_message))

    # クライアント切断時の処理
    except WebSocketDisconnect:
        print("[INFO] クライアントが切断しました")

    # その他の例外処理
    except Exception as e:
        print(f"[ERROR] WebSocketエラー: {e}")
        try:
            await websocket.send_json({"error": str(e)})
        except Exception:
            pass
    finally:
        # リソースクリーンアップ: 実行中のタスクがあればキャンセル
        if current_task and not current_task.done():
            print("[INFO] 実行中のタスクをクリーンアップしています")
            current_task.cancel()
            try:
                await current_task
            except asyncio.CancelledError:
                print("[INFO] タスクのキャンセルが完了しました")
            except Exception as cleanup_error:
                print(f"[ERROR] クリーンアップ中のエラー: {cleanup_error}")

# エントリーポイント
if __name__ == "__main__":
    app.run(log_level="info")
requirements.txt
requirements.txt
bedrock-agentcore
strands-agents

3. ローカルでテストする

ターミナル 1 で双方向ストリーミングエージェントを起動します。

python websocket_echo_agent.py

INFO:     Started server process [8077]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)

テスト用のクライアントを作成します。(ここはいい感じにストリーミングを体験したかったので、AIエージェントに作成させます。)

単純にやっている処理は基本的に ws://localhost:8080/ws に接続してリアルタイム処理をしているだけです。

websocket_agent_client.py
websocket_agent_client.py
import asyncio
import json
import signal
import sys
import websockets
from websockets.exceptions import ConnectionClosed

class WebSocketAgentClient:
    """
    WebSocketエージェントクライアント
    対話型チャットをサポートし、レスポンスの割り込み機能を備えています。
    
    主要機能:
    - WebSocketを介した双方向通信
    - リアルタイムレスポンス受信
    - レスポンス中断・割り込み機能
    - 対話型チャットインターフェース
    """
    
    def __init__(self, uri: str = "ws://localhost:8080/ws"):
        """
        クライアントの初期化
        
        Args:
            uri: WebSocketサーバーのURI(デフォルト: localhost:8080)
        """
        self.uri = uri
        self.websocket = None  # WebSocket接続オブジェクト
        self.running = False   # アプリケーション実行状態フラグ
        
        # 非同期タスク管理用のインスタンス変数
        self.response_task: asyncio.Task | None = None  # レスポンス受信タスク
        self.task_lock = asyncio.Lock()  # タスク同期用ロック(将来の拡張用)        

    async def connect(self):
        """
        WebSocketサーバーに接続
        
        Raises:
            Exception: 接続に失敗した場合
        """
        try:
            # WebSocket接続を確立
            self.websocket = await websockets.connect(self.uri)
            print(f"✅ 接続成功: {self.uri}")
            self.running = True
        except Exception as e:
            print(f"❌ 接続エラー: {e}")
            raise

    async def disconnect(self):
        """
        WebSocket接続を安全に切断
        
        実行順序:
        1. 実行状態フラグをFalseに設定
        2. 実行中のレスポンスタスクをキャンセル
        3. WebSocket接続を閉じる
        """
        self.running = False
        
        # 実行中のレスポンスタスクをキャンセル
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
            try:
                await self.response_task
            except asyncio.CancelledError:
                pass  # キャンセル例外は正常な処理
        
        # WebSocket接続を閉じる
        if self.websocket:
            await self.websocket.close()
            print("✅ 接続切断: WebSocket接続を終了しました")

    async def send_message(self, message: str):
        """
        サーバーにメッセージを送信
        
        Args:
            message: 送信するメッセージ文字列
            
        Raises:
            RuntimeError: WebSocket接続が確立されていない場合
        """
        if not self.websocket:
            raise RuntimeError("WebSocketに接続されていません")
        
        # JSONペイロードを作成してサーバーに送信
        payload = {"inputText": message}
        await self.websocket.send(json.dumps(payload))
        print(f"📤 送信完了: {message}")

    async def receive_response(self):
        """
        サーバーからのレスポンスを受信して表示
        
        レスポンス形式:
        - chunk: ストリーミング中の部分レスポンス
        - complete: レスポンス完了
        - interrupted: レスポンス中断
        - error: エラー発生
        """
        if not self.websocket:
            return
        
        # レスポンス受信開始の表示
        print("🤖 エージェント: ", end="", flush=True)
        full_response = ""  # 完全なレスポンステキストを蓄積
        
        try:
            # WebSocketメッセージを非同期で受信
            async for message in self.websocket:
                try:
                    # JSONメッセージをパース
                    data = json.loads(message)
                    response_type = data.get("type")
                    
                    # ストリーミングデータのチャンク処理
                    if response_type == "chunk":
                        chunk = data.get("data", "")
                        full_response += chunk
                        print(chunk, end="", flush=True)  # リアルタイム表示
                        
                    # レスポンス完了処理
                    elif response_type == "complete":
                        print("\n✅ レスポンス完了")
                        break
                        
                    # レスポンス中断処理
                    elif response_type == "interrupted":
                        print("\n⚠️  レスポンス中断: 新しいリクエストにより処理が中断されました")
                        partial_response = data.get("partialResponse", "")
                        if partial_response != full_response:
                            print(f"📝 部分レスポンス: {partial_response}")
                        break
                        
                    # エラー処理
                    elif "error" in data:
                        error_msg = data["error"]
                        print(f"\n❌ サーバーエラー: {error_msg}")
                        break
                        
                except json.JSONDecodeError:
                    print(f"\n❌ JSON解析エラー: 不正なメッセージ形式 - {message}")
                    
        except ConnectionClosed:
            print("\n🔌 接続切断: サーバーとの接続が予期せず切断されました")
        except asyncio.CancelledError:
            print("\n⏹️  処理中断: レスポンス受信がキャンセルされました")
            raise  # タスクキャンセルは上位に伝播

    async def interactive_chat(self):
        """
        対話型チャット機能
        
        機能詳細:
        - ユーザー入力の非同期受付
        - レスポンス中断機能
        - 終了コマンド対応(quit, exit, q)
        - エラー処理とクリーンアップ
        """
        # チャット開始メッセージ
        print("💬 チャット開始: 対話型エージェントチャットを開始します")
        print("💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断")
        print("" + "="*50)  # 区切り線
        
        try:
            # メインチャットループ
            while self.running:
                try:
                    # ユーザー入力を非同期で受け取る
                    user_input = await asyncio.get_event_loop().run_in_executor(
                        None, input, "👤 あなた: "
                    )

                    # 終了コマンドチェック
                    if user_input.strip().lower() in ['quit', 'exit', 'q']:
                        print("👋 チャット終了: ご利用ありがとうございました")
                        break
                    
                    # 空入力のスキップ
                    if not user_input.strip():
                        continue
                    
                    # 進行中のレスポンスがあれば中断
                    if self.response_task and not self.response_task.done():
                        print("⚠️  処理中断: 前のレスポンスを中断して新しいリクエストを処理します...")
                        self.response_task.cancel()
                        try:
                            await self.response_task
                        except asyncio.CancelledError:
                            pass  # 正常なキャンセル処理
                    
                    # メッセージ送信
                    await self.send_message(user_input)
                    
                    # レスポンス受信タスクを作成・実行
                    self.response_task = asyncio.create_task(self.receive_response())
                    await self.response_task
                    
                    print()  # レスポンス後の改行
                    
                except EOFError:
                    # Ctrl+D(EOF)による終了
                    print("\n👋 チャット終了: EOF信号を受信しました")
                    break
                except KeyboardInterrupt:
                    # Ctrl+C による中断(継続可能)
                    print("\n⚠️  入力中断: キーボード割り込みが発生しました(チャットは継続中)")
                    continue
                    
        except Exception as e:
            print(f"❌ チャットエラー: 予期しないエラーが発生しました - {e}")
        finally:
            # チャット終了時のクリーンアップ
            await self.disconnect()

async def main():
    """
    メインエントリーポイント
    
    実行フロー:
    1. WebSocketクライアントの初期化
    2. シグナルハンドラーの設定
    3. サーバー接続
    4. 対話チャット開始
    5. エラーハンドリングとクリーンアップ
    """

    # WebSocketクライアントインスタンスを作成
    client = WebSocketAgentClient()
    
    # システム終了シグナル用ハンドラー
    def signal_handler():
        print("\n🛑 システム終了: 終了シグナルを受信しました")
        client.running = False
    
    # Unix系システムでのシグナルハンドラー登録
    if sys.platform != "win32":
        loop = asyncio.get_running_loop()
        # SIGINT (Ctrl+C) と SIGTERM の両方をハンドル
        for signame in {'SIGINT', 'SIGTERM'}:
            loop.add_signal_handler(getattr(signal, signame), signal_handler)
    
    try:
        # 1. WebSocketサーバーへの接続確立
        await client.connect()
        
        # 2. 対話型チャットセッションの開始
        await client.interactive_chat()
        
    except KeyboardInterrupt:
        print("\n🛑 プログラム中断: キーボード割り込みによりアプリケーションを終了します")
    except Exception as e:
        print(f"❌ アプリケーションエラー: 予期しないエラーが発生しました - {e}")
    finally:
        # 必要なクリーンアップ処理
        await client.disconnect()

if __name__ == "__main__":
    """
    アプリケーションエントリーポイント
    
    セットアップ手順:
    1. WebSocketサーバー(websocket_echo_agent.py)を起動
    2. このクライアントを実行: python websocket_agent_client.py
    3. 表示される指示に従って対話チャットを開始
    
    注意事項:
    - サーバーが起動していることを確認してください
    - デフォルトポート: 8080
    """
    print("🚀 起動開始: WebSocketエージェントクライアントを初期化中...")
    
    try:
        # メイン処理の非同期実行
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🛑 アプリケーション終了: ユーザーによる中断")
    except Exception as e:
        print(f"❌ 起動失敗: アプリケーション開始時にエラーが発生しました - {e}")
        sys.exit(1)  # エラー終了

テスト用のクライアントを実行し、「1週間で10キロ痩せたい」と質問してみます。

python websocket_agent_client.py

🚀 起動開始: WebSocketエージェントクライアントを初期化中...
✅ 接続成功: ws://localhost:8080/ws
💬 チャット開始: 対話型エージェントチャットを開始します
💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断
==================================================
👤 あなた: 1週間で10キロ痩せたい

タイトルなし.gif

いい感じでストリーミング処理されました。

ちなみに 1週間で10キロ痩せることは健康的に非常に困難ですと Amazon Nova Lite2 がちゃんと返してくれて安心しました。

4. AgentCore Runtime にデプロイしテストする

これまで作成した双方向ストリーミングエージェントを Amazon Bedrock AgentCore スターターキットでデプロイするのでインストールしておきます。

pip install bedrock-agentcore-starter-toolkit

対話形式で設定ファイルを作成し、双方向ストリーミングエージェントをデプロイします。

agentcore configure -e websocket_echo_agent.py

agentcore launch

デプロイが完了したら、テスト用のクライアントを作成します。先程ローカルでテストしたクライアントを AgentCore Runtime にデプロイした双方向ストリーミングエージェントへアクセスできるように置き換えます。

ざっくりと、AgentCoreRuntimeClientgenerate_ws_connection() を使用して AWS 認証付きの接続 URL・ヘッダーを生成し、AgentCore Runtime へアクセスが変更になった意外は先程と同じです。

websocket_agent_client.py
websocket_agent_client.py
import asyncio
import json
import os
import signal
import sys
import websockets
from websockets.exceptions import ConnectionClosed
from bedrock_agentcore.runtime import AgentCoreRuntimeClient

class WebSocketAgentClient:
    """
    Amazon Bedrock AgentCore Runtime WebSocketクライアント
    対話型チャットをサポートし、レスポンスの割り込み機能を備えています。
    
    主要機能:
    - AgentCore Runtime WebSocketを介した双方向通信
    - AWS認証による安全な接続
    - リアルタイムレスポンス受信
    - レスポンス中断・割り込み機能
    - 対話型チャットインターフェース
    """
    
    def __init__(self, runtime_arn: str = None, region: str = "us-west-2"):
        """
        クライアントの初期化
        
        Args:
            runtime_arn: AgentCore RuntimeのARN
            region: AWSリージョン(デフォルト: us-west-2)
        """
        self.runtime_arn = runtime_arn or os.getenv("AGENT_ARN")
        if not self.runtime_arn:
            raise ValueError("AGENT_ARN環境変数が設定されていないか、runtime_arnが指定されていません")
        
        self.region = region
        self.websocket = None  # WebSocket接続オブジェクト
        self.running = False   # アプリケーション実行状態フラグ
        self.client = AgentCoreRuntimeClient(region=region)
        
        # 非同期タスク管理用のインスタンス変数
        self.response_task: asyncio.Task | None = None  # レスポンス受信タスク
        self.task_lock = asyncio.Lock()  # タスク同期用ロック(将来の拡張用)        

    async def connect(self):
        """
        AgentCore RuntimeのWebSocketサーバーに接続
        
        Raises:
            Exception: 接続に失敗した場合
        """
        try:
            # AgentCore Runtime用のWebSocket接続URLとヘッダーを生成
            ws_url, headers = self.client.generate_ws_connection(
                runtime_arn=self.runtime_arn, 
                session_id=None
            )
            
            # WebSocket接続を確立(認証ヘッダー付き)
            self.websocket = await websockets.connect(ws_url, additional_headers=headers)
            print(f"✅ AgentCore Runtime接続成功: {self.runtime_arn}")
            self.running = True
        except Exception as e:
            print(f"❌ AgentCore Runtime接続エラー: {e}")
            raise

    async def disconnect(self):
        """
        WebSocket接続を安全に切断
        
        実行順序:
        1. 実行状態フラグをFalseに設定
        2. 実行中のレスポンスタスクをキャンセル
        3. WebSocket接続を閉じる
        """
        self.running = False
        
        # 実行中のレスポンスタスクをキャンセル
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
            try:
                await self.response_task
            except asyncio.CancelledError:
                pass  # キャンセル例外は正常な処理
        
        # WebSocket接続を閉じる
        if self.websocket:
            await self.websocket.close()
            print("✅ 接続切断: WebSocket接続を終了しました")

    async def send_message(self, message: str):
        """
        サーバーにメッセージを送信
        
        Args:
            message: 送信するメッセージ文字列
            
        Raises:
            RuntimeError: WebSocket接続が確立されていない場合
        """
        if not self.websocket:
            raise RuntimeError("WebSocketに接続されていません")
        
        # JSONペイロードを作成してサーバーに送信
        payload = {"inputText": message}
        await self.websocket.send(json.dumps(payload))
        print(f"📤 送信完了: {message}")

    async def receive_response(self):
        """
        サーバーからのレスポンスを受信して表示
        
        レスポンス形式:
        - chunk: ストリーミング中の部分レスポンス
        - complete: レスポンス完了
        - interrupted: レスポンス中断
        - error: エラー発生
        """
        if not self.websocket:
            return
        
        # レスポンス受信開始の表示
        print("🤖 エージェント: ", end="", flush=True)
        full_response = ""  # 完全なレスポンステキストを蓄積
        
        try:
            # WebSocketメッセージを非同期で受信
            async for message in self.websocket:
                try:
                    # JSONメッセージをパース
                    data = json.loads(message)
                    response_type = data.get("type")
                    
                    # ストリーミングデータのチャンク処理
                    if response_type == "chunk":
                        chunk = data.get("data", "")
                        full_response += chunk
                        print(chunk, end="", flush=True)  # リアルタイム表示
                        
                    # レスポンス完了処理
                    elif response_type == "complete":
                        print("\n✅ レスポンス完了")
                        break
                        
                    # レスポンス中断処理
                    elif response_type == "interrupted":
                        print("\n⚠️  レスポンス中断: 新しいリクエストにより処理が中断されました")
                        partial_response = data.get("partialResponse", "")
                        if partial_response != full_response:
                            print(f"📝 部分レスポンス: {partial_response}")
                        break
                        
                    # エラー処理
                    elif "error" in data:
                        error_msg = data["error"]
                        print(f"\n❌ サーバーエラー: {error_msg}")
                        break
                        
                except json.JSONDecodeError:
                    print(f"\n❌ JSON解析エラー: 不正なメッセージ形式 - {message}")
                    
        except ConnectionClosed:
            print("\n🔌 接続切断: サーバーとの接続が予期せず切断されました")
        except asyncio.CancelledError:
            print("\n⏹️  処理中断: レスポンス受信がキャンセルされました")
            raise  # タスクキャンセルは上位に伝播

    async def interactive_chat(self):
        """
        対話型チャット機能
        
        機能詳細:
        - ユーザー入力の非同期受付
        - レスポンス中断機能
        - 終了コマンド対応(quit, exit, q)
        - エラー処理とクリーンアップ
        """
        # チャット開始メッセージ
        print("💬 チャット開始: 対話型エージェントチャットを開始します")
        print("💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断")
        print("" + "="*50)  # 区切り線
        
        try:
            # メインチャットループ
            while self.running:
                try:
                    # ユーザー入力を非同期で受け取る
                    user_input = await asyncio.get_event_loop().run_in_executor(
                        None, input, "👤 あなた: "
                    )

                    # 終了コマンドチェック
                    if user_input.strip().lower() in ['quit', 'exit', 'q']:
                        print("👋 チャット終了: ご利用ありがとうございました")
                        break
                    
                    # 空入力のスキップ
                    if not user_input.strip():
                        continue
                    
                    # 進行中のレスポンスがあれば中断
                    if self.response_task and not self.response_task.done():
                        print("⚠️  処理中断: 前のレスポンスを中断して新しいリクエストを処理します...")
                        self.response_task.cancel()
                        try:
                            await self.response_task
                        except asyncio.CancelledError:
                            pass  # 正常なキャンセル処理
                    
                    # メッセージ送信
                    await self.send_message(user_input)
                    
                    # レスポンス受信タスクを作成・実行
                    self.response_task = asyncio.create_task(self.receive_response())
                    await self.response_task
                    
                    print()  # レスポンス後の改行
                    
                except EOFError:
                    # Ctrl+D(EOF)による終了
                    print("\n👋 チャット終了: EOF信号を受信しました")
                    break
                except KeyboardInterrupt:
                    # Ctrl+C による中断(継続可能)
                    print("\n⚠️  入力中断: キーボード割り込みが発生しました(チャットは継続中)")
                    continue
                    
        except Exception as e:
            print(f"❌ チャットエラー: 予期しないエラーが発生しました - {e}")
        finally:
            # チャット終了時のクリーンアップ
            await self.disconnect()

async def main():
    """
    メインエントリーポイント
    
    実行フロー:
    1. WebSocketクライアントの初期化
    2. シグナルハンドラーの設定
    3. サーバー接続
    4. 対話チャット開始
    5. エラーハンドリングとクリーンアップ
    """

    # 環境変数からAgentCore Runtime ARNを取得
    runtime_arn = os.getenv("AGENT_ARN") 
    # WebSocketクライアントインスタンスを作成
    client = WebSocketAgentClient(runtime_arn=runtime_arn)
    
    # システム終了シグナル用ハンドラー
    def signal_handler():
        print("\n🛑 システム終了: 終了シグナルを受信しました")
        client.running = False
    
    # Unix系システムでのシグナルハンドラー登録
    if sys.platform != "win32":
        loop = asyncio.get_running_loop()
        # SIGINT (Ctrl+C) と SIGTERM の両方をハンドル
        for signame in {'SIGINT', 'SIGTERM'}:
            loop.add_signal_handler(getattr(signal, signame), signal_handler)
    
    try:
        # 1. WebSocketサーバーへの接続確立
        await client.connect()
        
        # 2. 対話型チャットセッションの開始
        await client.interactive_chat()
        
    except KeyboardInterrupt:
        print("\n🛑 プログラム中断: キーボード割り込みによりアプリケーションを終了します")
    except Exception as e:
        print(f"❌ アプリケーションエラー: 予期しないエラーが発生しました - {e}")
    finally:
        # 必要なクリーンアップ処理
        await client.disconnect()

if __name__ == "__main__":
    """
    アプリケーションエントリーポイント
    
    セットアップ手順:
    1. AgentCore RuntimeにWebSocketエージェントをデプロイ
    2. 環境変数AGENT_ARNにランタイムARNを設定、またはコード内で指定
    3. このクライアントを実行: python websocket_agent_client.py
    4. 表示される指示に従って対話チャットを開始
    
    必要な環境変数:
    - AGENT_ARN: AgentCore RuntimeのARN(オプション、コード内でデフォルト値を設定済み)
    - AWS認証情報(AWS CLI設定、IAMロール、環境変数など)
    
    必要なIAM権限:
    - bedrock-agentcore:InvokeAgentRuntimeWithWebSocketStream
    """
    print("🚀 起動開始: AgentCore Runtime WebSocketクライアントを初期化中...")
    
    try:
        # メイン処理の非同期実行
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🛑 アプリケーション終了: ユーザーによる中断")
    except Exception as e:
        print(f"❌ 起動失敗: アプリケーション開始時にエラーが発生しました - {e}")
        sys.exit(1)  # エラー終了

AgentCore Runtime の arn を環境変数に指定します。

export AGENT_ARN="arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/websocket_echo_agent-3lHuhlFuym"

テスト用のクライアントを実行し、「半年後にプロレスラーみたいなめっちゃマッチョになりたいです。」と質問してみます。

agentcore-runtime-quickstart-websocket % python websocket_agent_client.py

🚀 起動開始: AgentCore Runtime WebSocketクライアントを初期化中...
✅ AgentCore Runtime接続成功: arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/websocket_echo_agent-3lHuhlFuym
💬 チャット開始: 対話型エージェントチャットを開始します
💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断
==================================================
👤 あなた: 半年後にプロレスラーみたいなめっちゃマッチョになりたいです。

タイトルなし.gif

今度も Amazon Nova Lite2 君は、このプランをしっかり守れば、半年後には「プロレスラーのようなマッチョな肉体」に近づけるはずです! 頑張ってください と返してくれたので、単純な回答を求めるだけぐらいの AI エージェントならば悪くなさそうですよ。

まとめ

双方向ストリーミングのサポートにより、Amazon Bedrock AgentCore Runtime は音声エージェントやリアルタイムチャットボットの構築基盤として、より実用的な選択肢になりました。
特に、カスタマーサポートや音声アシスタントなど、自然な対話が求められるユースケースでの活用が期待されそうです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?