この記事は AIエージェント構築&運用 Advent Calendar 2025 シリーズ1の18日目の記事です。
ソースコードは下記のリポジトリで公開しています(今回は Google Antigravity で、Gemini 3 Pro や Claude Opus 4.5 を使って実装してみました)。
※公開リポジトリの情報を載せるのが初めてなので、内心ビクビクしています...セキュリティ的にまずい情報・設定などあったらこっそり教えてくださるとありがたいです...
はじめに
自己紹介
私は社会人4年目で、普段はソフトウェアエンジニアとして家からリモートで働いています。
Qiita への記事投稿は初となりますので、お手柔らかにお願いします。
AI関連のスキルレベルとしては、業務ではAIはコーディングを始めとした使う側で活用していて、AIエージェント構築の実務経験はないといった感じです。
今回の検証のきっかけ
以前、Zenn が主催の 第3回 AI Agent Hackathon with Google Cloud に作品を提出する機会があり、そこで初めてAIを組み込んだアプリ開発を経験しました。開発したアプリも音声インターフェースのWebアプリだったんですが、
- 音声ファイルを Cloud Storage にアップロード
- イベント駆動でAIエージェントの処理を起動
- Speech-To-Text API で文字起こし
- 文字起こし内容をもとに、Gemini にリクエスト
- Gemini からのレスポンスをもとに、以下2つを並行して行う
- Text-To-Speech API での音声変換
- Gemini による画像生成
- フロントエンドから、生成された音声や画像にアクセス
といった挙動だったので、レスポンスが返るまでに時間がかかってしまう課題がありました。その後、ADK における双方向ストリーミングや Gemini Live API の存在を知り、今回のアドベントカレンダーの題材として検証してみようと思った次第です。
参考:
なんとつい先日の12/13に Gemini Live API が Vertex AI でGAとなりました!
英語ですが、公式ブログを載せておきます。
デモ動画
実際に作ってみたWebアプリのデモ動画になります。
音声インターフェースなので自分の声が入ってしまうのが恥ずかしいですが...
※動画でもわかるようにしていますが、画像生成中は2倍速再生にしており、実際は1分程度の待ち時間が発生しています。
構成
クライアントから Gemini Live API を直接呼び出すのではなく、Cloud Run を経由してアクセスするようにしています。Cloud Run 自体はパブリック公開となってしまいますが、WebSocket 接続開始前に Firebase のIDトークンをサーバ側で検証することで、アプリケーションのフロントエンドで認証済みのユーザーのみが通信を開始できるようにセキュリティを考慮しました。
今回は使用していないですが、Firebase App Check なども活用するとよりセキュリティを高められると考えています。
※ちなみに、構成図からだとわかりづらいですが、Gemini へのリクエストはすべて Vertex AI 経由でアクセスしており、APIキーはどこにも保持していません。
ADK による WebSocket 接続の抽象化
ADK がやりすぎなくらい WebSocket 接続を抽象化してくれるので、公式ドキュメントに沿ってすぐに実装することができました。
※ 一部省略していたり、コメントを追記したりしているので、正式なソースコードは最初に記載した GitHub リポジトリを参照してください。
# FastAPI アプリケーションの初期化
app = FastAPI()
# アプリケーション設定
# VertexAiSessionService を使用する場合、app_name には Agent Engine ID を指定する
APP_NAME = (
settings.vertex_ai_agent_engine_id
if settings.vertex_ai_agent_engine_id
and settings.session_type.lower() == "vertexai"
else "coco-ai-bidi-streaming"
)
# ADK コンポーネントの初期化
session_service = get_session_service(APP_NAME)
runner = Runner(app_name=APP_NAME, agent=agent, session_service=session_service)
# 状態として保存
# websocket.pyにおいてもルーターからアクセス可能にするため
app.state.runner = runner
app.state.session_service = session_service
app.state.app_name = APP_NAME
# ルーターの登録
app.include_router(websocket.router)
@router.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str | None = None,
chat_id: str | None = None,
response_mode: str = "audio",
):
"""
WebSocket エンドポイント。
クライアントからの接続を認証・検証し、ADK Runner を使用して
双方向ストリーミングセッションを開始します。
Args:
websocket: WebSocket 接続オブジェクト。
token: Firebase Authentication ID トークン (クエリパラメータ、必須)。
chat_id: チャットセッションの ID (クエリパラメータ、必須)。
response_mode: レスポンスのモード。"audio" (デフォルト) または "text"。
"""
# 接続受け入れ前に必須パラメータの検証 → 省略
# Firebase ID トークンの検証 → 省略
# 認証成功後、WebSocket 接続を受け入れ
await websocket.accept()
# main.py で設定された Runner と SessionService を取得
runner = websocket.app.state.runner
session_service = websocket.app.state.session_service
app_name = websocket.app.state.app_name
# セッションの取得または作成
## 省略
# LiveRequestQueue の作成
# ストリーミングを行っている間、エージェントに送るメッセージの中継チャンネルとなる
live_request_queue = LiveRequestQueue()
# レスポンスモードの設定
response_modalities = [types.Modality.AUDIO]
output_audio_transcription = types.AudioTranscriptionConfig()
if response_mode and response_mode.lower() == "text":
response_modalities = [types.Modality.TEXT]
output_audio_transcription = None
# RunConfig の設定
# ストリーミングの挙動を設定する
# 音声レスポンス or 文字レスポンス、LLMへのinputやoutputにおいて文字起こしを有効にするかなど
run_config = RunConfig(
streaming_mode=StreamingMode.BIDI,
response_modalities=response_modalities,
input_audio_transcription=types.AudioTranscriptionConfig(),
output_audio_transcription=output_audio_transcription,
session_resumption=types.SessionResumptionConfig(),
)
# 双方向通信の実装(upstream / downstream)
async def upstream_task():
"""
WebSocket からメッセージを受信し、LiveRequestQueue に送信します。
"""
try:
while True:
message = await websocket.receive()
if "bytes" in message:
# 音声データ (bytes)
data = message["bytes"]
# ADK は types.Blob でラップする必要がある
# サンプルレートを含めないと policy violation エラーが発生する
blob = types.Blob(data=data, mime_type="audio/pcm;rate=16000")
live_request_queue.send_realtime(blob)
elif "text" in message:
# テキストメッセージ
text = message["text"]
content = types.Content(parts=[types.Part(text=text)])
live_request_queue.send_content(content)
except WebSocketDisconnect:
logger.info("クライアントが切断しました (Upstream)")
except Exception as e:
logger.error(f"Upstream エラー: {e}")
finally:
# クライアント切断時はキューを閉じて終了シグナルを送る
live_request_queue.close()
async def downstream_task():
"""
Runner からのイベントを受信し、WebSocket に送信します。
文字起こしが完了したイベントを Firestore に保存します。
"""
try:
async for event in runner.run_live(
user_id=user_id,
session_id=session_id,
live_request_queue=live_request_queue,
run_config=run_config,
):
# イベントを JSON にシリアライズして送信
event_json = event.model_dump_json(exclude_none=True, by_alias=True)
await websocket.send_text(event_json)
# 文字起こし完了イベントを Firestore に保存
await _save_transcription_if_finished(event, chat_id)
except SessionFinishedException:
# セッション終了: クライアントに通知してから再スロー
## 省略
except Exception as e:
logger.error(f"Downstream エラー: {e}")
# 双方向タスクの並行実行
try:
await asyncio.gather(upstream_task(), downstream_task())
except SessionFinishedException:
logger.info("Session ended by tool (User requested termination).")
# クライアントにセッション終了を通知
## 省略
except Exception as e:
logger.error(f"セッション全体のエラー: {e}")
finally:
logger.info("セッション終了処理")
live_request_queue.close()
try:
await websocket.close()
except Exception:
pass
画像生成ツールの追加
Gemini Live API を叩くだけだと自律的に動く「エージェント」感がないなと思ったので、ユーザーから「絵を描いて」といったリクエストがきたら、Gemini Live API が、ツールとして Nano Banana Pro を呼び出し、画像を生成する機能をつけてみました。
Firestore で画像生成ジョブの実行ステータスを管理し、これをフロントエンド側でリアルタイム監視することで、
- 画像生成が始まったら画面上に「画像を生成しています...」と表示
- 画像生成が完了したら Cloud Storage から画像を取得して表示
するようになっています。
セッション管理には Vertex AI Agent Engine を利用
Gemini のセッション管理には Vertex AI Agent Engine におけるマネージドのセッション管理機能を利用しています。
Agent Engine のセッション管理機能はてっきりエージェントを Agent Engine にデプロイする前提かと思っていたんですが、エージェントをデプロイすることなく利用することができました。
今回はブラウザとエージェント間で WebSocket 接続を行う必要があり、Vertex AI Agent Engine が WebSocket 接続 に対応していなさそうだったため、WebSocket 接続を直接受け付けるために、ADK で実装したエージェントはコンテナ化して Cloud Run にデプロイしています(間違っていたらご指摘ください)。
セッション管理機能を使うために Agent Engine のインスタンス作成は必要となります。必要なライブラリをインストールしたうえで、下記のコードを Cloud Shell から実行してインスタンスを作成しています。
pip install google-cloud-aiplatform
import vertexai
from vertexai import types
import os
# プロジェクトIDとロケーションを設定
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "YOUR_PROJECT_ID")
LOCATION = "us-central1"
# Clientの初期化 (v1beta1 APIを使用)
client = vertexai.Client(
project=PROJECT_ID,
location=LOCATION,
http_options=dict(api_version="v1beta1")
)
# Agent Engine インスタンスの作成 (コードなし、IDのみ)
try:
print("Creating Agent Engine (Identity Only)...")
remote_agent = client.agent_engines.create(
config={
"identity_type": types.IdentityType.AGENT_IDENTITY
}
)
print("✅ Created successfully.")
print(f"Resource Name: {remote_agent.name}")
except Exception as e:
print(f"Error creating Agent Engine: {e}")
つまづいた点
サンプリングレートの調整
こちらに記載の通り、Gemini Live API に音声を渡す際は下記の条件を満たしている必要があります。
- Format: 16-bit PCM (signed integer)
- Sample Rate: 16,000 Hz (16kHz)
- Channels: Mono (single channel)
ブラウザでの音声録音機能は Flutter Sound を利用したのですが、パラメーターでサンプリングレートを指定してもなかなかうまく動きませんでした...
したがって、会話の音声データを取得、内部でリサンプリングするロジックを含めています。
ver. 9.8.1 beta-2のChangeLogによると、Webではサンプリングレートがハードウェアに依存するため、パラメーター自体が非推奨のようです。
正直Antigravityが解決してくれた感じなので、有識者の方でより良い対応策があれば教えていただけると幸いです。
利用するリージョンについて
Gemini Live API ですが、まだ us-central1 リージョンでしか利用できない(少なくとも東京リージョンでは利用できない)ようです。
(プレビュー版の記載のままですが)参考ドキュメント:
ADKのLlmAgentクラス(自分も最初良く分からず使っていたんですが、AgentクラスはLlmAgentクラスのTypeAliasとして定義されています)を利用する場合、環境変数 GOOGLE_CLOUD_LOCATION の値を見て、内部でどのリージョンのエンドポイントを叩くか決定しているようで、GOOGLE_CLOUD_LOCATION="us-central1" に設定する必要がありました。
agent = Agent(
name="coco_ai_live_agent",
model=settings.model_id, # gemini-live-2.5-flash-native-audio
instruction=ROOT_SYSTEM_INSTRUCTION,
tools=[generate_image_tool, end_session_tool, set_chat_title_tool],
)
一度 asia-northeast1 で Google Cloud のリソースなどを作成してしまっていたため、エラーになってから気づいて慌てて作り直すことになりました。
インフラ構築(initial_setup_infra.sh)やデプロイ(cloudbuild.yml)もスクリプトで管理していたので、やり直しがきいて助かりました。
以前はとりあえず東京リージョンにリソースを作っていく感覚でしたが、AIエージェントでは検証したいサービスがどのリージョンで使えるのか事前に確かめておくのが大事ですね...
ちなみに、Google Gen AI SDK を利用して Vertex AI 上のモデルにリクエストする場合は、クライアントの初期化時に location パラメータでエンドポイント設定を上書きできます。今回 Nano Banana Pro を叩く際は、global エンドポイントを指定しています。
client = genai.Client(
vertexai=True,
project=settings.google_cloud_project,
location=settings.image_gen_location, # global
)
response = await asyncio.to_thread(
client.models.generate_content,
model=settings.image_gen_model_id, # gemini-3-pro-image-preview
contents=prompt,
config=types.GenerateContentConfig(
response_modalities=["IMAGE"],
image_config=types.ImageConfig(aspect_ratio="16:9"),
),
)
おわりに
AWSでも、今年の re:Invent 2025 で、双方向ストリーミング関連の新発表があったそうです!
AWS方面の生成AI技術をほぼキャッチアップできていないので(業務はAWSメインなのに...)、この機会に今回作ったものを Strands Agents + AgentCore でも試してみたいなーと思います!
参考:
