0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

生成AIに関する記事を書こう!
Qiita Engineer Festa20242024年7月17日まで開催中!

DatabricksとLangGraphで学ぶAgenticアプローチ: ユーザ入力の待機

Posted at

導入

LangGraphのHow-to Guideウォークスルーの8回目です。

今回は、こちらの内容である「ユーザ入力を待つ方法」をウォークスルーしてみます。

検証はDatabricks on AWS、DBRは15.3MLを使っています。

ユーザ入力の待機とは

今回の内容もHuman in the Loopのカテゴリに属する内容です。

以下、公式ドキュメントの序文を邦訳。

ユーザー入力を待つ方法

人間参加型インタラクションの主なパターンの 1 つは、人間の入力を待つことです。
主なユースケースは、ユーザーに明確な質問をすることです。
これを実現する方法の 1 つは、END ノードに移動してグラフを終了することです。
その後、ユーザーの応答は、グラフの新しい呼び出しとして返されます。
これは基本的に、チャットボットのアーキテクチャを作成しているにすぎません。

これの問題は、グラフの特定のポイントに戻るのが難しいことです。
多くの場合、エージェントはプロセスの途中にあり、少しのユーザー入力が必要です。
conditional_entry_pointを使って適切な場所にルーティングするようにグラフを設計することは可能ですが、それは非常にスケーラブルではありません (基本的に、ほとんどどこにでも到達できるルーティング関数を持つ必要があるため)。

これを行う別の方法は、ユーザ入力を取得するためのノードを明示的に持つことです。
これはノートブックの設定で簡単に実装できます。ノードにinput()呼び出しを入れるだけです。
しかし、それは正確には本番適用可能というわけではありません。

幸いなことに、LangGraphを使用すると、同様のことを本番環境で行うことができます。
基本的な考え方は次のとおりです。

  • 人間の入力を表すノードを設定します。これは、特定の内向き/外向きエッジを持つことができます(必要に応じて)。実際には、このノード内にロジックは存在しないはずです。
  • ノードの前にブレークポイントを追加します。これにより、このノードが実行される前にグラフが停止します(いずれにせよ、実際のロジックがないので、これは良いことです)
  • グラフの状態を更新するために.update_stateas_nodeを使用します。人間が返した反応は何でも入れてください。ここで重要なのは、as_nodeパラメーターを使用して、そのノードであったかのようにこの更新を適用することです。これにより、次に実行を再開したときに、そのノードが最初からではなく、動作したかのように再開される効果があります。
    注: これには、チェックポイントを渡す必要があります。

というわけで、一連の処理がただ自動で進むのではなく、あるノードの実行前に割り込みをかけて、人の判断を入れる方法になります。
では、公式ドキュメントのコードをウォークスルーしてみましょう。

Step1. パッケージインストール

LangGraphやLangChainなど、必要なパッケージをインストール。

%pip install -U langgraph==0.1.4 langchain==0.2.6 langchain-community==0.2.6 mlflow-skinny[databricks]==2.14.1 pydantic==2.7.4
dbutils.library.restartPython()

Step2. エージェント(グラフ)の構築

シンプルなReActスタイルのエージェントを構築します。

モデルは以前の記事で作成したDatabricks Model Servingのエンドポイントを流用します。

※ Databricks Model Serving EndpointはLangChainのツールバインドにまだ対応していないため、公式ドキュメントのコードに対して処理を一部変更しています。

from langchain_community.chat_models import ChatDatabricks

# 状態を設定
from langgraph.graph import MessagesState, START
from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver

# ツールを設定
# 実際のツール - 検索ツールを1つ持ちます
# "偽の"ツール - "ask_human"ツールも1つ持ちます
# ここで実際のツールを定義します
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode

from langchain_core.pydantic_v1 import BaseModel
from langchain_core.messages import ToolMessage
from langgraph.prebuilt import ToolInvocation

@tool
def search(query: str):
    """Web検索を実行します。"""

    # Search Toolに指定されたクエリを表示
    print(f"Tool Query -> {query}")

    # これは実際の実装のためのプレースホルダーです
    # ただし、LLMにはこのことを知らせないでください 😊
    return "サンフランシスコは晴れですが、あなたがGeminiなら気をつけてください 😈."

tools = [search]
tools_by_name = {tool.name: tool for tool in tools}

