16
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

前書き

これまでMastraでは、ワークフロー内でのHITL(Human-in-the-Loop)の実装は可能でしたが、AIエージェントのツール実行に対するHITLの仕組みは提供されていませんでした。

しかし9月中のアップデートにより、AIエージェントのツール実行中にもHITLを組み込めるようになりました。

11月の公式ブログでも紹介されましたが、まだ実装のサンプルが少なかったため、本記事ではMastraのHITLについて、より深く掘り下げて解説します。

そもそもHITLとは

HITL(Human-in-the-Loop)は、AIエージェントやワークフローの中に、人間の判断や承認を介在させる仕組みのことです。

たとえば、AIエージェントが外部APIを呼び出す前にユーザーの確認を求めたり、重要な処理の実行前に承認を得たりするケースが該当します。これにより、AIの自律的な動作を適切に制御し、意図しない操作やミスを防ぐことができます。

Mastraの中のHITLの実現

Mastraには一時停止Suspendと再開Resumeの仕組みがあります。

suspend()executeの中から呼び出せば、ワークフローの実行を一時停止できます。
その際、Mastraのインスタンスにストレージを設定していれば、ワークフローの実行状態がスナップショットとして保存されます。

人間の入力が必要な場面では、停止状態からresumeDataをクライアントから送信することで、ワークフローの次のアクションを制御できます。

例えば、以下のようなシンプルなメール送信ワークフローがあります。

メール送信ワークフロー
const step1 = createStep({
  id: "step-1",
  inputSchema: z.object({
    userEmail: z.string()
  }),
  outputSchema: z.object({
    output: z.string()
  }),
  resumeSchema: z.object({
    approved: z.boolean()
  }),
  execute: async ({ inputData, resumeData, suspend }) => {
    const { userEmail } = inputData;
    const { approved } = resumeData ?? {};

    // resumeDataからapprovedがなければ、ワークフロー実行一時中止
    if (!approved) {
      return await suspend({});
    }

    return {
      output: `Email sent to ${userEmail}`
    };
  }
});

export const testWorkflow = createWorkflow({
  id: "test-workflow",
  inputSchema: z.object({
    userEmail: z.string()
  }),
  outputSchema: z.object({
    output: z.string()
  })
})
  .then(step1)
  .commit();

ユーザーからの入力を受け取り、実際に送信する前にユーザーの承認を求めます。

Mastra Studioでこのワークフローを実行して、確認すると、Approvedのチェックボックスが表示されていることが確認できます。

E82CDE50-CD26-44FA-B249-B76E5C10E392.jpeg

スナップショットによる状態の維持

この一時停止状態で、設定しているストレージを確認すると、mastra_workflow_snapshotというテーブルが作成されており、その中に現在のワークフローのデータが保存されていることが確認できます。

791A24E1-1BB4-4DC0-801C-28DE0C260E11_4_5005_c.jpeg

一時停止中のワークフロースナップショット
{"runId":"86774c05-99ca-49aa-b973-0eec20c6f826","status":"suspended","value":{},"context":{"input":{"userEmail":"test@gmail.com"},"step-1":{"payload":{"userEmail":"test@gmail.com"},"startedAt":1764240044811,"status":"suspended","suspendPayload":{},"suspendedAt":1764240044821}},"activePaths":[],"serializedStepGraph":[{"type":"step","step":{"id":"step-1"}}],"suspendedPaths":{"step-1":[0]},"waitingPaths":{},"resumeLabels":{},"runtimeContext":{},"timestamp":1764240044824}

このワークフローのスナップショットにはstep-1というステップがあり、statusが停止中を示すsuspendedとなっています。

実行を許可すると、スナップショットは以下のように変化します。

実行を許可したワークフロースナップショット
{"runId":"86774c05-99ca-49aa-b973-0eec20c6f826","status":"success","value":{},"context":{"input":{"userEmail":"test@gmail.com"},"step-1":{"payload":{"userEmail":"test@gmail.com"},"startedAt":1764240044811,"status":"success","suspendPayload":{},"suspendedAt":1764240044821,"resumePayload":{"approved":true},"resumedAt":1764240114897,"output":{"output":"Email sent to test@gmail.com"},"endedAt":1764240114900}},"activePaths":[],"serializedStepGraph":[{"type":"step","step":{"id":"step-1"}}],"suspendedPaths":{},"waitingPaths":{},"resumeLabels":{},"result":{"output":"Email sent to test@gmail.com"},"runtimeContext":{},"timestamp":1764240114904}

step-1statussuccessとなっています。

つまり、MastraのHITLは、Suspendによってスナップショットが作成され、ユーザーの操作を経てResumeが実行され、スナップショットが更新されるという流れになります。

このHITL(Human-in-the-Loop)の実装において、ちょっとわかりにくいのは、一時停止後にクライアント(フロントエンド)から、もしくはAPIを使って、同じワークフローのresumeを呼び出し、resumeDataを送信する必要があるという点です。

const workflow = mastra.getWorkflow("testWorkflow");
const run = await workflow.createRunAsync();

await run.start({
  inputData: {
    userEmail: "alex@example.com"
  }
});

// ユーザーからのレスポンスをresume()を使ってワークフローへ送信
const handleResume = async () => {
  const result = await run.resume({
    step: "step-1",
    resumeData: { approved: true }
  });
};

また、スナップショットからワークフローを復元して再度HITLを実行することもできますが、それについては別の機会に説明します。

AIエージェントのツール実行におけるHITLの実現

冒頭でも説明したとおり、最新のMastraでは、AIエージェントのツール実行時にもHITLを介入させることが可能になりました。:fist_tone1:

実装方法はワークフローと基本的に同じです。Mastraのツール内でsuspendを使って実行を停止させたり、resumeDataを受け取って再開させたりできます。

承認パターン

ツール実行時の承認には2つのパターンがあります。

常に承認を求める場合は、requireApproval: trueを設定するだけで実現できます。

常に承認を求めるツールの実装
const deleteTool = createTool({
  id: "delete-data",
  description: "Delete records from database",
  inputSchema: z.object({
    count: z.number(),
    table: z.string()
  }),
  execute: async ({ context }) => {
    return await deleteRecords(context);
  },
  requireApproval: true,
});

条件付きで承認を求める場合は、suspendresumeDataを使って制御します。
以下は削除レコードが11件を超える場合に承認を求める例です。

レコード削除ツールの実装
export const deleteTool = createTool({
  id: "delete-records",
  description: "レコード削除ツール(11件以上は承認が必要)",
  inputSchema: z.object({
    count: z.number(),
    table: z.string()
  }),
  suspendSchema: z.object({
    message: z.string()
  }),
  resumeSchema: z.object({
    approved: z.boolean()
  }),
  execute: async ({ context, suspend, resumeData }) => {
    const { count, table } = context;

    // 11件以上の場合、承認を要求
    if (count > 10 && !resumeData) {
      return await suspend({
        message: `⚠️ ${table}から${count}件削除します。承認しますか?`
      });
    }

    // 承認されなかった場合
    if (resumeData && !resumeData.approved) {
      return { success: false, message: "削除がキャンセルされました" };
    }

    // 削除を実行(10件以下、または承認済み)
    return { 
      success: true, 
      message: `${table}から${count}件削除しました` 
    };
  },
});

このツールを通常通りAIエージェントに持たせれば、条件に応じた承認フローが動作します。

HITL対応のクライアント実装

詳細な実装はこのリポジトリを確認してください。

レスポンスの検知

MastraのAIエージェントのレスポンスはMastraModelOutputがベースタイプになっています。
node_modules/@mastra/core/dist/stream/types.d.tsBaseChunkTypeを確認すると、tool-call-suspendedというレスポンスタイプが存在します。

HITLを実装する場合、このタイプのレスポンスをクライアント側で検知して承認用UIを表示できます。

サンプル.ts
...
if (data.type === 'tool-call-suspended') {
  const suspendedCall: ToolCall = {
    id: data.payload.toolCallId,
    name: data.payload.toolName,
    args: data.payload.suspendPayload,
    suspended: true,
    state: 'approval-requested',
  };
  setToolCalls(prev => {
    const newCalls = [...prev];
    const existingIdx = newCalls.findIndex(c => c.id === suspendedCall.id);
    if (existingIdx >= 0) {
      newCalls[existingIdx] = suspendedCall;
    } else {
      newCalls.push(suspendedCall);
    }
    return newCalls;
  });
}
...

7DF6439C-A00B-426D-991A-0F43EBF91647.jpeg

承認フローの実行

承認時は同じAIエージェントに対して、resumeDatarunIdtoolCallIdを渡します。

...
const stream = await deleteAgent.resumeStream(
  { approved },
  { runId, toolCallId }
);

APIの構成

通常のフローと承認用フローで、APIを2つ用意するとやりやすいです。

/api        # 通常のリクエスト用
/api/resume # 承認フロー用

最後

Mastraはマイナーバージョンアップが頻繁に行われており、追いかけているつもりでも変化の速さにいつも驚かされます。

ドキュメントよりissueを見た方が早い、なんてこともありました:sweat_smile:
とはいえ、早ければ年内にも正式版のV1がリリースされるでしょう。楽しみですね。

参考資料

16
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?