0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

第6章:AIループシステムの開発・実装プラクティス

0
Posted at

AIループシステムの開発・実装プラクティス

AIエージェントシステムを概念実証(PoC)から本番運用のプロダクトへとスケールさせる上で、最も厚い壁となるのが「エンジニアリング(開発・デバッグ・運用)」の難しさです。特にループや状態遷移を含むシステムは、状態の競合(デッドロック)、ログ追跡の複雑化、そして無限ループ時のリソース管理といった、伝統的な分散システムに共通する技術的課題を多く抱えています。

本章では、エージェントループを実装するための代表的なフレームワークの比較、非同期処理におけるスレッドセーフな状態管理設計、そして LangSmith や Arize Phoenix を用いたループシステムのデバッグとプロファイリング手法について、実践的なコードを交えて解説します。


6.1 ループ設計のためのフレームワーク・ライブラリ選定

自律エージェントやHITLループをゼロからスクラッチで実装することも可能ですが、本番環境の要件(永続化、非同期制御、可視化)を満たすためには、既存のフレームワークを活用するのが現実的です。現在、代表的な3つのフレームワークの特徴を比較します。

主要フレームワークの比較

評価軸 LangGraph (LangChainファミリー) AutoGen (Microsoft) LlamaIndex Workflows
制御モデル 状態遷移(グラフ)指向
明確なNodeとEdgeを定義する。
対話(会話)指向
エージェント同士のチャットメッセージで駆動。
イベント(Event)駆動
特定のイベント検知で関数が非同期実行される。
ループ制御 極めて厳密
無限ループ防止や遷移ルートを型レベルで固定。
半自律(動的)
エージェントが会話をどう進めるかは動的。
柔軟
イベントの配信と受け取りで動的・静的なループを両方記述可能。
状態管理 (State) 中央集権的 (State Object)
全ノードで単一のStateを共有・マージ。
分散的 (Message History)
各エージェントが自身の会話履歴を持つ。
中央集権・イベント付随
Contextオブジェクトを介した状態共有。
HITLサポート 強力
ビルトインの breakpoint 機能で一時停止/再開。
標準搭載
ユーザー入力ノードを会話の中に挟める。
カスタム実装が必要
イベントループの一時停止を自作する。
並列処理 (Map-Reduce) 強力
Sendオブジェクトによる動的並列処理。
難解
会話の並列化はオーケストレーターの自作が必要。
容易
複数のイベントを待つ join の記述がシンプル。
主なユースケース 厳格な業務フロー、承認ゲート付きHITL、Self-RAG、高信頼性のコード実行。 複数AIによるブレインストーミング、自律的なディスカッション、シミュレーション。 ドキュメント読み込みとRAG、柔軟な非同期データパイプライン。

選定のアドバイス

  • LangGraph: 金融や業務フローなど、「何が起きるか予測可能で、人間の承認(HITL)が必須なループ」 を設計する場合の最適解です。
  • AutoGen: 役割の異なるAIを複数配置し、「自律的でクリエイティブなコラボレーション(対話)をさせたい」 場合に適しています。
  • LlamaIndex Workflows: 既存のLlamaIndexによるRAG資産があり、「非同期かつイベント駆動でデータの検索・要約パイプラインを作りたい」 場合に最適です。

6.2 非同期処理と状態管理の設計パターン

エージェントシステムをWebサーバー(FastAPI等)に組み込む場合、同時に何百ものエージェントインスタンスが非同期(asyncio)で動き、状態を更新します。このとき、最も注意すべきなのが 「競合状態(Race Condition)」「状態の上書きによる消失」 です。

1. 状態競合の防止:Reducer(レデューサー)パターン

複数の並列ノードが同時に状態(State)を書き換える場合、単純な上書き(state["messages"] = new_messages)を行うと、先に完了したノードのデータが消去されてしまいます。
これを防ぐため、状態の更新は「新しいデータの追加(マージ)」または「純粋関数による結合」に限定する Reducer パターン を採用します。

Pythonによるスレッドセーフな非同期状態管理の実装例

以下は、非同期実行において asyncio.Lock を使用してスレッドセーフに状態を更新する、シンプルな Reducer 構造のカスタム実装です。

