17
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ニッセイ情報テクノロジー PS事業推進室Advent Calendar 2024

Day 21

LangGraphでHumanInTheLoop 〜プロセスを占有しないHumanFeedback〜

Last updated at Posted at 2024-12-20

シンプルな実装

 LangGraphでHumanFeedbackを組み込む場合、次のようなコードで実現可能です。

コード

simple_hitl.py
import os

from langchain_community.tools import HumanInputRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

if 'OPENAI_API_KEY' not in os.environ:
    os.environ['OPENAI_API_KEY'] = 'xxx'

# チャットモデル作成
model = ChatOpenAI(model='gpt-4o-mini')


def _prompt_func(text: str) -> None:
    print('\n')
    print(f'AIからの質問: {text}')


# ツール定義
tools = [
    # 検索ツール
    WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()),
    # ヒューマンフィードバックツール
    HumanInputRun(prompt_func=_prompt_func),
]

# エージェント作成
agent = create_react_agent(
    model=model,
    tools=tools,
)

# ユーザー入力
user_input = input('エージェントへの指示を入力してください。: ')

system_message = (
    '与えられたメッセージに従って、WEB検索・人間への質問を行い、回答してください。'
)

# エージェント実行
response = agent.invoke(
    {
        'messages': [
            SystemMessage(
                content=system_message
            ),
            HumanMessage(
                content=user_input
            ),
        ]
    }
)

print('### 出力結果 ###')
print(response['messages'][-1].content)

実行結果

$ python tutorials/hitl/simple_hitl.py 
エージェントへの指示を入力してください。: 妹への誕生日プレゼントは何が良いと思いますか?

AIからの質問: 妹への誕生日プレゼントのアイデアを教えてください。年齢や趣味は特に考慮しなくて大丈夫です。
料理が趣味なので料理関係のプレゼントが良いと思っています

AIからの質問: 料理が趣味の妹への誕生日プレゼントでおすすめのアイデアを教えてください。
それはあなたに考えてほしいです。

### 出力結果 ###
料理が趣味の妹への誕生日プレゼントとして、以下のアイデアを考えてみました:

1. **高品質な調理器具**: シェフナイフや包丁セット、フライパン、鍋など、料理をする際に役立つ高品質な器具。
2. **料理本**: 人気の料理本や特定のジャンル(例えば、イタリア料理、フレンチ、ベジタリアン料理など)に特化したレシピ本。
3. **食材のサブスクリプションボックス**: 毎月新鮮な食材が届くサービスを利用することで、料理の幅が広がります。
4. **クッキングクラスの受講券**: 特定の料理技術を学べるクラスの受講券。
5. **オリジナルのスパイスセット**: 世界中のスパイスやハーブを集めたセット。
6. **エプロンやキッチンタオル**: おしゃれで実用的なデザインのエプロンやキッチンタオル。

これらのアイデアから、妹さんの好みやスタイルに合ったものを選んでみてください!

このコードの課題

 HumanInputRunをツールとして用いるこのコードでは、ツールへの回答としてHumanFeedbackを求められます。そのため、ツールの実行完了(=HumanFeedbackの完了)までエージェントが待機し、プロセスを占有することになります。

プロセスを占有しない(エージェントの実行を中断する)実装

 次のコードで、エージェントの実行を中断しながらHumanFeedbackを実現可能です。

コード

improved_hitl.py
import json
import os
import uuid
from dataclasses import dataclass
from typing import Optional, Any

from langchain_community.tools import HumanInputRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, ToolMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

if 'OPENAI_API_KEY' not in os.environ:
    os.environ['OPENAI_API_KEY'] = 'xxx'


