2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ADKのエージェントをバッチで動かしてみる

Last updated at Posted at 2025-12-11

本記事は AIエージェント構築&運用 Advent Calendar 2025 の12日目の記事です🦌

はじめに

Agent Development Kit (ADK) はAIエージェントを開発・デプロイするためのフレームワークです。Googleによって開発が進められているのもあり、GeminiやGoogleエコシステムとの連携がスムーズですが、他の選択肢からモデルやデプロイ先を柔軟に選択することができます。
ADKに限らない話かもですが、チャットボットや対話型アシスタントのように、通常AIエージェントは人間との会話を通じて何かしらのタスクを遂行するケースが多いかと思います。
ですが今回は、このエージェントをバッチ処理の用途で動かしてみようと思います。つまり、人間と会話はせずに何かしらのタスクを大量にエージェントに遂行させる、という使い方です。

背景

バッチ処理で大量に生成を行う場合、LLMを組み込んだワークフローを構築するアプローチもありますが、AIエージェントを採用する理由は以下のあたりがあるかなと思います。

  • 処理内容に応じて動的に判断させてツールを選択させたい
  • マルチエージェントシステムを構築することで最終アウトプットの品質を高めたい

AIエージェントにやらせたい処理の流れを事前に完全に把握できる場合、むしろワークフローを構築すれば良いかなと思いますが、そうでない場合は必要に応じてツール呼び出しなどの判断をしてくれるAIエージェントが良いかなと思います。

実装

ADKのエージェントをバッチで並列実行するためのコードは以下のようになります。
なお、以下のコードはPythonのバージョンが>=3.12 <3.14google-adk==1.16.0で動作確認しております。

import asyncio
import datetime
from collections.abc import Awaitable, Sequence
from dataclasses import dataclass
from typing import TypeVar

from google.adk.agents import Agent, LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import BaseSessionService, InMemorySessionService
from google.genai import types
from pydantic import BaseModel, ValidationError

_T = TypeVar("_T")


@dataclass(frozen=True)
class AsyncExecutor:
    sem: asyncio.Semaphore

    async def run_single(self, awaitable: Awaitable[_T]) -> _T:
        async with self.sem:
            return await awaitable

    async def run(self, awaitables: Sequence[Awaitable[_T]]) -> Sequence[_T]:
        tasks = [self.run_single(awaitable) for awaitable in awaitables]

        return await asyncio.gather(*tasks)

    def get_event_loop(self) -> asyncio.AbstractEventLoop:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
        return loop

    @classmethod
    def new(cls, max_concurrency: int) -> "AsyncExecutor":
        return cls(sem=asyncio.Semaphore(max_concurrency))


ResponseT = TypeVar("ResponseT", bound=BaseModel)


class InvocationResult(BaseModel, frozen=True):
    is_success: bool
    response_text: str | None
    error_message: str | None
    start_at: datetime.datetime
    end_at: datetime.datetime

    @property
    def duration_seconds(self) -> float:
        return (self.end_at - self.start_at).total_seconds()

    def parse_response_text(self, response_type: type[ResponseT]) -> ResponseT | None:
        if not self.is_success or not self.response_text:
            return None
        try:
            return response_type.model_validate_json(self.response_text)
        except ValidationError:
            return None

    @classmethod
    def success(
        cls,
        response_text: str,
        start_at: datetime.datetime,
        end_at: datetime.datetime | None = None,
    ) -> "InvocationResult":
        if end_at is None:
            end_at = datetime.datetime.now()
        return cls(
            is_success=True,
            response_text=response_text,
            error_message=None,
            start_at=start_at,
            end_at=end_at,
        )

    @classmethod
    def failure(
        cls,
        error_message: str | None,
        start_at: datetime.datetime,
        end_at: datetime.datetime | None = None,
    ) -> "InvocationResult":
        if end_at is None:
            end_at = datetime.datetime.now()
        return cls(
            is_success=False,
            response_text=None,
            error_message=error_message,
            start_at=start_at,
            end_at=end_at,
        )