# Tool呼び出し用ノード。HumanMessageで結果を返す
@mlflow.trace(span_type="node")
def tool_node(state: dict):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(HumanMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

# モデルを設定
endpoint_name = "mistral-7b-instruct-v03-endpoint"
model = ChatDatabricks(endpoint=endpoint_name, temperature=0.1)


# すべてのツールをモデルにバインドします
# 上記の実際のツールがありますが、人間に質問するためのモックツールも必要です
# `bind_tools`はツールだけでなくツール定義も受け取るため、
# `ask_human`のツール定義を定義できます


class AskHuman(BaseModel):
    """人間に質問する"""

    question: str


# 続行するかどうかを決定する関数を定義
@mlflow.trace(span_type="edge")
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # 関数呼び出しがない場合、終了します
    if not last_message.tool_calls:
        return "end"
    # ツール呼び出しが人間に質問している場合、そのノードを返します
    # ここにロジックを追加して、人間の入力が必要なことをシステムに知らせることもできます
    # 例えば、Slackメッセージを送信するなど
    elif last_message.tool_calls[0]["name"] == "AskHuman":
        return "ask_human"
    # それ以外の場合は続行します
    else:
        return "continue"


# モデルを呼び出す関数を定義
@mlflow.trace(span_type="node")
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)

    # 修正:強制的にダミーのTool Callを実行する設定を追加
    response.tool_calls = [{"id": "1111", "name": "AskHuman", "args":{}}]
    # 既存のリストに追加されるため、リストを返します
    return {"messages": [response]}


# 人間に質問するための偽のノードを定義
def ask_human(state):
    pass


# グラフを構築

# 新しいグラフを定義
workflow = StateGraph(MessagesState)

# サイクルする3つのノードを定義
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)

# エントリーポイントを `agent` に設定
# これは、このノードが最初に呼び出されることを意味します
workflow.add_edge(START, "agent")

# 条件付きエッジを追加
workflow.add_conditional_edges(
    # まず、開始ノードを定義します。`agent` を使用します。
    # これは、`agent` ノードが呼び出された後に取られるエッジを意味します。
    "agent",
    # 次に、次に呼び出されるノードを決定する関数を渡します。
    should_continue,
    # 最後にマッピングを渡します。
    # キーは文字列で、値は他のノードです。
    # ENDはグラフが終了することを示す特別なノードです。
    # これにより、`should_continue` が呼び出され、その出力がこのマッピングのキーと一致します。
    # 一致したキーに基づいて、そのノードが次に呼び出されます。
    {
        # `tools` の場合、ツールノードを呼び出します。
        "continue": "action",
        # 人間に質問する場合
        "ask_human": "ask_human",
        # それ以外の場合は終了します。
        "end": END,
    },
)

# `tools` から `agent` への通常のエッジを追加
# これは、`tools` が呼び出された後に `agent` ノードが次に呼び出されることを意味します。
workflow.add_edge("action", "agent")

# 人間の応答を受け取った後、エージェントに戻ります
workflow.add_edge("ask_human", "agent")

# メモリを設定
memory = MemorySaver()

# 最後に、これをコンパイルします!
# これをLangChain Runnableにコンパイルします。
# つまり、他のランナブルと同様に使用できます
# `ask_human` ノードが実行される前にブレークポイントを追加します
app = workflow.compile(checkpointer=memory, interrupt_before=["ask_human"])

グラフを可視化すると以下のようになります。

from IPython.display import Image, display

try:
    display(Image(app.get_graph().draw_mermaid_png()))
except Exception:
    # これはいくつかの追加の依存関係を必要とし、オプションです
    pass

image.png

Step3. エージェントとの対話

構築したエージェントを使って対話してみます。

from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "3"}}
input_message = HumanMessage(
    content="ユーザにどこにいるか質問し、それからその場所の天気を調べてください"    
)
with mlflow.start_span("graph", span_type="AGENT") as span:
    for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
        event["messages"][-1].pretty_print()
出力
================================ Human Message =================================

ユーザにどこにいるか質問し、それからその場所の天気を調べてください
================================== Ai Message ==================================

 まず、ユーザに「今どこにいるのですか?」と質問します。その後、入力された場所の天気を調べます。たとえば、ユーザが「東京」と答えた場合、天気APIを使用して東京の天気を調べます。