class DummyHumanInputRun(HumanInputRun):
    """
    ダミーのヒューマンフィードバックツール
    """

    def _run(
            self,
            query: str,
            run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        # 実際にはコールしないため、NotImplementedError
        raise NotImplementedError


@dataclass
class HumanFeedback:
    """
    ヒューマンフィードバックを格納するクラス
    """
    tool_call_id: str
    query: str
    response: str | None = None


class HumanFeedbackAgent:
    def __init__(self):
        # チャットモデル作成
        self._model = ChatOpenAI(model='gpt-4o-mini')
        # ツール定義
        tools = [
            # 検索ツール
            WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()),
            # ヒューマンフィードバックツール(ダミー)
            DummyHumanInputRun(),
        ]
        # ツール名
        self._tools_by_name = {
            tool.name: tool for tool in tools
        }
        self._human_feedback_tool_name = DummyHumanInputRun().name

        # エージェント作成
        self._agent = create_react_agent(
            self._model,
            tools,
            checkpointer=MemorySaver(),
            # ノード「tools」の前に中断
            interrupt_before=['tools'],
        )
        self._config = None

        self._system_message = (
            '与えられたメッセージに従って、WEB検索・人間への質問を行い、回答してください。'
        )

    def invoke(
            self,
            user_input: str,
    ) -> tuple[dict[str, Any], dict[str, HumanFeedback]]:
        # エージェント実行中断後の再開のため、'thread_id'を設定
        self._config = {
            # 'recursion_limit': 30,
            'configurable': {
                'thread_id': uuid.uuid4(),
            },
        }
        # エージェント実行
        response = self._agent.invoke(
            {
                'messages': [
                    SystemMessage(
                        content=self._system_message
                    ),
                    HumanMessage(
                        content=user_input
                    )
                ]
            },
            config=self._config,
        )

        # 継続可能かチェック
        while self._is_continued(response):
            # 継続可能(=HumanFeedback不要)ならState更新
            self._agent.update_state(
                self._config,
                self._exec_tool(response, dict()),
                as_node='tools'
            )
            # エージェント実行
            response = self._agent.invoke(
                None,
                config=self._config,
            )

        return response, self._human_feedbacks(response)

    def continue_invoke(
            self,
            before_response,
            human_feedbacks: dict,
    ) -> tuple[dict[str, Any], dict[str, HumanFeedback]]:
        # ユーザーからのHumanFeedbackをツールの実行結果として登録
        tool_messages = self._exec_tool(before_response, human_feedbacks)

        # 'tools'ノードとしてツールの実行結果を更新
        self._agent.update_state(
            self._config,
            tool_messages,
            as_node='tools'
        )
        # エージェント実行
        response = self._agent.invoke(
            None,
            config=self._config,
        )

        # 継続可能かチェック
        while self._is_continued(response):
            # 継続可能(=HumanFeedback不要)ならState更新
            self._agent.update_state(
                self._config,
                self._exec_tool(response, dict()),
                as_node='tools'
            )
            # エージェント実行
            response = self._agent.invoke(
                None,
                config=self._config,
            )

        return response, self._human_feedbacks(response)

    def _is_continued(
            self,
            response: dict[str, Any],
    ) -> bool:
        last_message = response['messages'][-1]
        # ラストメッセージがツール呼び出しでない場合は継続不要(=エージェント実行終了)
        if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
            return False
        else:
            # ラストメッセージにHumanFeedbackツールが含まれている場合は継続不可
            if any([tool_call['name'] == self._human_feedback_tool_name for tool_call in last_message.tool_calls]):
                return False
            # 含まれていない場合は継続可能
            return True

    def _exec_tool(
            self,
            response: dict[str, Any],
            human_feedbacks: dict[str, HumanFeedback],
    ) -> dict[str, list[BaseMessage]]:
        message = response['messages'][-1]

        outputs = []
        for tool_call in message.tool_calls:
            tool_name = tool_call['name']
            tool_call_id = tool_call['id']
            if tool_name == self._human_feedback_tool_name:
                # ツールがHumanFeedbackの場合は、ユーザーからのHumanFeedbackをツールの実行結果として登録
                tool_result = human_feedbacks[tool_call_id].response
            else:
                # それ以外の場合は、通常通りツール実行
                tool_result = self._tools_by_name[tool_name].invoke(
                    tool_call['args']
                )
            outputs.append(
                ToolMessage(
                    content=json.dumps(tool_result, ensure_ascii=False),
                    name=tool_name,
                    tool_call_id=tool_call_id,
                )
            )
        return {'messages': outputs}

    def _human_feedbacks(
            self,
            response: dict[str, Any],
    ) -> dict[str, HumanFeedback]:
        message = response['messages'][-1]

        outputs = dict()
        for tool_call in message.tool_calls:
            # HumanFeedbackのツール呼び出しのみ抽出
            if tool_call['name'] == self._human_feedback_tool_name:
                _id = tool_call['id']
                query = tool_call['args']['query']
                outputs[_id] = HumanFeedback(
                    tool_call_id=_id,
                    query=query,
                )

        return outputs


def main():
    agent = HumanFeedbackAgent()

    # ユーザー入力
    user_input = input('エージェントへの指示を入力してください。: ')

    # エージェント実行開始
    response, human_feedbacks = agent.invoke(user_input)

    # ### このタイミングでエージェントが中断され、エージェント実行からプロセスが解放される ### #

    while len(human_feedbacks) > 0:
        # HumanFeedbackが必要な場合はユーザーへ問い合わせ
        for human_feedback in human_feedbacks.values():
            user_input = input(f'\nAIからの質問: {human_feedback.query}\n')
            human_feedback.response = user_input

        # 得られたHumanFeedbackを新たなインプットとしてエージェントの実行を継続
        response, human_feedbacks = agent.continue_invoke(response, human_feedbacks)

    print('### 出力結果 ###')
    print(response['messages'][-1].content)


main()

実行結果