@dataclass(frozen=True)
class RunnerManager:
    _runner: Runner
    user_id: str
    session_service: BaseSessionService
    async_executor: AsyncExecutor

    async def _arun_single(self, content: types.Content) -> InvocationResult:
        start_at = datetime.datetime.now()
        session = await self.session_service.create_session(
            app_name=self._runner.app_name, user_id=self.user_id
        )
        try:
            async for event in self._runner.run_async(
                user_id=self.user_id,
                session_id=session.id,
                new_message=content,
            ):
                if event.is_final_response() and event.content is not None:
                    if (
                        event.content
                        and event.content.parts
                        and event.content.parts[0].text
                    ):
                        return InvocationResult.success(
                            response_text=event.content.parts[0].text,
                            start_at=start_at,
                        )
                    else:
                        return InvocationResult.failure(
                            error_message="No content in final response: "
                            f"{event.error_message}",
                            start_at=start_at,
                        )
        except Exception as e:
            return InvocationResult.failure(
                error_message=str(e),
                start_at=start_at,
            )

        return InvocationResult.failure(
            error_message="No final response received from runner.",
            start_at=start_at,
        )

    async def arun(
        self, contents: Sequence[types.Content]
    ) -> Sequence[InvocationResult]:
        return await self.async_executor.run(
            [self._arun_single(content) for content in contents]
        )

    def run(self, contents: Sequence[types.Content]) -> Sequence[InvocationResult]:
        loop = self.async_executor.get_event_loop()
        return loop.run_until_complete(self.arun(contents))

    @classmethod
    def new(
        cls,
        app_name: str,
        agent: Agent,
        user_id: str,
        max_concurrency: int,
    ) -> "RunnerManager":
        session_service = InMemorySessionService()
        runner = Runner(app_name=app_name, agent=agent, session_service=session_service)
        return cls(
            _runner=runner,
            user_id=user_id,
            session_service=session_service,
            async_executor=AsyncExecutor.new(max_concurrency=max_concurrency),
        )


def main() -> None:
    class Summary(BaseModel, frozen=True):
        """要約結果のスキーマ"""

        summary: str
        keywords: list[str]

    # Geminiモデルを使う場合、環境変数を設定する必要があります。
    # https://google.github.io/adk-docs/agents/models/#using-google-gemini-models
    agent = LlmAgent(
        name="summarizer",
        model="gemini-2.5-flash",
        instruction="与えられたテキストを要約し、キーワードを抽出してください。",
        output_schema=Summary,
        disallow_transfer_to_parent=True,
        disallow_transfer_to_peers=True,
    )

    runner_manager = RunnerManager.new(
        app_name="batch_summarization",
        agent=agent,
        user_id="batch_user",
        max_concurrency=10,
    )

    # ダミーの入力データ
    texts = [
        "{ブログ記事1}",
        "{ブログ記事2}",
        "{ブログ記事3}",
    ]
    contents = [
        types.Content(role="user", parts=[types.Part(text=text)]) for text in texts
    ]

    # 並列実行
    results = runner_manager.run(contents)

    # 結果出力
    for result in results:
        if result.is_success:
            summary = result.parse_response_text(Summary)
            print(f"Summary: {summary}")


if __name__ == "__main__":
    main()

上記のコードは大きく3つのクラスで構成されています:

  1. AsyncExecutor: 並行数を制御しながら非同期処理を実行
  2. InvocationResult: エージェント実行の結果を保持
  3. RunnerManager: ADK Runnerをラップして並列実行を管理

実装のポイント1: セッション管理

通常の会話ベースでは、セッションを永続化して会話履歴を蓄積していきますが、バッチ処理では会話履歴が不要です。
RunnerManager._arun_singleメソッドを見ると、リクエストごとに新しいセッションを作成しています:

  session = await self.session_service.create_session(
      app_name=self._runner.app_name, user_id=self.user_id
  )

また、InMemorySessionServiceを使うことで永続化のオーバーヘッドを削減しています。バッチ処理ではセッションを使い捨てにできるため、メモリ内で完結させる方が効率的です。

実装のポイント2: 並列実行

AsyncExecutorクラスでは、asyncio.Semaphoreを使って並行数を制御しています:

  async def run_single(self, awaitable: Awaitable[_T]) -> _T:
      async with self.sem:
          return await awaitable

これにより、同時実行数を制御しながら複数のエージェント呼び出しを並列化できます。max_concurrency=10とすれば、最大10件を同時に処理する形になります。

実装のポイント3: エラーハンドリング

AIエージェントはツール呼び出しやLLMの応答など、予期せぬところで処理落ちする可能性があります。特にバッチ処理では、一部のリクエストが失敗しても全体の処理を継続させたいケースが多いです。

Runnerを呼び出している最中に例外発生する可能性があるため、その部分で例外をキャッチします:

  try:
      async for event in self._runner.run_async(...):
          if event.is_final_response():
              return InvocationResult.success(...)
  except Exception as e:
      return InvocationResult.failure(
          error_message=str(e),
          start_at=start_at,
      )

これにより、並列実行中の一部のタスクが失敗しても、他のタスクは影響を受けずに継続できます。実際の運用では、大量のデータを処理する中で数件の失敗は許容範囲内というケースも多いため、こういった実装は必要になってくるかなと思います。

おわりに

本記事ではADKエージェントを会話ベースではなく、バッチ処理の用途で動かしてみました。同様のアプローチは、コンテンツの一括生成やデータ拡張といった他のユースケースにも応用できそうです。
最後までご覧いただきありがとうございました!

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?