はじめに
Azure Functions で AI エージェントを動かしたいとき、「会話の続きを覚えておきたい」「複数のステップを順番に実行させたい」という要件にすぐぶつかります。
通常の Azure Functions はリクエストごとにステートレスに動くため、これらの要件を満たすには外部ストアやキューの設計が必要になります。
そこで登場するのが Durable Functions と Agent Framework の組み合わせです。
この記事では、Durable Functions の基本から、AI エージェントとの連携方法、そして「やってみたら想定と違った」落とし穴まで、実際のコードを交えて解説します。
コードは以下のリポジトリに格納しています。
stateful-ai-agents-with-azure-durable-functions
※本記事はあくまで私が実験的に行った範囲での検証のため、もし不備や誤りがあれば、是非ともご指摘お願いします。
1. Durable Functions とは?
Azure Functions は、イベントに応じてコードを実行するサーバーレスプラットフォームです。
しかし通常の Azure Functions には ステート(状態)を持てない という制約があります。各関数はリクエストを受け取り、処理して、終了する。それだけです。
Durable Functions はこの制約を取り払った拡張機能です。
関数が「途中で止まって待つ」「別の関数の結果を受け取って続きを実行する」「状態を長期間保持する」といった、ワークフロー的な動きができるようになります。
通常の Azure Functions
リクエスト → 処理 → レスポンス(終わり)
Durable Functions
リクエスト → 処理A → [待機] → 処理B → [待機] → 処理C → レスポンス
↑ここで止まっていても課金は最小限
Durable Functions は、役割の異なる 3 種類の関数で構成されます。
オーケストレーター(Orchestrator)
ワークフロー全体の「司令塔」です。
処理の順番や分岐を制御します。Python では yield を使ったジェネレータ関数として記述します。
def my_orchestrator(context: df.DurableOrchestrationContext):
# アクティビティを順番に呼び出す
result_a = yield context.call_activity("task_a", "input")
result_b = yield context.call_activity("task_b", result_a)
return result_b
⚠️ 重要な制約: オーケストレーターは「決定論的」なコードしか書けません。
datetime.now()やrandom()の直接呼び出し、外部 API の直接呼び出しは禁止です。これらは必ずアクティビティ経由で行います。
理由は、Durable Functions がクラッシュ復旧時にオーケストレーターを**リプレイ(再実行)**するためです。毎回同じ結果にならないとワークフローが壊れてしまいます。
アクティビティ(Activity)
実際の「処理単位」です。API 呼び出し、DB 操作、計算など、副作用を伴う処理はここに書きます。
アクティビティはステートレスで、通常の Python 関数として記述します。
async def task_a(input: str) -> str:
# DB 操作や外部 API 呼び出しなど
result = await some_external_api(input)
return result
エンティティ(Entity)
「状態を持つオブジェクト」です。
カウンター、セッション情報、フラグなど、複数のオーケストレーターから読み書きされるデータを管理します。状態は Azure Storage に永続化されます。
def MyCounter(context: df.DurableEntityContext):
current_value: int = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "increment":
current_value += 1
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
3 つの構成要素がどのように連携するかを図で示します。
[クライアント (HTTP)]
│
│ start_new()
▼
[オーケストレーター] ←── ワークフローの制御
│
├─ yield call_activity("task_a") ──▶ [アクティビティ A] ← 実際の処理
│
├─ yield call_activity("task_b") ──▶ [アクティビティ B]
│
├─ yield call_entity(entity_id, "get") ──▶ [エンティティ] ← 状態管理
│
└─ yield agent.run(...) ──▶ [DurableAIAgent] ← AI エージェント (後述)
主な用途としては以下のようなパターンがあります。
| パターン | 説明 |
|---|---|
| ファンアウト / ファンイン | 複数のタスクを並列実行して結果をまとめる |
| 人間承認待ち (Human Interaction) | 承認メールを送って、返信があるまで待機する |
| モニタリング | 定期的にチェックして条件が満たされたら次の処理へ |
| サガパターン | 分散トランザクションの補償処理 |
| AI エージェントの会話管理 | セッション付きの AI との対話 ← 本記事のメインテーマ |
また、Durable Functions は 消費量プラン で動作します。
| 課金対象 | 内容 |
|---|---|
| 実行時間 | 関数が実際に動いている時間 × メモリ量 |
| 実行回数 | 関数の呼び出し回数 |
| Azure Storage | オーケストレーターとエンティティの状態保存に使用 |
ポイント: オーケストレーターが yield で待機している間は「アイドル状態」となり、実行時間の課金がほぼ発生しません。
数時間・数日にわたる長時間ワークフローでもコスト効率よく運用できます。
通常の Azure Functions(Premiumプランなど)で同じことをスリープ/ループで実現しようとすると、待機時間も課金されてしまいます。
2. Agent Framework を使ったエージェント呼び出し
Durable Functions から AI エージェントを呼び出すには、Agent Framework の専用パッケージを使います。
# requirements.txt
azure-functions
agent-framework-core==1.0.0b260128
agent-framework-azurefunctions==1.0.0b260128
agent-framework-durabletask==1.0.0b260130
| パッケージ | 役割 |
|---|---|
agent-framework-core |
ChatAgent など AI エージェントのコア定義 |
agent-framework-azurefunctions |
Azure Functions と統合するための AgentFunctionApp
|
agent-framework-durabletask |
Durable Functions のオーケストレーターと連携する DurableAIAgent / DurableAgentThread
|
agent-framework-durabletask が今回のキーパッケージです。これを使うことで、通常の ChatAgent をそのまま Durable Functions から yield で呼び出せるようになります。
function_app.py でアプリを初期化します。通常の FunctionApp の代わりに AgentFunctionApp を使うのがポイントです。
# function_app.py
import os
from agent_framework.azure import AgentFunctionApp
from applications.apps.operator_app import create_operator_app
from agents.core import create_agents
def main():
agents = create_agents() # ChatAgent のリストを生成
_app = AgentFunctionApp(
agents=agents, # ← ここでエージェントを登録
)
_app = create_operator_app(_app) # HTTP トリガーなどを追加
return _app
app = main()
AgentFunctionApp は渡された ChatAgent のリストを受け取り、内部で DurableAIAgent ラッパーを生成・管理します。
登録されたエージェントはオーケストレーター内で app.get_agent(context, "AgentName") によって取り出せます。
# オーケストレーター内での取得例
@app.orchestration_trigger(context_name="context")
def my_orchestration(context: df.DurableOrchestrationContext):
agent = app.get_agent(context, "OperatorAgent") # 名前で取得
response = yield agent.run(messages="こんにちは", thread=...)
主要クラスとそれぞれの役割
それぞれ主要クラスの役割を説明します。
DurableAIAgent
ChatAgent を Durable Functions のオーケストレーターで使えるようにラップしたクラスです。
オーケストレーターは yield ベースのジェネレータ関数なので、通常の await は使えません。
DurableAIAgent はこの制約に対応し、agent.run() を yield で呼び出せる形に変換してくれます。
通常の ChatAgent
response = await agent.run(messages) ← オーケストレーターでは使えない
DurableAIAgent
response = yield agent.run(messages, thread=thread) ← ✅ オーケストレーターで使える
DurableAgentThread
会話セッションを識別するスレッドオブジェクトです。
AgentSessionId(name, key) の 2 つの値でセッションを一意に特定します。
| メソッド | 用途 |
|---|---|
agent.get_new_thread() |
初回会話用の新しいスレッドを生成 |
DurableAgentThread.from_session_id(session_id) |
保存済みの session_id から既存スレッドを復元 |
このスレッドを使い回すことで、過去の会話履歴が引き継がれた状態でエージェントに返答させることができます。
実際にオーケストレーターでエージェントを呼び出すコードを示します。
以下が最小パターンでの呼び出しとなります。
from agent_framework import AgentResponse, ChatMessage
from agent_framework_durabletask import AgentSessionId, DurableAIAgent, DurableAgentThread
# ① 初回: 新しいスレッドを作成して会話開始
new_thread = operator_agent.get_new_thread()
# AgentSessionId を保存しておく(エンティティなどで永続化)
saved_session_id = AgentSessionId(
name=new_thread.session_id.name,
key=new_thread.session_id.key,
)
response: AgentResponse = yield operator_agent.run(
messages="はじめまして。何かお手伝いできますか?",
thread=new_thread,
)
# ② 2回目以降: 保存した session_id でスレッドを復元
thread = DurableAgentThread.from_session_id(
AgentSessionId(name=saved_session_id.name, key=saved_session_id.key)
)
response: AgentResponse = yield operator_agent.run(
messages=ChatMessage(role="user", text="続きを教えてください"),
thread=thread, # 同じスレッドを渡すことで会話履歴が引き継がれる
)
# レスポンスにはテキスト・usage・タイムスタンプが含まれる
print(response.text)
print(response.usage_details)
print(response.created_at)
エージェント自体は通常の ChatAgent として定義します。Durable Functions 固有のコードは一切不要です。
# agents/operator_agent.py
from agent_framework import ChatAgent
from agent_framework.openai import OpenAIChatClient
from openai import AsyncOpenAI
def create_agent(llm_params: dict) -> ChatAgent:
return ChatAgent(
chat_client=OpenAIChatClient(
model_id=llm_params.get("model_id"),
async_client=AsyncOpenAI(
base_url=llm_params.get("llm_endpoint"),
api_key=llm_params.get("api_key"),
default_query={"api-version": "2025-01-01-preview"},
),
),
name="OperatorAgent", # ← AgentFunctionApp への登録名
instructions="...", # エージェントへの指示文
)
name に指定した文字列が app.get_agent(context, "OperatorAgent") の引数に対応します。
全体の登録フローとしては以下となります。
ChatAgent を定義
↓
create_agents() でリスト化
↓
AgentFunctionApp(agents=agents) に登録
↓ (内部で自動的に)
DurableAIAgent ラッパーが生成される
↓
オーケストレーター内で app.get_agent(context, "名前") で取得
↓
yield agent.run(messages, thread) で呼び出し
↓
AgentResponse を受け取る
3. ユースケース: 会話メモリを持つオペレーターの実装
Section 2 で説明した仕組みを使って、実際に「会話を記憶するオペレーターエージェント」を作ってみます。
ここでは durable_functions_1 のコードをすべて引用しながら、各ピースがどう組み合わさるかを解説します。
実現したいこと
- ユーザーが HTTP リクエストを送るたびに、同じユーザーとの過去の会話を引き継いで返答する
- ユーザーが会話を終わらせようとしたとき(「ありがとう」「さようなら」など)、エージェントは
byeを含む返答をしてセッションを終了する - 同じユーザーが次に話しかけるときは、新しいセッションで最初から始まる
- 無限ループ防止のため、1 セッションあたり最大 5ターンまで
全体アーキテクチャ
[HTTP POST /operator/operator_orchestration]
│
│ { user_id, input, timestamp }
▼
[HTTPトリガー] ─── start_new("operator_orchestration", input) ───▶ [オーケストレーター]
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
[Session エンティティ] [OperatorAgent] [check_response]
(状態の永続化) (LLM 呼び出し) (終了判定)
┌──────────┐
│session_id│ ← DurableAgentThread の識別子
│iteration │ ← 何ターン目か
└──────────┘
データモデルの定義 (schemas/session.py)
まずリクエストとセッション状態を表す Pydantic モデルを定義します。
# schemas/session.py
from pydantic import BaseModel
class UserContext(BaseModel):
user_id: str # セッションの識別キー (エンティティの key に使う)
input: str # ユーザーからのメッセージ
timestamp: str # 現在日時 (オーケストレーターは datetime.now() が使えないため外から渡す)
class SessionId(BaseModel):
name: str # AgentSessionId.name
key: str # AgentSessionId.key
class SessionEntity(BaseModel):
session_id: SessionId | None = None # None = セッション未開始
timestamp を外から渡しているのは、オーケストレーターの決定論的コード制約に対応するためです。
オーケストレーター内で datetime.now() を直接呼ぶと、リプレイ時に値が変わってしまうためリクエスト時点の時刻をクライアントから渡します。
エンティティ: セッションの "記憶" (entities/session.py)
エンティティはユーザーごとの状態を永続化します。user_id をキーとして、一人のユーザーに一つのエンティティインスタンスが対応します。
# entities/session.py
import azure.durable_functions as df
def Session(context: df.DurableEntityContext):
# 初期状態: session_id なし、iteration = 0
current_value: dict = context.get_state(lambda: {"session_id": None, "iteration": 0})
operation = context.operation_name
if operation == "update":
# 新しい session_id を保存
input = context.get_input()
current_value.update(input)
elif operation == "deactivate":
# セッション終了: 状態をリセット
current_value["session_id"] = None
current_value["iteration"] = 0
elif operation == "increment_iteration":
# ターン数を +1 し、上限に達したら False を返す
max_iter = int(context.get_input())
current_value["iteration"] += 1
if current_value["iteration"] >= max_iter:
context.set_result(False) # 上限超え → セッション終了
else:
context.set_result(True) # まだ継続可能
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
エンティティが持つ 4 つのオペレーション:
| オペレーション | 呼び出しタイミング | 処理 |
|---|---|---|
get |
オーケストレーター開始直後 | 現在のセッション状態を返す |
update |
初回メッセージ送信後 | 生成した session_id を保存 |
increment_iteration |
エージェント返答後 | ターン数を +1。上限(5)に達したら False |
deactivate |
会話終了時 | session_id と iteration をリセット |
エージェントの定義 (agents/operator_agent.py)
エージェントへの指示文(instructions)が会話の振る舞いを決めます。
# agents/operator_agent.py
from agent_framework import ChatAgent
from agent_framework.openai import OpenAIChatClient
from openai import AsyncOpenAI
INSTRUCTIONS = """あなたはユーザーから送信されたメッセージに基づいて、適切な応答を生成する役割を持っています。
以下のSTEPに従って応答を生成してください。
1. ユーザーメッセージと過去の会話履歴を確認する
3. 適切な応答の生成
- ユーザーが単に問いかけに応じたり、他に質問や要求がある場合は、適切な応答を生成する
- ユーザーが他に質問や要求がないと判断された場合は、"bye"というフレーズを必ず含める
### 注意事項
- 応答は常に礼儀正しく、プロフェッショナルなトーンで行ってください
- 応答は1,2文程度の簡潔な文章にしてください
"""
def create_agent(llm_params: dict) -> ChatAgent:
return ChatAgent(
chat_client=OpenAIChatClient(
model_id=llm_params.get("model_id"),
async_client=AsyncOpenAI(
base_url=llm_params.get("llm_endpoint"),
api_key=llm_params.get("api_key"),
default_query={"api-version": "2025-01-01-preview"},
),
),
name="OperatorAgent",
instructions=INSTRUCTIONS,
)
終了判定のポイントは instructions の中にあります。
「ユーザーがもう用がないと判断したら bye を含める」と指示することで、エージェント自身が会話の終了を宣言できます。
オーケストレーター本体 (orchestrators/operator_orchestrator.py)
ここが処理の核心です。セッションの有無で挙動を分岐し、エージェント呼び出し→終了判定→イテレーション管理を行います。
# applications/orchestrators/operator_orchestrator.py
import logging
from agent_framework import AgentResponse, ChatMessage
from agent_framework_durabletask import AgentSessionId, DurableAIAgent, DurableAgentThread
import azure.durable_functions as df
from schemas.session import UserContext, SessionEntity, SessionId
MAX_ITERATION = 5
def main(context: df.DurableOrchestrationContext, agents: dict[str, DurableAIAgent]):
user_ctx = UserContext.model_validate(context.get_input())
user_input = user_ctx.input
# ① エンティティからセッション状態を取得
session_entity_id = df.EntityId("Session", user_ctx.user_id)
session_raw = yield context.call_entity(session_entity_id, "get")
session = SessionEntity.model_validate(session_raw)
operator_agent = agents["operator_agent"]
if session.session_id:
# ② 継続会話: 保存済み session_id でスレッドを復元
thread = DurableAgentThread.from_session_id(
AgentSessionId(name=session.session_id.name, key=session.session_id.key)
)
response: AgentResponse = yield operator_agent.run(
messages=ChatMessage(role="user", text=user_input),
thread=thread,
)
else:
# ③ 初回会話: 新しいスレッドを作成し、エンティティに session_id を保存
new_thread = operator_agent.get_new_thread()
session.session_id = SessionId(
name=new_thread.session_id.name,
key=new_thread.session_id.key
)
context.signal_entity(session_entity_id, "update", session.model_dump())
response: AgentResponse = yield operator_agent.run(
messages=f"ユーザーからのメッセージに対して適切に応答してください。\nArgument: {{'user_input': {user_input!r}, 'today': {user_ctx.timestamp!r}}}",
thread=new_thread,
)
logging.info(f"OperatorAgent response: {response.to_dict()}")
# ④ 終了判定: "bye" が含まれていればセッションを破棄
checks = yield context.call_activity("check_response", response.text)
if "is_completed" in checks:
context.signal_entity(session_entity_id, "deactivate")
return response.text
# ⑤ イテレーション上限チェック: 5回を超えたら強制終了
succeeded = yield context.call_entity(session_entity_id, "increment_iteration", MAX_ITERATION)
if not succeeded:
context.signal_entity(session_entity_id, "deactivate")
return response.text
return response.text
フローを番号に対応する形で整理します。
① call_entity("get") → セッション確認
│
├─ session_id あり → ② from_session_id() でスレッド復元 → agent.run()
│
└─ session_id なし → ③ get_new_thread() → signal_entity("update") → agent.run()
│
▼
④ call_activity("check_response")
│
├─ "bye" あり → signal_entity("deactivate") → return
│
└─ "bye" なし → ⑤ call_entity("increment_iteration")
│
├─ False (上限超) → signal_entity("deactivate") → return
└─ True (継続可) → return (次のリクエストを待つ)
アクティビティ: 終了判定 (activities/operator_activities.py)
終了判定はシンプルな文字列チェックです。
# activities/operator_activities.py
async def check_response(text: str) -> dict:
if "bye" in text.strip().lower():
return {"is_completed": True}
return {}
アクティビティに切り出しているのは、オーケストレーターは副作用のある処理を直接書けないというルールに従うためです。
将来的にここを「LLM で判定する」などに差し替えることもできます。
関数の登録 (apps/operator_app.py)
各トリガーをデコレーターで登録します。
# applications/apps/operator_app.py
def create_operator_app(app: AgentFunctionApp):
# HTTP エンドポイント: オーケストレーターを起動する
@app.route(route="operator/{function_name}")
@app.durable_client_input(client_name="client")
async def operator_http_start(req: func.HttpRequest, client: df.DurableOrchestrationClient):
function_name = req.route_params.get("function_name")
instance_id = await client.start_new(
orchestration_function_name=function_name,
client_input=req.get_json()
)
return client.create_check_status_response(req, instance_id)
# オーケストレーター本体の登録
@app.orchestration_trigger(context_name="context")
def operator_orchestration(context: df.DurableOrchestrationContext):
operator_agent = app.get_agent(context, "OperatorAgent") # エージェントを取得
return (yield from operator_orchestrator.main(context, agents={
"operator_agent": operator_agent,
}))
# アクティビティの登録
@app.activity_trigger(input_name="text")
async def check_response(text: str):
return await operator_activities.check_response(text)
# エンティティの登録
@app.entity_trigger(context_name="context", entity_name="Session")
def Session(context: df.DurableEntityContext):
return session.Session(context)
return app
デコレーターの対応関係:
| デコレーター | 用途 |
|---|---|
@app.route + @app.durable_client_input
|
HTTP トリガー。DurableOrchestrationClient を受け取ってオーケストレーターを起動 |
@app.orchestration_trigger |
オーケストレーター本体の登録 |
@app.activity_trigger |
アクティビティの登録 |
@app.entity_trigger |
エンティティの登録 |
実際の会話シナリオ
具体的にどのような流れで会話が進むかを示します。
[1回目のリクエスト]
POST /operator/operator_orchestration
{ "user_id": "user_123", "input": "最近の機械学習トレンドを教えてください", "timestamp": "..." }
→ エンティティ: session_id = null → 初回会話
→ get_new_thread() → session_id を保存
→ エージェント: "最近は大規模言語モデルが注目されています。..."
→ check_response: bye なし → increment_iteration (iteration=1)
[2回目のリクエスト]
POST /operator/operator_orchestration
{ "user_id": "user_123", "input": "もう少し詳しく教えてください", "timestamp": "..." }
→ エンティティ: session_id = xxx → 継続会話
→ from_session_id(xxx) → 前回の会話履歴を引き継いで実行
→ エージェント: "先ほどお伝えした通り、LLM は..." ← 前の話題を覚えている!
→ check_response: bye なし → increment_iteration (iteration=2)
[3回目のリクエスト]
POST /operator/operator_orchestration
{ "user_id": "user_123", "input": "ありがとう、わかりました", "timestamp": "..." }
→ エンティティ: session_id = xxx → 継続会話
→ エージェント: "お役に立てて光栄です。またいつでもどうぞ。bye"
→ check_response: "bye" 検出 → is_completed = True
→ signal_entity("deactivate") → session_id = null にリセット
4. Agent Framework の Workflow は Durable Functions と相性が悪い
Section 3 では ChatAgent 単体との組み合わせがうまく動く例を紹介しました。
では「複数のエージェントが協調して動く Workflow(グループチャットやハンドオフ)も Durable Functions で使えるのでは?」と考えるのは自然な発想です。
結論から言うと、現時点では相性が良くありません。
その理由を技術的な観点から順を追って説明します。
Agent Framework の Workflow とは
Agent Framework には ChatAgent 一体での動作だけでなく、複数エージェントが会話しながら協調する仕組みが用意されています。
| 仕組み | 概要 |
|---|---|
| GroupChat | 複数の ChatAgent が順番に発言し合ってタスクを解決する |
| Handoff | あるエージェントが「このタスクは別のエージェントに任せる」と引き継ぐ |
# GroupChat の例 (Agent Framework)
workflow = (
GroupChatBuilder()
.participants([researcher, writer])
.with_orchestrator(agent=orchestrator_agent)
.with_termination_condition(lambda messages: len(messages) >= 3)
.build()
)
これらは asyncio ベースのネイティブ非同期関数として実装されており、await で呼び出すことを前提としています。
Durable Functions のオーケストレーターには決定的な制約があります。
オーケストレーターは
yieldベースのジェネレータ関数でなければならない。awaitは使えない。
これは Durable Functions のリプレイ機構に起因します。オーケストレーターはクラッシュ後や関数インスタンスのスケールアウト時に「最初から再実行」されます。このとき yield の再生ログを使って状態を復元するため、asyncio の非同期コルーチンとは根本的に実行モデルが異なります。
# ❌ オーケストレーターで await は使えない
def my_orchestrator(context: df.DurableOrchestrationContext):
result = await workflow.run(messages="...") # SyntaxError or 動作不定
# ✅ yield だけが使える
def my_orchestrator(context: df.DurableOrchestrationContext):
result = yield context.call_activity("run_workflow", "...") # OK
「では Workflow の呼び出しを Activity に包めばいいのでは?」という回避策があります。
Activity は通常の async def 関数なので await が使えます。
# Activity から Workflow を呼び出す例
@app.activity_trigger(input_name="user_input")
async def run_workflow_activity(user_input: str) -> str:
response = await workflow.run(messages=user_input) # ← await できる
return response.text
しかしこの方法には、重大な制限が伴います。
| 制限 | 内容 |
|---|---|
| ステートレス | Activity はリクエストごとに使い捨てです。エンティティを使ったセッション管理と組み合わせることはできません。Workflow 内部の会話履歴は Activity の実行が終わると消えます。 |
| エンティティとの連携不可 | オーケストレーター上でしかエンティティのシグナル送信が行えないため、Activity 内から Workflow の状態をエンティティに保存する形がとれません。 |
| 途中経過が見えない | Workflow が内部で複数ターン会話していても、Activity の外から観察・介入する手段がありません。 |
結果として、「Durable Functions の強み(ステート管理・エンティティ)」と「Workflow の強み(マルチエージェント協調)」を同時に活用することができない構成になってしまいます。
もう一つの回避策として、Workflow を as_agent() で ChatAgent 互換のエージェントとしてラップし、AgentFunctionApp に登録する方法があります。
# Workflow を DurableAgent として登録
workflow = GroupChatBuilder()...build()
workflow_agent = workflow.as_agent(name="ResearchWorkflowAgent")
# AgentFunctionApp に通常のエージェントと同様に登録
app = AgentFunctionApp(agents=[workflow_agent])
# オーケストレーターで yield を使って呼び出せる
research_agent = app.get_agent(context, "ResearchWorkflowAgent")
response = yield research_agent.run(messages=user_input, thread=thread)
構文上はエラーが出ず、動きます。
しかし次の Section で実際の実行結果を見ると、期待通りの出力が得られないことがわかります。
as_agent() によるラップで表面上の不整合は解消されますが、
Workflow 内部の実行が「Durable Functions のオーケストレーションの管理外」になるため、
usage の集計やレスポンスの構造に問題が生じてしまうというわけですね。
Durable Functions のオーケストレーター
├─ 実行モデル: yield ベースのジェネレータ(同期的な見た目)
└─ 強み: ステート管理・長時間実行・リプレイ耐性
Agent Framework の Workflow (GroupChat / Handoff)
├─ 実行モデル: asyncio ベースのコルーチン(非同期)
└─ 強み: 複数エージェントの協調・動的な発話順序の決定
↑ この 2 つの実行モデルが根本的に噛み合わない
5. 実証: Workflow as Agent の問題を確認する
durable_functions_2 では実際に GroupChat を as_agent() で DurableAgent として登録し、オーケストレーターから呼び出す構成を試しました。
ここではそのコードと実行結果を示し、Section 4 で説明した問題が実際に発生することを確認します。
構成の概要
durable_functions_2 は durable_functions_1 に以下を追加した構成です。
durable_functions_2/
├── agents/
│ ├── operator_agent.py # Section 3 と同じ ChatAgent
│ └── research_workflow_agent.py # ← 新規: GroupChat を as_agent() でラップ
├── applications/
│ └── orchestrators/
│ ├── operator_orchestrator.py
│ └── research_workflow_orchestrator.py # ← 新規: Workflow 呼び出しオーケストレーター
└── test.http # ← 実行結果のコメントあり
GroupChat → as_agent() のコード
3 つのエージェント(Researcher・Writer・Orchestrator)で構成される GroupChat を作り、それを as_agent() で DurableAgent 互換にラップしています。
# agents/research_workflow_agent.py
from agent_framework import ChatAgent, GroupChatBuilder
def create_agent(llm_params: dict):
# 情報収集担当
researcher = ChatAgent(
...,
name="Researcher",
instructions="Gather concise facts that help answer the question. Be brief and factual.",
)
# 回答生成担当
writer = ChatAgent(
...,
name="Writer",
instructions="Compose clear, structured answers using any notes provided. Be comprehensive.",
)
# 発話順序を決める司令塔
orchestrator_agent = ChatAgent(
...,
name="Orchestrator",
instructions="""
You coordinate a team conversation to solve the user's task.
- Start with Researcher to gather information
- Then have Writer synthesize the final answer
- Only finish after both have contributed meaningfully
""",
)
# GroupChat を構築
workflow = (
GroupChatBuilder()
.participants([researcher, writer])
.with_termination_condition(
# アシスタントの発言が 3 回以上になったら終了
lambda messages: sum(
1 for msg in messages
if getattr(msg.role, "value", msg.role) == "assistant"
) >= 3
)
.with_orchestrator(agent=orchestrator_agent)
.build()
)
# ← ここで ChatAgent 互換のエージェントとしてラップ
return workflow.as_agent(name="ResearchWorkflowAgent")
as_agent() の呼び出しにより、GroupChat が ChatAgent と同じインターフェースを持つオブジェクトになります。
これで AgentFunctionApp への登録と、オーケストレーターからの yield 呼び出しが構文上は可能になります。
オーケストレーター側のコード
operator_orchestrator.py と同じ構造で、エージェントを ResearchWorkflowAgent に変えただけです。
# applications/orchestrators/research_workflow_orchestrator.py
MAX_ITERATION = 5
def main(context: df.DurableOrchestrationContext, agents: dict[str, DurableAIAgent]):
user_ctx = UserContext.model_validate(context.get_input())
session_entity_id = df.EntityId("Session", user_ctx.user_id)
session_raw = yield context.call_entity(session_entity_id, "get")
session = SessionEntity.model_validate(session_raw)
research_workflow_agent = agents["research_workflow_agent"]
if session.session_id:
thread = DurableAgentThread.from_session_id(...)
response: AgentResponse = yield research_workflow_agent.run(
messages=ChatMessage(role="user", text=user_ctx.input),
thread=thread,
)
else:
new_thread = research_workflow_agent.get_new_thread()
...
response: AgentResponse = yield research_workflow_agent.run(
messages=str(extra_fixed_params),
thread=new_thread,
)
return {
"response": response.text,
"usage_details": response.usage_details, # ← ここが問題
"created_at": response.created_at,
}
実行結果: 何が起きたか
実際にリクエストを送って得られた結果が test.http にコメントとして残っています。
リクエスト
POST http://localhost:7074/api/operator/research_workflow_orchestration
Content-Type: application/json
{
"user_id": "test_user",
"input": "What are the key benefits of async/await in Python?",
"timestamp": "2026-03-04T00:49:49+09:00"
}
期待していたレスポンス
{
"response": "async/await の主なメリットは... (Writer が生成した最終回答のみ)",
"usage_details": {
"prompt_tokens": 1234,
"completion_tokens": 567
},
"created_at": "2026-03-03T15:50:20.009375Z"
}
実際に得られたレスポンス
{
"response": "{'user_input': 'What are the key benefits of async/await in Python?', 'today': '2026-03-04T00:49:49+09:00'}- Readable, sequential-style code for asynchronous operations: async/await lets you write nonblocking code...\n\nLimitations (brief): mainly benefits I/O-bound work...\n\nShort answer\n- async/await lets you write non‑blocking asynchronous code...\n\nKey benefits\n- Readability and maintainability...\n- Efficient concurrency for I/O‑bound work...\n\nI provided a concise explanation... Would you like an example or deeper dive into any area?",
"usage_details": null,
"created_at": "2026-03-03T15:50:20.009375Z"
}
実行結果から 2 つの明確な問題が確認できます。
問題① usage_details が null
"usage_details": null
GroupChat は内部で複数のエージェント(Researcher・Writer・Orchestrator)が順に LLM を呼び出します。
それぞれの LLM 呼び出しで発生した token 使用量は個別には存在しますが、as_agent() でラップされた時点で 最終的な AgentResponse に usage を集約する仕組みがないため、null になってしまいます。
usage を正確に把握できないと、コスト管理やデバッグが困難になります。
問題② 複数エージェントのレスポンスが一つの文字列に結合されている
レスポンスの response フィールドを見ると:
[入力パラメータ辞書の文字列] ← そのままレスポンスに混入
[Researcher の発言] ← 情報収集の箇条書き
[Writer の発言] ← 構造化された回答
[Orchestrator の締めの発言] ← "Would you like an example...?"
これらがすべて改行で連結された単一の文字列として返ってきます。
本来欲しいのは Writer が生成した最終回答だけですが、GroupChat 内部の全発話が結合されてしまうため:
- どこまでが Researcher の発言でどこからが Writer の発言か判別できない
- 一番最後の Orchestrator の「何か質問はありますか?」が含まれてしまう
- 入力パラメータの辞書文字列まで混入している
durable_functions_1 との比較
同じ質問を durable_functions_1 の OperatorAgent(単純な ChatAgent)で処理した場合との比較です。
| 項目 | durable_functions_1 (ChatAgent) | durable_functions_2 (Workflow as Agent) |
|---|---|---|
response |
エージェントの返答のみ | 全エージェントの発話が連結された長い文字列 |
usage_details |
token 数が正確に取得できる | null |
| レスポンスの扱いやすさ | そのまま利用できる | パースや後処理が必要 |
| デバッグのしやすさ | どのエージェントが何を答えたか追跡しやすい | 追跡困難 |
代替案: GroupChat を使いたい場合
GroupChat や Handoff を使いたい場合は、Durable Functions の外で実行することを推奨します。
# ✅ Azure Functions 外(独立したサービス or FastAPI など)での利用
workflow = GroupChatBuilder()...build()
async def handle_request(user_input: str) -> str:
response = await workflow.run(messages=user_input)
return response.text # 各エージェントの usage も取れる
# Durable Functions からは HTTP で呼び出す
# @app.activity_trigger でこのサービスに HTTP リクエストを投げる形にする
Durable Functions 側はオーケストレーション(呼び出しの順序・リトライ・タイムアウト)を担当し、
Agent Framework の Workflow はステートレスなサービスとして外出しするのが現実的な設計です。
Note: この設計を採用する場合、Durable Functions のエンティティに頼っていたセッション管理(
session_idの永続化・会話履歴の引き継ぎ)は外部サービス側でも別途実装する必要があります。セッション ID の永続化には Cosmos DB や Redis などの外部ストアを検討してください。
6. Durable Functions と Agent Framework の棲み分け
ここまでの内容を踏まえ、「どのユースケースでどちらを使うべきか」を整理します。
Durable Functions はワークフローの「器」であり、オーケストレーション・ステート管理・長時間実行が得意です。
一方、Agent Framework (Workflow) はマルチエージェントの「頭脳」であり、複数エージェントの協調・動的な発話制御を得意としています。
この 2 つは役割が違うので、混ぜて使う場合は、役割分担を明確にする必要があると思っています。
例えば、Agent Frameworkは逐次処理も並列処理もワークフローとしてサポートしていますが、わざわざそれをAgent Frameworkのワークフローとして組む必要はなく、Durable Functionsの設計によって達成することができます。
両者の特徴と強みを考えると、それぞれで組むべきことは以下のように整理できます。
Durable Functions で組むべきこと
逐次処理
A → B → C と順番にアクティビティを実行する
並列処理
[A, B, C] を同時に実行して結果をまとめる
ステートフル会話
エンティティ + DurableAgentThread でセッション管理
長時間ワークフロー
承認待ち、ポーリング、定期チェック
Durable Functions で組むべきでないこと
GroupChat(複数エージェントの協調会話)
→ usage が取れない、レスポンスが連結される (Section 5)
Handoff(エージェントの動的引き継ぎ)
→ asyncio ベースの実行モデルと不整合
複雑なマルチエージェントワークフロー
→ Workflow as Agent の問題がすべて当てはまる
ユースケース別に整理するとすると以下のようなイメージです。
| ユースケース | 推奨 | 理由 |
|---|---|---|
| ステートフルな 1 対 1 会話 | Durable Functions + ChatAgent (DurableAgent) | エンティティでセッション管理、スレッドで会話継続。Section 3 の構成がそのまま使える |
| 複数ステップの逐次処理 (A → B → C) | Durable Functions | アクティビティの逐次 yield で直感的に記述できる |
| 並列タスクのファンアウト / ファンイン | Durable Functions |
context.task_all([...]) で並列アクティビティを実行し、全結果をまとめられる |
| リトライ・タイムアウト付きの堅牢な処理 | Durable Functions | 組み込みのリトライポリシー・外部イベント待ち (wait_for_external_event) が使える |
| 長時間実行(数時間〜数日) | Durable Functions | 待機中の課金がほぼゼロ。人間の承認待ちなどにも対応 |
| グループチャット(複数エージェントの協調) | Agent Framework 単独 または Azure Functions 外 | Durable Functions と組み合わせると Section 4・5 の問題が発生する |
| ハンドオフ(エージェントの動的な引き継ぎ) | Agent Framework 単独 または Azure Functions 外 | 同上 |
| 評価・実験用マルチエージェント | Agent Framework 単独 | usage_details が正確に計測でき、asyncio ネイティブで柔軟に書ける |
Agent Frameworkがサポートするグループチャットやハンドオフなど強力なワークフローに関しては、Durable Functionsでは利用せずに、FastAPIなど別のアプリケーションで実装し利用するようにしましょう。
7. まとめ
本記事では、Durable Functions と Agent Framework を組み合わせた AI エージェント開発について、基礎から落とし穴まで一通り解説しました。
Durable Functions にも Agent Frameworkにもそれぞれ良さがあり、そもそも一方はAzureサービスでもう一方はPythonパッケージであるため比べること自体が間違っているのですが、正しく役割や用途、強みを理解すれば非常に強力なツールです。
- Durable Functions が解く問題: 「複雑なワークフローをサーバーレスで、ステートを持ちながら、コスト効率よく動かしたい」
- Agent Framework が解く問題: 「複数の AI エージェントを協調させて、動的に賢い回答を生み出したい」
まだ Agent Framework は成熟しておらず、今後も開発が頻繁に行われるライブラリだと思うので、Durable Functions との協調もこれからさらに広がっていくのではないかと期待しています。
もっとステートフルなワークフローが使いやすくなることを、願っております。