8
5

AI Agentを状態機械として表現する

Last updated at Posted at 2024-09-06

はじめに

生成AIを使ったAI Agentを記述する際に素朴には非同期関数を組み合わせて実装することが多いと思います。
しかし、非同期関数を組み合わせた書き方はテストが書きにくく、Agentが複雑になり、要件が増えるにつれてメンテナンスが難しくなっていきます。

この記事では状態機械を使ったAI Agentの記述方法を紹介します。

免責事項

今回のコードは実務で使用しているコードから概念のみを抜き出して生成AIのサポートを得て作られています。
動作確認はしていないので何らかのエラーが発生する可能性はあります。あらかじめご了承ください。もしエラーを見つけましたらコメントで報告していただけると幸いです。

AI Agentを表現する際の悩み

生成AIを使ったAI Agentを実装することを考えましょう。簡単のため、以下の3ステップからなるRAG Agentを考えます。

  1. 入力文から検索クエリをLLMで抽出する
  2. 検索クエリをナレッジベースに問い合わせる(キーワード検索かベクター検索かはここでは考えないことにします。)
  3. 検索結果をもとに回答文をLLMで生成する

素朴な書き方

一番素朴な実装は以下のような非同期関数として記述することだと思います。

rag.ts
import { search } from "./search";
import { llm } from "./llm";

async function extractSearchQuery(question: string): Promise<string> {
  const systemPrompt = "..."
  const userPrompt = `## 入力文\n${question}\n` + 
      `## 指示\n入力文の質問に回答するために検索エンジンで問い合わせるキーワードを抽出してください`
  const response = await llm(systemPrompt, userPrompt)
  return response.text
}

async function generateAnswer(question: string, context: object[]): Promise<string> {
  const systemPrompt = "..."
  const userPrompt = `## 入力文\n${question}\n## 文脈情報\n${JSON.stringify(context)}` +
    `## 指示\n入力文の質問に文脈情報を参照して回答してください`
  const response = await llm(systemPrompt, userPrompt)
  return response.text
}

export async function rag(question: string): Promise<string> {
  // 入力から検索クエリを抽出する
  const query = await extractSearchQuery(question)
  // ナレッジベースで検索する
  const searchResult = await search(query)
  // 回答を生成する
  const answer = await generateAnswer(question, searchResult)
  return answer
}

問題点1: 単体テストが書きにくい

さて、実装したらテストを書きましょう。その際にはLLMの実行や検索エンジンをモックする必要があります。

rag.test.ts
import { rag } from "./rag";
import { search } from "./search";
import { llm } from "./llm";

jest.mock("./search");
jest.mock("./llm");

describe("rag", () => {
  beforeEach(() => {
    search.mockClear();
    llm.mockClear();
  });

  it("should return the generated answer", async () => {
    // Arrange
    const question = "ウェブサイトの作り方は?";
    const query = "ウェブサイト 作り方";
    const searchResult = [
      { title: "ウェブ開発チュートリアル", content: "..." },
      { title: "ウェブサイト構築ガイド", content: "..." },
    ];
    const answer = "ウェブサイトは次の手順に従って作成できます...";
    
    llm
      .mockResolvedValueOnce({ text: query }) // First call to extract the search query
      .mockResolvedValueOnce({ text: answer }); // Second call to generate the answer
    search.mockResolvedValueOnce(searchResult);

    // Act
    const result = await rag(question);

    // Assert
    expect(llm).toHaveBeenCalledTimes(2);
    expect(llm).toHaveBeenNthCalledWith(
      1,
      "...",
      `## 入力文\n${question}\n` + 
      `## 指示\n入力文の質問に回答するために検索エンジンで問い合わせるキーワードを抽出してください`
    );
    expect(llm).toHaveBeenNthCalledWith(
      2,
      "...",
      `## 入力文\n${question}\n## 文脈情報\n${JSON.stringify(searchResult)}` +
      `## 指示\n入力文の質問に文脈情報を参照して回答してください`
    );
    expect(search).toHaveBeenCalledWith(query);
    expect(result).toBe(answer);
  });
});