import asyncio
from typing import TypedDict, List, Dict, Any

# =====================================================================
# State の定義
# =====================================================================
class AppState(TypedDict):
    tasks: List[str]
    logs: List[str]

class ThreadSafeStateTracker:
    def __init__(self, initial_state: AppState):
        self._state = initial_state
        self._lock = asyncio.Lock()  # 非同期競合を防ぐロック

    async def get_state(self) -> AppState:
        async with self._lock:
            # 状態のディープコピーを返して、外部からの予期せぬ直接変更を防ぐ
            return {
                "tasks": list(self._state["tasks"]),
                "logs": list(self._state["logs"])
            }

    async def update_state(self, key: str, value: Any, reducer_type: str = "append"):
        """スレッドセーフにStateを更新する(Reducerパターン)"""
        async with self._lock:
            if key not in self._state:
                self._state[key] = []

            # 状態の更新ロジックを分岐(Reducer)
            if reducer_type == "append":
                if isinstance(value, list):
                    self._state[key].extend(value)
                else:
                    self._state[key].append(value)
            elif reducer_type == "overwrite":
                self._state[key] = value
                
            print(f"  [State Updated] Key: '{key}', Current Size: {len(self._state[key])}")

# =====================================================================
# 並列実行タスク(並列ノードのモック)
# =====================================================================
async def agent_task_node(node_id: int, tracker: ThreadSafeStateTracker):
    print(f"[Node {node_id}] 実行開始...")
    await asyncio.sleep(0.5)  # 非同期処理のシミュレーション
    
    # 状態の更新(Reducer: append を用いて安全にマージ)
    await tracker.update_state("tasks", f"Task completed by Node {node_id}")
    await tracker.update_state("logs", f"Log from Node {node_id} at step 1")
    print(f"[Node {node_id}] 完了")

async def main():
    # 初期状態
    init_state: AppState = {"tasks": [], "logs": []}
    tracker = ThreadSafeStateTracker(init_state)

    # 3つのノードを並列で実行し、同じStateに書き込ませる (Map-Reduce)
    print("=== 並列タスクのディスパッチ ===")
    await asyncio.gather(
        agent_task_node(1, tracker),
        agent_task_node(2, tracker),
        agent_task_node(3, tracker)
    )

    # 最終状態の確認
    final_state = await tracker.get_state()
    print("\n=== 最終状態 ===")
    print("Tasks:", final_state["tasks"])
    print("Logs Count:", len(final_state["logs"]))

if __name__ == "__main__":
    asyncio.run(main())

6.3 ループ型システムのデバッグ・プロファイリング・可視化

AIエージェントのループシステムは、開発者にとって恐るべき「ブラックボックス」になりがちです。
LLMの呼び出し、ツールの実行、エラーからの自己修復ループが何往復も繰り返されるため、コンソールログ(print デバッグ)だけでは、「どのループの、どのプロンプトが原因で精度が低下したのか」「どこで無駄なAPIトークンを消費しているのか」を特定するのはほぼ不可能です。

これを可視化し、プロファイリングするために トレース(Tracing)ツール を必ず導入します。

代表的なトレースツール

  1. LangSmith:
    • LangChain / LangGraphの開発元が提供する最高峰のデバッグプラットフォーム。グラフ全体のStateの遷移、ノードごとの入出力、LLMプロンプトの生データをGUIで完全にツリー構造として視覚化できます。
  2. Arize Phoenix / OpenInference:
    • オープンソース(OSS)でローカル実行可能なLLM監視・評価ツール。LlamaIndexやその他のカスタムフレームワークとも相性が良く、コスト(トークン)や遅延(Latency)の可視化、データ蓄積が容易です。
[ LangSmith / Phoenix トレース階層のイメージ ]

└─ Ⓜ️ Graph Run (全体の入力: "〇〇を調べて")  ➔ [Latency: 8.5s | Cost: $0.08]
   ├─ 🤖 Node: planner (プラン生成)
   ├─ 🤖 Node: agent_react (ループ 1ターン目)
   │  └─ 🛠️ Tool: web_search (引数: "東京 天気") ➔ 実行時間: 1.2s
   ├─ 🤖 Node: agent_react (ループ 2ターン目: 自己修復)
   │  └─ ⚠️ Error (FormatError) ➔ エラーメッセージ記録
   └─ 🤖 Node: generator (最終回答生成)

