はじめに
AWS re:Invent 2025 で発表があった Amazon Bedrock AgentCore Runtime が双方向ストリーミングに対応したことについてシンプルに纏めます。
内容
この機能により、AI エージェントがユーザーの入力を受け取りながら同時に応答を生成する、リアルタイムの対話型アプリケーションの構築が可能になります。
双方向ストリーミングで実現すること
これにより、音声エージェントでは自然な対話体験を、テキストベースの対話では応答性の向上を実現できます。
始め方
やりながら理解していきます。
1. 必要なパッケージをインストールする
bedrock-agentcore - AI エージェント構築のための Amazon Bedrock AgentCore SDK です。この中に Python WebSockets ライブラリの依存関係が含まれています。
mkdir agentcore-runtime-quickstart-websocket
cd agentcore-runtime-quickstart-websocket
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install bedrock-agentcore strands-agents
2. 双方向ストリーミングエージェントを作成する
@app.websocket デコレータ - ポート 8080 の /ws パスでの接続を自動的に処理します。
websocket_echo_agent.py
import asyncio
import json
from bedrock_agentcore import BedrockAgentCoreApp
from strands import Agent
from strands.models.bedrock import BedrockModel
from starlette.websockets import WebSocketDisconnect
# アプリケーションインスタンスの作成
app = BedrockAgentCoreApp()
# Bedrockモデルの設定: 適切なモデルIDとリージョン名を指定
model = BedrockModel(
model_id="global.amazon.nova-2-lite-v1:0",
region_name="us-west-2",
max_tokens=5000
)
# システムプロンプトの定義
system_prompt="""あなたはパーソナルトレーナーです。
ユーザーの目標を達成するための最適なトレーニングプランを提案してください。
日本語で回答してください。"""
# エージェントの初期化
agent = Agent(
model=model,
system_prompt=system_prompt,
callback_handler=None
)
# WebSocketハンドラーの定義
@app.websocket
async def websocket_handler(websocket, context):
"""
WebSocketエンドポイント: ストリーミング応答と割り込み対応
クライアントからのメッセージを受信し、ストリーミングで応答を返す
途中で新しいメッセージが来た場合、現在の応答をキャンセルして新しい応答を開始
"""
# WebSocket接続を受け入れ
await websocket.accept()
# 現在のストリーミングタスクを管理する変数ß
current_task: asyncio.Task | None = None
task_lock = asyncio.Lock()
# ストリーミングレスポンス送信関数の定義
async def stream_response(user_message: str):
"""
エージェントからのストリーミングレスポンスを処理し、クライアントに送信
Args:
user_message: ユーザーからの入力メッセージ
返却値なし
例外:
asyncio.CancelledError: タスクがキャンセルされた場合に発生
その他の例外はストリーミング中のエラーとして処理
"""
full_response = ""
chunk_count = 0
print(f"[INFO] ストリーミング開始: ユーザーメッセージ='{user_message}'")
try:
# エージェントからのストリーミング応答を非同期で受信
async for event in agent.stream_async(user_message):
try:
if "data" in event:
chunk = event["data"]
if chunk:
full_response += chunk
chunk_count += 1
await websocket.send_json({
"type": "chunk",
"data": chunk
})
elif "error" in event:
error_msg = event["error"]
await websocket.send_json({"error": f"エージェントエラー: {error_msg}"})
print(f"[ERROR] エージェントエラー: {error_msg}")
return
except Exception as chunk_error:
print(f"[ERROR] チャンク処理エラー: {chunk_error}")
continue
# ストリーミング完了
await websocket.send_json({
"type": "complete",
"fullResponse": full_response
})
print(f"[INFO] ストリーミング完了: チャンク数={chunk_count}, 合計文字数={len(full_response)}")
# 割り込みキャンセル処理
except asyncio.CancelledError:
try:
await websocket.send_json({
"type": "interrupted",
"partialResponse": full_response
})
except Exception as ws_error:
print(f"[ERROR] キャンセル通知の送信に失敗: {ws_error}")
print(f"[WARN] 割り込みでキャンセル: チャンク数={chunk_count}, 部分レスポンス={len(full_response)}文字")
raise
# ストリーミング中のその他の例外処理
except Exception as stream_error:
try:
await websocket.send_json({
"type": "error",
"error": f"ストリーミングエラー: {str(stream_error)}",
"partialResponse": full_response
})
except Exception as ws_error:
print(f"[ERROR] エラー通知の送信に失敗: {ws_error}")
print(f"[ERROR] ストリーミングエラー: {type(stream_error).__name__}: {stream_error}")
raise
# メイン受信ループ
try:
# クライアントからのメッセージを待機
while True:
try:
# メッセージ受信
raw_data = await websocket.receive_text()
data = json.loads(raw_data)
user_message = data.get("inputText", "")
except json.JSONDecodeError as json_error:
print(f"[ERROR] JSON解析エラー: {json_error}")
try:
await websocket.send_json({"error": "不正なJSONフォーマットです"})
except Exception:
pass
continue
except Exception as receive_error:
print(f"[ERROR] メッセージ受信エラー: {receive_error}")
break
# 入力メッセージを検証
if not user_message:
try:
await websocket.send_json({"error": "inputTextが見つかりません"})
except Exception as ws_error:
print(f"[ERROR] バリデーションエラー通知の送信に失敗: {ws_error}")
break
continue
print(f"[INFO] メッセージ受信: {user_message}")
# Race Conditionを防ぐ安全なタスク管理
async with task_lock:
# 既存のストリーミングタスクがあればキャンセル
if current_task and not current_task.done():
print("[WARN] 割り込み: 現在の回答をキャンセルします")
current_task.cancel()
try:
await current_task
except asyncio.CancelledError:
pass
# 新しいストリーミングタスクを開始
current_task = asyncio.create_task(stream_response(user_message))
# クライアント切断時の処理
except WebSocketDisconnect:
print("[INFO] クライアントが切断しました")
# その他の例外処理
except Exception as e:
print(f"[ERROR] WebSocketエラー: {e}")
try:
await websocket.send_json({"error": str(e)})
except Exception:
pass
finally:
# リソースクリーンアップ: 実行中のタスクがあればキャンセル
if current_task and not current_task.done():
print("[INFO] 実行中のタスクをクリーンアップしています")
current_task.cancel()
try:
await current_task
except asyncio.CancelledError:
print("[INFO] タスクのキャンセルが完了しました")
except Exception as cleanup_error:
print(f"[ERROR] クリーンアップ中のエラー: {cleanup_error}")
# エントリーポイント
if __name__ == "__main__":
app.run(log_level="info")
requirements.txt
bedrock-agentcore
strands-agents
3. ローカルでテストする
ターミナル 1 で双方向ストリーミングエージェントを起動します。
python websocket_echo_agent.py
INFO: Started server process [8077]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit)
テスト用のクライアントを作成します。(ここはいい感じにストリーミングを体験したかったので、AIエージェントに作成させます。)
単純にやっている処理は基本的に ws://localhost:8080/ws に接続してリアルタイム処理をしているだけです。
websocket_agent_client.py
import asyncio
import json
import signal
import sys
import websockets
from websockets.exceptions import ConnectionClosed
class WebSocketAgentClient:
"""
WebSocketエージェントクライアント
対話型チャットをサポートし、レスポンスの割り込み機能を備えています。
主要機能:
- WebSocketを介した双方向通信
- リアルタイムレスポンス受信
- レスポンス中断・割り込み機能
- 対話型チャットインターフェース
"""
def __init__(self, uri: str = "ws://localhost:8080/ws"):
"""
クライアントの初期化
Args:
uri: WebSocketサーバーのURI(デフォルト: localhost:8080)
"""
self.uri = uri
self.websocket = None # WebSocket接続オブジェクト
self.running = False # アプリケーション実行状態フラグ
# 非同期タスク管理用のインスタンス変数
self.response_task: asyncio.Task | None = None # レスポンス受信タスク
self.task_lock = asyncio.Lock() # タスク同期用ロック(将来の拡張用)
async def connect(self):
"""
WebSocketサーバーに接続
Raises:
Exception: 接続に失敗した場合
"""
try:
# WebSocket接続を確立
self.websocket = await websockets.connect(self.uri)
print(f"✅ 接続成功: {self.uri}")
self.running = True
except Exception as e:
print(f"❌ 接続エラー: {e}")
raise
async def disconnect(self):
"""
WebSocket接続を安全に切断
実行順序:
1. 実行状態フラグをFalseに設定
2. 実行中のレスポンスタスクをキャンセル
3. WebSocket接続を閉じる
"""
self.running = False
# 実行中のレスポンスタスクをキャンセル
if self.response_task and not self.response_task.done():
self.response_task.cancel()
try:
await self.response_task
except asyncio.CancelledError:
pass # キャンセル例外は正常な処理
# WebSocket接続を閉じる
if self.websocket:
await self.websocket.close()
print("✅ 接続切断: WebSocket接続を終了しました")
async def send_message(self, message: str):
"""
サーバーにメッセージを送信
Args:
message: 送信するメッセージ文字列
Raises:
RuntimeError: WebSocket接続が確立されていない場合
"""
if not self.websocket:
raise RuntimeError("WebSocketに接続されていません")
# JSONペイロードを作成してサーバーに送信
payload = {"inputText": message}
await self.websocket.send(json.dumps(payload))
print(f"📤 送信完了: {message}")
async def receive_response(self):
"""
サーバーからのレスポンスを受信して表示
レスポンス形式:
- chunk: ストリーミング中の部分レスポンス
- complete: レスポンス完了
- interrupted: レスポンス中断
- error: エラー発生
"""
if not self.websocket:
return
# レスポンス受信開始の表示
print("🤖 エージェント: ", end="", flush=True)
full_response = "" # 完全なレスポンステキストを蓄積
try:
# WebSocketメッセージを非同期で受信
async for message in self.websocket:
try:
# JSONメッセージをパース
data = json.loads(message)
response_type = data.get("type")
# ストリーミングデータのチャンク処理
if response_type == "chunk":
chunk = data.get("data", "")
full_response += chunk
print(chunk, end="", flush=True) # リアルタイム表示
# レスポンス完了処理
elif response_type == "complete":
print("\n✅ レスポンス完了")
break
# レスポンス中断処理
elif response_type == "interrupted":
print("\n⚠️ レスポンス中断: 新しいリクエストにより処理が中断されました")
partial_response = data.get("partialResponse", "")
if partial_response != full_response:
print(f"📝 部分レスポンス: {partial_response}")
break
# エラー処理
elif "error" in data:
error_msg = data["error"]
print(f"\n❌ サーバーエラー: {error_msg}")
break
except json.JSONDecodeError:
print(f"\n❌ JSON解析エラー: 不正なメッセージ形式 - {message}")
except ConnectionClosed:
print("\n🔌 接続切断: サーバーとの接続が予期せず切断されました")
except asyncio.CancelledError:
print("\n⏹️ 処理中断: レスポンス受信がキャンセルされました")
raise # タスクキャンセルは上位に伝播
async def interactive_chat(self):
"""
対話型チャット機能
機能詳細:
- ユーザー入力の非同期受付
- レスポンス中断機能
- 終了コマンド対応(quit, exit, q)
- エラー処理とクリーンアップ
"""
# チャット開始メッセージ
print("💬 チャット開始: 対話型エージェントチャットを開始します")
print("💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断")
print("" + "="*50) # 区切り線
try:
# メインチャットループ
while self.running:
try:
# ユーザー入力を非同期で受け取る
user_input = await asyncio.get_event_loop().run_in_executor(
None, input, "👤 あなた: "
)
# 終了コマンドチェック
if user_input.strip().lower() in ['quit', 'exit', 'q']:
print("👋 チャット終了: ご利用ありがとうございました")
break
# 空入力のスキップ
if not user_input.strip():
continue
# 進行中のレスポンスがあれば中断
if self.response_task and not self.response_task.done():
print("⚠️ 処理中断: 前のレスポンスを中断して新しいリクエストを処理します...")
self.response_task.cancel()
try:
await self.response_task
except asyncio.CancelledError:
pass # 正常なキャンセル処理
# メッセージ送信
await self.send_message(user_input)
# レスポンス受信タスクを作成・実行
self.response_task = asyncio.create_task(self.receive_response())
await self.response_task
print() # レスポンス後の改行
except EOFError:
# Ctrl+D(EOF)による終了
print("\n👋 チャット終了: EOF信号を受信しました")
break
except KeyboardInterrupt:
# Ctrl+C による中断(継続可能)
print("\n⚠️ 入力中断: キーボード割り込みが発生しました(チャットは継続中)")
continue
except Exception as e:
print(f"❌ チャットエラー: 予期しないエラーが発生しました - {e}")
finally:
# チャット終了時のクリーンアップ
await self.disconnect()
async def main():
"""
メインエントリーポイント
実行フロー:
1. WebSocketクライアントの初期化
2. シグナルハンドラーの設定
3. サーバー接続
4. 対話チャット開始
5. エラーハンドリングとクリーンアップ
"""
# WebSocketクライアントインスタンスを作成
client = WebSocketAgentClient()
# システム終了シグナル用ハンドラー
def signal_handler():
print("\n🛑 システム終了: 終了シグナルを受信しました")
client.running = False
# Unix系システムでのシグナルハンドラー登録
if sys.platform != "win32":
loop = asyncio.get_running_loop()
# SIGINT (Ctrl+C) と SIGTERM の両方をハンドル
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(getattr(signal, signame), signal_handler)
try:
# 1. WebSocketサーバーへの接続確立
await client.connect()
# 2. 対話型チャットセッションの開始
await client.interactive_chat()
except KeyboardInterrupt:
print("\n🛑 プログラム中断: キーボード割り込みによりアプリケーションを終了します")
except Exception as e:
print(f"❌ アプリケーションエラー: 予期しないエラーが発生しました - {e}")
finally:
# 必要なクリーンアップ処理
await client.disconnect()
if __name__ == "__main__":
"""
アプリケーションエントリーポイント
セットアップ手順:
1. WebSocketサーバー(websocket_echo_agent.py)を起動
2. このクライアントを実行: python websocket_agent_client.py
3. 表示される指示に従って対話チャットを開始
注意事項:
- サーバーが起動していることを確認してください
- デフォルトポート: 8080
"""
print("🚀 起動開始: WebSocketエージェントクライアントを初期化中...")
try:
# メイン処理の非同期実行
asyncio.run(main())
except KeyboardInterrupt:
print("\n🛑 アプリケーション終了: ユーザーによる中断")
except Exception as e:
print(f"❌ 起動失敗: アプリケーション開始時にエラーが発生しました - {e}")
sys.exit(1) # エラー終了
テスト用のクライアントを実行し、「1週間で10キロ痩せたい」と質問してみます。
python websocket_agent_client.py
🚀 起動開始: WebSocketエージェントクライアントを初期化中...
✅ 接続成功: ws://localhost:8080/ws
💬 チャット開始: 対話型エージェントチャットを開始します
💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断
==================================================
👤 あなた: 1週間で10キロ痩せたい
いい感じでストリーミング処理されました。
ちなみに 1週間で10キロ痩せることは健康的に非常に困難ですと Amazon Nova Lite2 がちゃんと返してくれて安心しました。
4. AgentCore Runtime にデプロイしテストする
これまで作成した双方向ストリーミングエージェントを Amazon Bedrock AgentCore スターターキットでデプロイするのでインストールしておきます。
pip install bedrock-agentcore-starter-toolkit
対話形式で設定ファイルを作成し、双方向ストリーミングエージェントをデプロイします。
agentcore configure -e websocket_echo_agent.py
agentcore launch
デプロイが完了したら、テスト用のクライアントを作成します。先程ローカルでテストしたクライアントを AgentCore Runtime にデプロイした双方向ストリーミングエージェントへアクセスできるように置き換えます。
ざっくりと、AgentCoreRuntimeClient の generate_ws_connection() を使用して AWS 認証付きの接続 URL・ヘッダーを生成し、AgentCore Runtime へアクセスが変更になった意外は先程と同じです。
websocket_agent_client.py
import asyncio
import json
import os
import signal
import sys
import websockets
from websockets.exceptions import ConnectionClosed
from bedrock_agentcore.runtime import AgentCoreRuntimeClient
class WebSocketAgentClient:
"""
Amazon Bedrock AgentCore Runtime WebSocketクライアント
対話型チャットをサポートし、レスポンスの割り込み機能を備えています。
主要機能:
- AgentCore Runtime WebSocketを介した双方向通信
- AWS認証による安全な接続
- リアルタイムレスポンス受信
- レスポンス中断・割り込み機能
- 対話型チャットインターフェース
"""
def __init__(self, runtime_arn: str = None, region: str = "us-west-2"):
"""
クライアントの初期化
Args:
runtime_arn: AgentCore RuntimeのARN
region: AWSリージョン(デフォルト: us-west-2)
"""
self.runtime_arn = runtime_arn or os.getenv("AGENT_ARN")
if not self.runtime_arn:
raise ValueError("AGENT_ARN環境変数が設定されていないか、runtime_arnが指定されていません")
self.region = region
self.websocket = None # WebSocket接続オブジェクト
self.running = False # アプリケーション実行状態フラグ
self.client = AgentCoreRuntimeClient(region=region)
# 非同期タスク管理用のインスタンス変数
self.response_task: asyncio.Task | None = None # レスポンス受信タスク
self.task_lock = asyncio.Lock() # タスク同期用ロック(将来の拡張用)
async def connect(self):
"""
AgentCore RuntimeのWebSocketサーバーに接続
Raises:
Exception: 接続に失敗した場合
"""
try:
# AgentCore Runtime用のWebSocket接続URLとヘッダーを生成
ws_url, headers = self.client.generate_ws_connection(
runtime_arn=self.runtime_arn,
session_id=None
)
# WebSocket接続を確立(認証ヘッダー付き)
self.websocket = await websockets.connect(ws_url, additional_headers=headers)
print(f"✅ AgentCore Runtime接続成功: {self.runtime_arn}")
self.running = True
except Exception as e:
print(f"❌ AgentCore Runtime接続エラー: {e}")
raise
async def disconnect(self):
"""
WebSocket接続を安全に切断
実行順序:
1. 実行状態フラグをFalseに設定
2. 実行中のレスポンスタスクをキャンセル
3. WebSocket接続を閉じる
"""
self.running = False
# 実行中のレスポンスタスクをキャンセル
if self.response_task and not self.response_task.done():
self.response_task.cancel()
try:
await self.response_task
except asyncio.CancelledError:
pass # キャンセル例外は正常な処理
# WebSocket接続を閉じる
if self.websocket:
await self.websocket.close()
print("✅ 接続切断: WebSocket接続を終了しました")
async def send_message(self, message: str):
"""
サーバーにメッセージを送信
Args:
message: 送信するメッセージ文字列
Raises:
RuntimeError: WebSocket接続が確立されていない場合
"""
if not self.websocket:
raise RuntimeError("WebSocketに接続されていません")
# JSONペイロードを作成してサーバーに送信
payload = {"inputText": message}
await self.websocket.send(json.dumps(payload))
print(f"📤 送信完了: {message}")
async def receive_response(self):
"""
サーバーからのレスポンスを受信して表示
レスポンス形式:
- chunk: ストリーミング中の部分レスポンス
- complete: レスポンス完了
- interrupted: レスポンス中断
- error: エラー発生
"""
if not self.websocket:
return
# レスポンス受信開始の表示
print("🤖 エージェント: ", end="", flush=True)
full_response = "" # 完全なレスポンステキストを蓄積
try:
# WebSocketメッセージを非同期で受信
async for message in self.websocket:
try:
# JSONメッセージをパース
data = json.loads(message)
response_type = data.get("type")
# ストリーミングデータのチャンク処理
if response_type == "chunk":
chunk = data.get("data", "")
full_response += chunk
print(chunk, end="", flush=True) # リアルタイム表示
# レスポンス完了処理
elif response_type == "complete":
print("\n✅ レスポンス完了")
break
# レスポンス中断処理
elif response_type == "interrupted":
print("\n⚠️ レスポンス中断: 新しいリクエストにより処理が中断されました")
partial_response = data.get("partialResponse", "")
if partial_response != full_response:
print(f"📝 部分レスポンス: {partial_response}")
break
# エラー処理
elif "error" in data:
error_msg = data["error"]
print(f"\n❌ サーバーエラー: {error_msg}")
break
except json.JSONDecodeError:
print(f"\n❌ JSON解析エラー: 不正なメッセージ形式 - {message}")
except ConnectionClosed:
print("\n🔌 接続切断: サーバーとの接続が予期せず切断されました")
except asyncio.CancelledError:
print("\n⏹️ 処理中断: レスポンス受信がキャンセルされました")
raise # タスクキャンセルは上位に伝播
async def interactive_chat(self):
"""
対話型チャット機能
機能詳細:
- ユーザー入力の非同期受付
- レスポンス中断機能
- 終了コマンド対応(quit, exit, q)
- エラー処理とクリーンアップ
"""
# チャット開始メッセージ
print("💬 チャット開始: 対話型エージェントチャットを開始します")
print("💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断")
print("" + "="*50) # 区切り線
try:
# メインチャットループ
while self.running:
try:
# ユーザー入力を非同期で受け取る
user_input = await asyncio.get_event_loop().run_in_executor(
None, input, "👤 あなた: "
)
# 終了コマンドチェック
if user_input.strip().lower() in ['quit', 'exit', 'q']:
print("👋 チャット終了: ご利用ありがとうございました")
break
# 空入力のスキップ
if not user_input.strip():
continue
# 進行中のレスポンスがあれば中断
if self.response_task and not self.response_task.done():
print("⚠️ 処理中断: 前のレスポンスを中断して新しいリクエストを処理します...")
self.response_task.cancel()
try:
await self.response_task
except asyncio.CancelledError:
pass # 正常なキャンセル処理
# メッセージ送信
await self.send_message(user_input)
# レスポンス受信タスクを作成・実行
self.response_task = asyncio.create_task(self.receive_response())
await self.response_task
print() # レスポンス後の改行
except EOFError:
# Ctrl+D(EOF)による終了
print("\n👋 チャット終了: EOF信号を受信しました")
break
except KeyboardInterrupt:
# Ctrl+C による中断(継続可能)
print("\n⚠️ 入力中断: キーボード割り込みが発生しました(チャットは継続中)")
continue
except Exception as e:
print(f"❌ チャットエラー: 予期しないエラーが発生しました - {e}")
finally:
# チャット終了時のクリーンアップ
await self.disconnect()
async def main():
"""
メインエントリーポイント
実行フロー:
1. WebSocketクライアントの初期化
2. シグナルハンドラーの設定
3. サーバー接続
4. 対話チャット開始
5. エラーハンドリングとクリーンアップ
"""
# 環境変数からAgentCore Runtime ARNを取得
runtime_arn = os.getenv("AGENT_ARN")
# WebSocketクライアントインスタンスを作成
client = WebSocketAgentClient(runtime_arn=runtime_arn)
# システム終了シグナル用ハンドラー
def signal_handler():
print("\n🛑 システム終了: 終了シグナルを受信しました")
client.running = False
# Unix系システムでのシグナルハンドラー登録
if sys.platform != "win32":
loop = asyncio.get_running_loop()
# SIGINT (Ctrl+C) と SIGTERM の両方をハンドル
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(getattr(signal, signame), signal_handler)
try:
# 1. WebSocketサーバーへの接続確立
await client.connect()
# 2. 対話型チャットセッションの開始
await client.interactive_chat()
except KeyboardInterrupt:
print("\n🛑 プログラム中断: キーボード割り込みによりアプリケーションを終了します")
except Exception as e:
print(f"❌ アプリケーションエラー: 予期しないエラーが発生しました - {e}")
finally:
# 必要なクリーンアップ処理
await client.disconnect()
if __name__ == "__main__":
"""
アプリケーションエントリーポイント
セットアップ手順:
1. AgentCore RuntimeにWebSocketエージェントをデプロイ
2. 環境変数AGENT_ARNにランタイムARNを設定、またはコード内で指定
3. このクライアントを実行: python websocket_agent_client.py
4. 表示される指示に従って対話チャットを開始
必要な環境変数:
- AGENT_ARN: AgentCore RuntimeのARN(オプション、コード内でデフォルト値を設定済み)
- AWS認証情報(AWS CLI設定、IAMロール、環境変数など)
必要なIAM権限:
- bedrock-agentcore:InvokeAgentRuntimeWithWebSocketStream
"""
print("🚀 起動開始: AgentCore Runtime WebSocketクライアントを初期化中...")
try:
# メイン処理の非同期実行
asyncio.run(main())
except KeyboardInterrupt:
print("\n🛑 アプリケーション終了: ユーザーによる中断")
except Exception as e:
print(f"❌ 起動失敗: アプリケーション開始時にエラーが発生しました - {e}")
sys.exit(1) # エラー終了
AgentCore Runtime の arn を環境変数に指定します。
export AGENT_ARN="arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/websocket_echo_agent-3lHuhlFuym"
テスト用のクライアントを実行し、「半年後にプロレスラーみたいなめっちゃマッチョになりたいです。」と質問してみます。
agentcore-runtime-quickstart-websocket % python websocket_agent_client.py
🚀 起動開始: AgentCore Runtime WebSocketクライアントを初期化中...
✅ AgentCore Runtime接続成功: arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/websocket_echo_agent-3lHuhlFuym
💬 チャット開始: 対話型エージェントチャットを開始します
💡 使い方: 'quit', 'exit', 'q'で終了 | 新しいメッセージで前回レスポンス中断
==================================================
👤 あなた: 半年後にプロレスラーみたいなめっちゃマッチョになりたいです。
今度も Amazon Nova Lite2 君は、このプランをしっかり守れば、半年後には「プロレスラーのようなマッチョな肉体」に近づけるはずです! 頑張ってください と返してくれたので、単純な回答を求めるだけぐらいの AI エージェントならば悪くなさそうですよ。
まとめ
双方向ストリーミングのサポートにより、Amazon Bedrock AgentCore Runtime は音声エージェントやリアルタイムチャットボットの構築基盤として、より実用的な選択肢になりました。
特に、カスタマーサポートや音声アシスタントなど、自然な対話が求められるユースケースでの活用が期待されそうです。