このテストはフロー全体をテストするため、長く、モックやアサート文が多いため、メンテナンスが難しいテストになっています。generateAnswerextractSearchQueryに対して単体テストを書けばある程度改善しますが、そうすると今度はragのなかでgenerateAnswerextractSearchQueryをモックすることになり、あまり効果的なテストになりません。

問題点2: 進捗の表示

生成AIアプリでは、回答生成までにかかる待ち時間のUXを改善するために、AI Agentが今何をやっているのかの進捗状況を表示することが求められます。ではこの例で、「検索を実行しています」や「回答を生成しています」などの進捗メッセージを送信するようにしましょう。

rag-monitor.ts
import { search } from "./search";
import { llm } from "./llm";


async function extractSearchQuery(question: string, listener: (string) => void): Promise<string> {
  const systemPrompt = "..."
  const userPrompt = `## 入力文\n${question}\n` + 
      `## 指示\n入力文の質問に回答するために検索エンジンで問い合わせるキーワードを抽出してください`
  listener('検索キーワードを抽出しています')
  const response = await llm(systemPrompt, userPrompt)
  return response.text
}

async function generateAnswer(question: string, context: object[], listener: (string) => void): Promise<string> {
  const systemPrompt = "..."
  const userPrompt = `## 入力文\n${question}\n## 文脈情報\n${JSON.stringify(context)}` +
    `## 指示\n入力文の質問に文脈情報を参照して回答してください`
  const response = await llm(systemPrompt, userPrompt)
  listener('回答を生成しています')
  return response.text
}

export async function rag(question: string, listener: (string) => void): Promise<string> {
  // 入力から検索クエリを抽出する
  const query = await extractSearchQuery(question, listener)
  // ナレッジベースで検索する
  listener(`${query}で検索しています`)
  const searchResult = await search(query, listener)
  // 回答を生成する
  const answer = await generateAnswer(question, searchResult, listener)
  return answer
}

この時のテストは以下のとおりです。

rag-monitor.test.ts
import { rag } from './rag-monitor';
import { search } from './search';
import { llm } from './llm';

jest.mock('./search');
jest.mock('./llm');

describe('rag', () => {
  it('should send progress messages through the listener', async () => {
    // Arrange
    const question = '最も高い山は何ですか?';
    const query = '最も高い山';
    const searchResult = [{ title: 'エベレスト山', content: '...' }];
    const answer = '世界で最も高い山はエベレスト山です。';
    const mockListener = jest.fn();

    llm.mockResolvedValueOnce({ text: query }).mockResolvedValueOnce({ text: answer });
    search.mockResolvedValueOnce(searchResult);

    // Act
    const result = await rag(question, mockListener);

    // Assert
    expect(result).toBe(answer);
    expect(mockListener).toHaveBeenNthCalledWith(1, '検索キーワードを抽出しています');
    expect(mockListener).toHaveBeenNthCalledWith(2, `${query}で検索しています`);
    expect(mockListener).toHaveBeenNthCalledWith(3, '回答を生成しています');
    expect(mockListener).toHaveBeenCalledTimes(3);
  });
});

さて、このテストは問題なく成功します。安心してコードをマージしたところ、QA時に「回答を生成していますのメッセージが表示されない」不具合を発見しました。コードを再度確認してみましょう。

rag-monitor.ts
async function generateAnswer(question: string, context: object[], listener: (string) => void): Promise<string> {
  const systemPrompt = "..."
  const userPrompt = `## 入力文\n${question}\n## 文脈情報\n${JSON.stringify(context)}` +
    `## 指示\n入力文の質問に文脈情報を参照して回答してください`
  const response = await llm(systemPrompt, userPrompt)
  listener('回答を生成しています') // ←バグ
  return response.text
}

listenerの呼び出しをawaitの後に書いているため、進捗メッセージが適切なタイミングで表示されていないのです。これをテストで検出するのは困難です。

このようにlistenerの呼び出し箇所が分散しているため、呼び出し忘れや順序のバグを埋め込みやすいのです。

他の手法

RAGエージェントを記述する際のDSLとしてはAzure AI StudioのPrompt FlowやAWSのBedrock Studio、GoogleのVertext AI Agent Builderなどがあります。

これらはGUIやYamlでエージェントを簡単に記述できますが、ベンダーロックインの懸念があります。
また、既存のアプリの中のデータを使ったAgentを動かすのにも適しません。

状態機械(ステートマシン)でAI Agentを表現する

さて、非同期関数でAI Agentのフロー全体を記述する方法は問題があることがわかりました。
この記事では、問題を解決するために、状態機械としてフローを表現する書き方を紹介します。

状態機械とは

状態機械は入力イベントに応じて内部状態が変化していくシステムのことです。
今回の記事では以下のような数学的定義に対応する状態機械を実装していきます。

  • 状態全体の集合 $Q$
  • イベントの集合 $E$
  • コマンドの集合 $C$
  • 初期状態 $q_0 \in Q$
  • 状態遷移 $\delta :: Q \times E \to Q \times C$
  • 終了状態の集合 $F \subset Q$

今回の例に当てはめると、$Q$,$E$,$C$はそれぞれ以下のような型で表現できます。

rag-sm.ts
type RagState = 
  | { tag: 'init' }
  | { tag: 'extract_keyword', question: string }
  | { tag: 'search', question: string, query: string }
  | { tag: 'generate_answer', question: string, searchResult: object[] }
  | { tag: 'completed', answer: string }

type RagEvent =
  | { tag: 'start', question: string }
  | { tag: 'keyword', query: string }
  | { tag: 'search_result', result: object[] }
  | { tag: 'answer', answer: string }

type RagCommand =
  | { tag: 'nop' }
  | { tag: 'llm', system: string, user: string, onReply: (string) => RagEvent }
  | { tag: 'search', query: string, onResult: (object[]) => RagEvent }

RagState: 状態全体の集合

RagStateはAI Agentの可能なすべての状態を表します。状態の種類には以下のものがあります。

  • init: 初期状態。エージェントが起動したが、まだ処理を開始していない状態
  • extract_keyword: 検索キーワード抽出状態。質問から検索キーワードを抽出している状態
  • search: 検索状態。抽出したキーワードを用いて情報を検索している状態
  • generate_answer: 検索結果を基に回答を生成している状態
  • completed: 回答の生成を終えた状態

各状態は、エージェントが何をしているかを表現するためにタグ付けされています。questionsearchResultanswerといったフィールドは、状態遷移に必要な情報を持っています。

RagEvent: イベントの集合

RagEventは外部からの入力であるイベントの種類を表します。
例えばkeywordイベントはLLMによって質問から検索キーワードが抽出されたイベントを表します。

RagCommand: コマンドの集合

RagCommandはエージェントが取るべきアクションを表すコマンドの種類を表します。今回はnop, llm, searchの三種類のコマンドを使用します。たとえば、キーワード抽出後にはsearchコマンドが、検索完了後にはllmコマンドが発行されます。

また、llmコマンドとsearchコマンドは、コマンド実行結果をRagEvent型に変換するコールバック関数フィールドを持ちます。こうすることで、非同期関数の呼び出しを外部に切り出すことができ、テストが書きやすくなります。

状態遷移図

コードの実装前に、今回のRAGの状態遷移図を確認しておきます。

この図で、ノードは状態を表し、辺はイベントとその際に発行されるコマンドを表しています。

今回の状態遷移図は非常にシンプルなものですが、実用上は条件分岐やループが出現し、徐々に複雑になっていきます。

状態遷移関数の実装

では状態遷移図に従って、状態遷移関数と状態機械全体を実装していきます。

rag-sm.ts
...

class Rag {
  state: RagState

  constructor() {
    this.state = { tag: 'init' };
  }
  