Pythonでトレースを紐付ける最小構成(OpenTelemetry / Phoenix の例)

ここでは、特別なフレームワークを使わない自作のループシステムであっても、デコレーターやSDKを用いて自動で実行ツリー(スパン)をトレースし、可視化サーバーへ送信するコード例を示します。

import os
import time
from functools import wraps
from phoenix.otel import register
from opentelemetry import trace

# =====================================================================
# 1. Phoenix トレースサーバーへの登録
# =====================================================================
# ローカルで `phoenix start` を実行していることを前提とします
# (あるいは環境変数 PHOENIX_CLIENT_HEADERS を指定してクラウドに送信)
register(
    project_name="my-agent-loop-project",
    endpoint="http://localhost:6006/v1/traces"
)
tracer = trace.get_tracer(__name__)

# =====================================================================
# 2. 自動トレース用デコレーターの定義
# =====================================================================
def trace_node(name: str):
    """関数の実行をトレーススパンとして記録するデコレーター"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # OpenTelemetry のスパンを開始
            with tracer.start_as_current_span(name) as span:
                span.set_attribute("inputs", str(args) + str(kwargs))
                start_time = time.time()
                
                try:
                    result = func(*args, **kwargs)
                    span.set_attribute("outputs", str(result))
                    span.set_attribute("status", "SUCCESS")
                    return result
                except Exception as e:
                    span.set_attribute("status", "ERROR")
                    span.set_attribute("error.message", str(e))
                    raise e
                finally:
                    span.set_attribute("latency_sec", time.time() - start_time)
        return wrapper
    return decorator

> [!IMPORTANT]
> 上記の `trace_node` デコレーターは**同期関数用**の実装です6.2節で扱ったような非同期関数`async def`をデコレートする場合はラッパー関数自体も `async def wrapper` として定義し内部の実行を `await func(*args, **kwargs)` と呼び出す非同期用のデコレーターを別途作成する必要があります

# =====================================================================
# トレース対象のエージェント処理
# =====================================================================
@trace_node("Tool: DatabaseQuery")
def run_db_tool(query: str) -> str:
    time.sleep(0.3)
    return "User: Alice, Status: Active"

@trace_node("Node: LLM_Reasoning")
def run_llm_reasoning(state: dict) -> dict:
    time.sleep(0.5)
    # LLMの思考処理
    return {"thought": "データベースを調べる必要がある", "action": "run_db_tool"}

@trace_node("AgentLoop")
def run_agent_loop(input_text: str):
    state = {"input": input_text, "step": 0}
    print(f"トレース開始: '{input_text}'")
    
    # 思考 -> ツール実行 の1ループをトレース
    reasoning_result = run_llm_reasoning(state)
    
    if reasoning_result["action"] == "run_db_tool":
        tool_result = run_db_tool("SELECT * FROM users")
        state["db_result"] = tool_result
        
    state["step"] += 1
    print("トレース終了。Phoenixのダッシュボード(http://localhost:6006)で可視化されます。")

if __name__ == "__main__":
    run_agent_loop("アクティブユーザーをリストアップして")

プロファイリング時に見るべき3つのボトルネック

  1. 遅延(Latency)のホットスポット:
    • どのツール実行や、どのLLM呼び出し(特に出力トークン数が多いもの)がレスポンスの大部分を占めているかを特定し、プロンプトの軽量化やストリーミング処理を導入します。
  2. ループの収束パターン:
    • トレース上で「同じ状態のメッセージ」が何度も往復していないか確認します。収束しにくいループには、Few-shotのプロンプト例を追加してLLMをガイドします。
  3. トークンコストの累積:
    • ループの深化に伴ってコンテキスト(チャット履歴)が累積され、後半のLLM呼び出しコストが倍増していないか。必要に応じて「過去ログの要約(Summarization)」や「古いメッセージの切り捨て(Sliding Window)」をエッジの遷移条件に挟みます。
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?