$ python tutorials/hitl/improved_hitl.py 
エージェントへの指示を入力してください。: 妹への誕生日プレゼントは何が良いと思いますか?

AIからの質問: 妹への誕生日プレゼントに何が良いかアドバイスをください。妹の年齢や趣味などの情報はありません。
料理が趣味なので料理関係のプレゼントが良いと思っています

### 出力結果 ###
妹の趣味が料理であれば、以下のような誕生日プレゼントが考えられます:

1. **高品質な調理器具**:例えば、シェフナイフやフライパン、鍋など、普段の料理をより楽しくするアイテムを選ぶと良いでしょう。
   
2. **料理教室のチケット**:新しいレシピや技術を学ぶことができる料理教室に参加できるチケットは、妹にとって素晴らしい体験になるかもしれません。

3. **料理本やレシピ集**:特定の料理や食材に特化した本や、人気シェフのレシピ本などは、インスピレーションを与える良いプレゼントです。

4. **調味料セット**:特別なスパイスやオイルなど、料理を引き立てる調味料のセットも喜ばれるでしょう。

5. **キッチンガジェット**:ユニークな調理器具や、便利なキッチンツールなども、料理好きの妹には嬉しいアイテムです。

これらのアイデアを参考にして、妹にぴったりのプレゼントを選んでみてください!

ポイント

エージェントの中断

 次のコードで、'tools'ノードの実行前にエージェントを中断するように設定しています。これにより、エージェントの実行が中断され、プロセスがエージェントの実行から解放されます。

        # エージェント作成
        self._agent = create_react_agent(
            self._model,
            tools,
            checkpointer=MemorySaver(),
            # ノード「tools」の前に中断
            interrupt_before=['tools'],
        )

HumanFeedback要否の確認

 'tools'ノードの実行前にエージェント中断されたら、エージェントのレスポンスからHumanFeedbackが必要か判断します。ツール呼び出し(tool_call)のツール名('name')がHumanFeedbackツールの名称と一致する場合に、HumanFeedbackが必要と判断します。

    def _is_continued(
            self,
            response: dict[str, Any],
    ) -> bool:
        last_message = response['messages'][-1]
        # ラストメッセージがツール呼び出しでない場合は継続不要(=エージェント実行終了)
        if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
            return False
        else:
            # ラストメッセージにHumanFeedbackツールが含まれている場合は継続不可
            if any([tool_call['name'] == self._human_feedback_tool_name for tool_call in last_message.tool_calls]):
                return False
            # 含まれていない場合は継続可能
            return True

HumanFeedback呼び出しの抽出

 HumanFeedbackが必要と判断されたら、ツール呼び出しの情報を抽出します。抽出する情報は以下のとおりです。

  • id: 後続のHumanFeedback結果の反映のために必要なキー情報
  • query: 人間への問い合わせ内容
    def _human_feedbacks(
            self,
            response: dict[str, Any],
    ) -> dict[str, HumanFeedback]:
        message = response['messages'][-1]

        outputs = dict()
        for tool_call in message.tool_calls:
            # HumanFeedbackのツール呼び出しのみ抽出
            if tool_call['name'] == self._human_feedback_tool_name:
                _id = tool_call['id']
                query = tool_call['args']['query']
                outputs[_id] = HumanFeedback(
                    tool_call_id=_id,
                    query=query,
                )

        return outputs

HumanFeedback結果の反映

人間からのレスポンスでHumanFeedback結果が得られたら、その結果を反映します。事前に抽出したidをキーとして、ツールのレスポンスとしてHumanFeedback結果を反映します。反映した結果は、ツールの実行結果(ToolMessage)としてチャットモデルに渡されます。

    def _exec_tool(
            self,
            response: dict[str, Any],
            human_feedbacks: dict[str, HumanFeedback],
    ) -> dict[str, list[BaseMessage]]:
        message = response['messages'][-1]

        outputs = []
        for tool_call in message.tool_calls:
            tool_name = tool_call['name']
            tool_call_id = tool_call['id']
            if tool_name == self._human_feedback_tool_name:
                # ツールがHumanFeedbackの場合は、ユーザーからのHumanFeedbackをツールの実行結果として登録
                tool_result = human_feedbacks[tool_call_id].response
            else:
                # それ以外の場合は、通常通りツール実行
                tool_result = self._tools_by_name[tool_name].invoke(
                    tool_call['args']
                )
            outputs.append(
                ToolMessage(
                    content=json.dumps(tool_result, ensure_ascii=False),
                    name=tool_name,
                    tool_call_id=tool_call_id,
                )
            )
        return {'messages': outputs}

まとめ

 HumanInputRunを用いたシンプルなHumanFeedbackの実装と、プロセスを占有しないHumanFeedbackの実装を紹介しました。LangGraphを使ってHumanFeedbackを組み込んだエージェントを作成する際は、ぜひ参考にしてください。

17
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?