  onEvent(event: RagEvent): RagCommand {
    switch (event.tag) {
      case 'start':
        return this.onStart(event.question);
      case 'keyword':
        return this.onKeyword(event.query);
      case 'search_result':
        return this.onSearchResult(event.result);
      case 'answer':
        return this.onAnswer(event.answer);
      default:
        throw new Error('Unexpected event: ' + JSON.stringify(event));
    }
  }

  private onStart(question: string): RagCommand {
    if (this.state.tag !== 'init') {
      throw new Error('onStart called in wrong state: ' + JSON.stringify(this.state));
    }
    this.state = { tag: 'extract_keyword', question };
    const userPrompt = `## 入力文\n${question}\n` + 
                       `## 指示\n入力文の質問に回答するために検索エンジンで問い合わせるキーワードを抽出してください`;
    const onReply = (answer) => ({ tag: 'keyword', query: answer })
    return { tag: 'llm', system: '...', user: userPrompt, onReply };
  }

  private onKeyword(query: string): RagCommand {
    if (this.state.tag !== 'extract_keyword') {
      throw new Error('onKeyword called in wrong state: ' + JSON.stringify(this.state));
    }
    const { question } = this.state;
    const onResult = (result) => ({ tag: 'search_result', result })
    this.state = { tag: 'search', question, query, onResult };
    return { tag: 'search', query };
  }

  private onSearchResult(result: object[]): RagCommand {
    if (this.state.tag !== 'search') {
      throw new Error('onSearchResult called in wrong state: ' + JSON.stringify(this.state));
    }
    const { question } = this.state;
    this.state = { tag: 'generate_answer', question, searchResult: result };
    const userPrompt = `## 入力文\n${question}\n## 文脈情報\n${JSON.stringify(result)}` +
                       `## 指示\n入力文の質問に文脈情報を参照して回答してください`;
    const onReply = (answer) => ({ tag: 'answer', answer })
    return { tag: 'llm', system: '...', user: userPrompt, onReply };
  }

  private onAnswer(answer: string): RagCommand {
    if (this.state.tag !== 'generate_answer') {
      throw new Error('onAnswer called in wrong state: ' + JSON.stringify(this.state));
    }
    this.state = { tag: 'completed', answer };
    return { tag: 'nop' };
  }
}

Ragクラスは状態機械の実装の中核を成すクラスで、状態遷移関数を内包しています。このクラスは内部状態を持ち、与えられたイベントに応じた適切なコマンドを生成し、クラスの状態を更新する責務を担います。

onEvent: イベントハンドラ

主要メソッドの一つであるonEventは、外部からのイベントを受け取り、それを処理して新たな状態とコマンドを生成します。このメソッドは、現在の内部状態とイベントの種類に応じて、以下のいずれかのプライベートメソッドを呼び出します:

  • onStart: 初期状態からキーワード抽出状態への遷移を管理します
  • onKeyword: キーワード抽出状態から検索状態への遷移を管理します
  • onSearchResult: 検索状態から回答生成状態への遷移を管理します
  • onAnswer: 回答生成状態から完了状態への遷移を管理します

各状態遷移メソッドはRagCommandを返し、それに含まれるonReplyonResultなどのコールバックを通じて次のイベントが発生するように設計されています。これにより、非同期処理のフローがシームレスにつながります。

状態管理

Ragクラスは状態管理に非常に厳格です。各状態遷移メソッドが呼ばれる前に現在の状態をチェックし、誤ったタイミングでのメソッド呼び出しをエラーとして捉えます。これにより、状態の不整合を防ぎ、状態遷移が意図したとおりに行われることを保証しています。

状態機械の実行

最後にRagクラスを使用して生成AIアプリを提供するコードを実装します。

rag-sm.ts
import { search } from "./search";
import { llm } from "./llm";

...

async function commandHandler(command: RagCommand): Promise<RagEvent | undefined> {
  switch (command) {
    case 'llm':
      const reply = await llm(command.system, command.user)
      return command.onReply(reply)
    case 'search':
      const result = await search(command.query)
      return command.onResult(result)
    case 'nop':
    default:
      return undefined
}


async function rag(question: string): Promise<string> {
    const machine = new Rag()
    let event: RagEvent | undefined = { tag: 'start', question: question };
    while(event) {
      const command = machine.onEvent(event);
      if (machine.state.tag === 'completed') {
        return machine.state.answer;
      }
      event = await commandHandler(command);
    }
    return Promise.reject(new Error('新規イベントが発行されませんでした'));
  }
}