以下、Pythonでの例です。

```python
import requests
import json

# ユーザから入力を受け取る
location = input("今どこにいるのですか?")

# 天気APIのURL
url = f"http://api.openweathermap.org/data/2.5/weather?q={location}&appid=YOUR_API_KEY"

# 天気APIから天気情報を取得
response = requests.get(url)
data = json.loads(response.text)

# 天気情報を表示
print(f"{location}の天気は、{data['weather'][0]['description']}です。")
```

この例では、OpenWeatherMap APIを使用して天気情報を取得しています。APIキーは、自分のアカウントで取得できます。
Tool Calls:
  AskHuman (1111)
 Call ID: 1111
  Args:

Tool Callができない弊害で、LLMが天気取得のコードを生成してしまっていますが、そこの内容は無視してください。
大事なポイントは、最初のagentノードを実行した後、処理が中断されたことです。
これは、次の実行ノードとしてask_humanが選択され、そこにブレークポイントが設定されているためです。

さて、次の動作を公式ドキュメントから邦訳で抜粋。

次に、このスレッドをユーザーからの応答で更新します。その後、次の実行を開始できます。

これをツール呼び出しとして扱っているため、ツール呼び出しからの応答であるかのように状態を更新する必要があります。これを行うには、状態を確認してツール呼び出しの ID を取得する必要があります。

というわけで、ユーザからの応答を基に状態を変更します。
イメージとしては、何らかのフロントエンドやCLI上でユーザの入力待ちになった状態であり、今回は以下のようにツールに与えるパラメータを入力としてエージェントの状態を更新します。これはask_humanのノード処理となり、次のノードへ進むことになります。
(出力結果として、次に実行するノードを表示しています)

# ツール呼び出しのIDと希望するレスポンスを使用してツールメッセージを作成します
tool_message = [
    {"type": "user", "content": "san francisco"}
]

# # 以下と同等です。どちらも動作します
# from langchain_core.messages import ToolMessage
# tool_message = [ToolMessage(tool_call_id=tool_call_id, content="san francisco")]

# 状態を更新します
# `as_node="ask_human"`を指定していることに注意してください
# これにより、このノードとしてこの更新が適用され、
# その後は通常通りに続行されます
app.update_state(config, {"messages": tool_message}, as_node="ask_human")

# 状態を確認できます
# 現在の状態には次に`agent`ノードがあることがわかります
# これは、グラフの定義方法に基づいています
# `ask_human`ノードの後に(今トリガーしたもの)
# `agent`ノードへのエッジがあります
app.get_state(config).next
出力
('agent',)

これで、エージェントに続行するように指示できます。
追加の入力は必要ないため、グラフへの入力としてNoneを渡すだけで済みます。

with mlflow.start_span("graph", span_type="AGENT") as span:
    for event in app.stream(None, config, stream_mode="values"):
        event["messages"][-1].pretty_print()
出力
================================== Ai Message ==================================

 まず、ユーザに「今どこにいるのですか?」と質問します。その後、入力された場所の天気を調べます。たとえば、ユーザが「san francisco」と答えた場合、天気APIを使用してサンフランシスコの天気を調べます。

以下、Pythonでの例です。

```python
import requests
import json

# ユーザから入力を受け取る
location = input("今どこにいるのですか?")

# 天気APIのURL
url = f"http://api.openweathermap.org/data/2.5/weather?q={location}&appid=YOUR_API_KEY"

# 天気APIから天気情報を取得
response = requests.get(url)
data = json.loads(response.text)

# 天気情報を表示
print(f"{location}の天気は、{data['weather'][0]['description']}です。")
```

この例では、OpenWeatherMap APIを使用して天気情報を取得しています。APIキーは、自分のアカウントで取得できます。
Tool Calls:
  AskHuman (1111)
 Call ID: 1111
  Args:

わかりづらいのですが、ask_humanの次のノードであるactionから処理が再開されました。
(そして、またask_humanノードまでまた処理が進んで停止)

まとめ

Human-in-the-loop処理の一環として、人への確認を行い、その内容を使ってツールを呼び出すという流れを実行しました。
(今回の改修内容だと、実際のツール呼び出し処理を判定させていないので、イマイチわかりづらいですが。。。)
人間判断を含めることはよくあることだと思うので、活用しやすいテクニックの一つかなと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?