commandHandler関数はRagCommandに応じた非同期処理を実行し、その結果として次のRagEventを生成します。この関数は、状態遷移において中核となる外部サービスの呼び出しを担当します。

commandHandler: コマンドハンドラ

この関数はRagCommandを受け取り、そのタグに応じて異なる非同期アクションを実行します。利用可能なコマンドとその処理は以下のとおりです。

  • 'llm': 与えられたシステムプロンプトとユーザープロンプトを使用して言語モデルの照会を行い、その回答に基づいて次のイベントを生成します。
  • 'search': 検索クエリを用いて検索を行い、その結果に基づいて次のイベントを生成します。
  • 'nop': 何もしないコマンドであり、状態機械の実行が完了したことを示します。

コマンドタグに応じて適切なAPI(例:llmsearch)が呼び出され、非同期操作が完了するとonReplyonResultコールバックが利用されて新しいイベントが生成されます。

状態機械の実行

rag関数はRag状態機械を実行するエントリポイントです。この関数は初期イベントを作成し、それをRagクラスのonEventメソッドに渡して、結果として生成されたコマンドをcommandHandler関数に渡します。そこから生成される新しいイベントに基づいて、次のコマンドが生成されるまでのプロセスがループします。

このループは状態機械が完了状態に達するまで続き、その時点で生成された最終的な回答が返されます。新しいイベントが発行されず、状態機械が進行不可能になった場合にはエラーが投げられます。

全体のコード

状態機械を利用したコードの全体像をまとめると以下のようになります。
この例はRAG自体がシンプルなのでボイラープレートの増加によるデメリットがありますが、実際のロジックが複雑になるにつれてそのデメリットは相対的に小さくなります。

rag-sm.ts
import { search } from "./search";
import { llm } from "./llm";

export type RagState =
  | { tag: 'init' }
  | { tag: 'extract_keyword', question: string }
  | { tag: 'search', question: string, query: string }
  | { tag: 'generate_answer', question: string, searchResult: object[] }
  | { tag: 'completed', answer: string }

export type RagEvent =
  | { tag: 'start', question: string }
  | { tag: 'keyword', query: string }
  | { tag: 'search_result', result: object[] }
  | { tag: 'answer', answer: string }

export type RagCommand =
  | { tag: 'nop' }
  | { tag: 'llm', system: string, user: string, onReply: (string) => RagEvent }
  | { tag: 'search', query: string, onResult: (object[]) => RagEvent }


export class Rag {
  state: RagState

  constructor() {
    this.state = { tag: 'init' };
  }

  onEvent(event: RagEvent): RagCommand {
    switch (event.tag) {
      case 'start':
        return this.onStart(event.question);
      case 'keyword':
        return this.onKeyword(event.query);
      case 'search_result':
        return this.onSearchResult(event.result);
      case 'answer':
        return this.onAnswer(event.answer);
      default:
        throw new Error('Unexpected event: ' + JSON.stringify(event));
    }
  }

  private onStart(question: string): RagCommand {
    if (this.state.tag !== 'init') {
      throw new Error('onStart called in wrong state: ' + JSON.stringify(this.state));
    }
    this.state = { tag: 'extract_keyword', question };
    const userPrompt = `## 入力文\n${question}\n` +
                       `## 指示\n入力文の質問に回答するために検索エンジンで問い合わせるキーワードを抽出してください`;
    const onReply = (answer: string) => ({ tag: 'keyword', query: answer });
    return { tag: 'llm', system: '...', user: userPrompt, onReply };
  }

  private onKeyword(query: string): RagCommand {
    if (this.state.tag !== 'extract_keyword') {
      throw new Error('onKeyword called in wrong state: ' + JSON.stringify(this.state));
    }
    this.state = { tag: 'search', question: this.state.question, query };
    const onResult = (result: object[]) => ({ tag: 'search_result', result });
    return { tag: 'search', query, onResult };
  }

  private onSearchResult(result: object[]): RagCommand {
    if (this.state.tag !== 'search') {
      throw new Error('onSearchResult called in wrong state: ' + JSON.stringify(this.state));
    }
    this.state = { tag: 'generate_answer', question: this.state.question, searchResult: result };
    const userPrompt = `## 入力文\n${this.state.question}\n## 文脈情報\n${JSON.stringify(result)}` +
                       `## 指示\n入力文の質問に文脈情報を参照して回答してください`;
    const onReply = (answer: string) => ({ tag: 'answer', answer });
    return { tag: 'llm', system: '...', user: userPrompt, onReply };
  }

  private onAnswer(answer: string): RagCommand {
    if (this.state.tag !== 'generate_answer') {
      throw new Error('onAnswer called in wrong state: ' + JSON.stringify(this.state));
    }
    this.state = { tag: 'completed', answer };
    return { tag: 'nop' };
  }
}

async function commandHandler(command: RagCommand): Promise<RagEvent | undefined> {
  switch(command.tag) {
    case 'llm':
      const reply = await llm(command.system, command.user);
      return command.onReply(reply);
    case 'search':
      const result = await search(command.query);
      return command.onResult(result);
    case 'nop':
      return undefined;
    default:
      throw new Error('Unexpected command: ' + JSON.stringify(command));
  }
}

export async function rag(question: string): Promise<string> {
  const machine = new Rag();
  let event: RagEvent | undefined = { tag: 'start', question: question };
  while(event) {
    const command = machine.onEvent(event);
    if(machine.state.tag === 'completed') {
      return machine.state.answer;
    }
    event = await commandHandler(command);
  }
  return Promise.reject(new Error('新規イベントが発行されませんでした'));
}

メリット1: 単体テストが書きやすい

さて、Ragクラスに対する単体テストを書いていきます。メインのロジックはonEventなのでそこに集中してテストを書いていきます。

onEventは状態変更以外の副作用を持たないので発行されたコマンドと遷移後の状態が意図通りであることをテストすれば良いです。

rag-sm.test.ts
import { Rag, RagCommand, RagEvent, RagState } from './rag-sm';

describe('Rag', () => {
  let rag: Rag;

  beforeEach(() => {
    rag = new Rag();
  });

  it('初期状態からキーワード抽出状態への遷移とコマンドのテスト', () => {
    const startEvent: RagEvent = { tag: 'start', question: 'ウェブサイトの作り方は?' };
    const command = rag.onEvent(startEvent);
    
    const expectedState: RagState = { tag: 'extract_keyword', question: 'ウェブサイトの作り方は?' };
    const expectedCommand: RagCommand = { 
      tag: 'llm', 
      system: '...', 
      user: '## 入力文\nウェブサイトの作り方は?\n## 指示\n入力文の質問に回答するために検索エンジンで問い合わせるキーワードを抽出してください',
      onReply: expect.any(Function)
    };

    expect(rag.state).toEqual(expectedState);
    expect(command).toEqual(expectedCommand);
  });

  it('キーワード抽出状態から検索状態への遷移とコマンドのテスト', () => {
    rag.state = { tag: 'extract_keyword', question: 'ウェブサイトの作り方は?' };
    const keywordEvent: RagEvent = { tag: 'keyword', query: 'ウェブサイト 作り方' };
    const command = rag.onEvent(keywordEvent);

    const expectedState: RagState = { tag: 'search', question: 'ウェブサイトの作り方は?', query: 'ウェブサイト 作り方' };
    const expectedCommand: RagCommand = { 
      tag: 'search', 
      query: 'ウェブサイト 作り方', 
      onResult: expect.any(Function)
    };

    expect(rag.state).toEqual(expectedState);
    expect(command).toEqual(expectedCommand);
  });

  /* 中略 */

  it('不正な状態でイベントが呼ばれた場合に、エラーを投げるテスト', () => {
    rag.state = { tag: 'generate_answer', question: 'ウェブサイトの作り方は?', searchResult: [{ title: 'タイトル', content: '内容' }] };
    const invalidEvent: RagEvent = { tag: 'start', question: '別の質問' };

    expect(() => rag.onEvent(invalidEvent)).toThrow();
  });
});

このテストは、Ragクラスの状態が正しく遷移していること、および適切なRagCommandが生成されたことを同時に検証しています。不正な状態遷移を試みた際に適切なエラーが投げられるかもテストしています。これにより、状態とコマンドの生成が意図した通りに機能していることを確認できます。

今回の例では簡単のためonReply, onResultのコールバック関数は検証していませんが、これも純粋関数なのでテストは容易です。

素朴な書き方の場合と比較して、モックが一切必要なくなっていることがわかります。

メリット2: 進捗の表示

さて、この状態機械に現在の進捗を表示させるように修正してみましょう。
ragメソッドにlistenerの引数を追加すれば良いです。

rag-sm-monitor.ts
...

function progressMessage(state: RagState) {
  switch (state.tag) {
    case 'extract_keyword':
      return 'キーワードを抽出しています。'
    case 'search':
      return `${state.query}で検索しています`
    case 'generate_answer':
      return '回答を生成しています'
    default:
      return undefined
  }
}

async function rag(question: string, listener: (string) => void): Promise<string> {
    const machine = new Rag()
    let event: RagEvent | undefined = { tag: 'start', question: question };
    while (event) {
      const command = machine.onEvent(event);
      if (machine.state.tag === 'completed') {
        return machine.state.answer;
      }
      const message = progressMessage(machine.state) 
      if (message) {
        listener(message)
      }
      event = await commandHandler(command);
    }
    return Promise.reject(new Error('新規イベントが発行されませんでした'));
  }
}

素朴な書き方と比較すると、listenerを呼び出している箇所が一箇所になっており、呼び出し漏れやタイミングバグを埋め込むリスクが小さくなっていることがわかります。進捗メッセージもRagStateの純粋関数となっており、テストが非常に簡単です。

まとめ

この記事では、AI Agentの実装において状態機械を採用するアプローチとそのメリットについて紹介しました。以下のポイントを重視したアプローチが、結果的に開発の複雑さを軽減し、品質を向上させることができると考えています。

  1. 明確な状態管理: 各状態が明確なタスクと関連データを持ち、フロー全体が直感的に理解しやすくなりました。これにより、状態遷移の理解、実装、デバッグが容易になります

  2. テストの容易さ: 純粋な関数の使用により、外部のAPI呼び出しや非同期処理をモックする必要がなくなりました。これにより単体テストの実装が容易になり、コードの信頼性も向上します

  3. 進捗表示の統合: 進捗表示を状態遷移と密接に関連させることで、進捗表示のためのコードを一箇所に集約し、呼び出し漏れなどのヒューマンエラーを減らすことができました

  4. 拡張性とメンテナンス性: 状態遷移図を可視化することで、コードの詳細を読まなくてもシステムの振る舞いを抽象的に理解し、変更方法を議論することが可能になります。複雑な仕様を検証するためにモデル検査のアプローチを適用することもできるかもしれません。もちろんコードと状態遷移図の齟齬が発生しないように気をつける必要はありますが、生成AIを使って高い精度でコードから状態遷移図を書き起こすことが可能です

この記事を通して、読者の皆さんには、状態機械を使用してAI Agentを実装する際の具体的な方法とそのメリットについて理解を深めていただけたと思います。最初はボイラープレートが増えるなどの難点も感じるかもしれませんが、システムが成熟し、より複雑になるにつれて、このアプローチの真価が発揮されることでしょう。
生成AIアプリや複数のステップからなる複雑な処理を表現する際には、このアプローチを検討してみてください。

参考文献

今回のアプローチはElmアーキテクチャから着想を得ています。

この言語も非常に面白いので、興味があればぜひご覧ください。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5