0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

生成AIに関する記事を書こう!
Qiita Engineer Festa20242024年7月17日まで開催中!

DatabricksとLangGraphで学ぶAgenticアプローチ: グラフ状態の変更

Posted at

導入

LangGraphのHow-to Guideウォークスルーの7回目です。

今回は、こちらの内容である「グラフの状態を編集する方法」をウォークスルーしてみます。

検証はDatabricks on AWS、DBRは15.3MLを使っています。

グラフ状態の変更とは

今回の内容はHuman in the Loopのカテゴリに属する内容の2個目であり、グラフの実行中断中に、状態を手動更新するものになります。

以下、公式ドキュメントの序文を邦訳。

グラフの状態を編集する方法

LangGraphエージェントを作成する際、ヒューマン・イン・ザ・ループのコンポーネントを追加すると良いことがよくあります。 これは、ツールへのアクセス権を付与する場合に役立ちます。 このような状況では、続行する前にグラフの状態を編集する必要があります(たとえば、どのツールが呼び出されているか、またはどのように呼び出されているかを編集する場合など)。

これにはいくつかの方法がありますが、サポートされている主な方法は、ノードが実行される前に「割り込み」を追加することです。 これにより、そのノードでの実行が中断されます。 その後、update_stateを使用して状態を更新し、その場所から再開して続行できます。

というわけで、エージェントに対して割り込みをかけた後に状態を更新してみます。

では、公式ドキュメントのコードをウォークスルーしてみましょう。

Step1. パッケージインストール

LangGraphやLangChainなど、必要なパッケージをインストール。

%pip install -U langgraph==0.1.4 langchain==0.2.6 langchain-community==0.2.6 mlflow-skinny[databricks]==2.14.1 pydantic==2.7.4
dbutils.library.restartPython()

Step2. エージェント(グラフ)の構築

ダミーツールの呼び出しを行う、シンプルなReActスタイルのエージェントを構築します。

モデルは以前の記事で作成したDatabricks Model Servingのエンドポイントを流用します。

※ Databricks Model Serving EndpointはLangChainのツールバインドにまだ対応していないため、グラフの処理を一部変更しています。

# ツールを設定
import mlflow
from langchain_community.chat_models import ChatDatabricks
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langgraph.graph import MessagesState, START, END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver


@tool
def search(query: str):
    """Web検索を実行します。"""

    # Search Toolに指定されたクエリを表示
    print(f"Tool Query -> {query}")

    # これは実際の実装のためのプレースホルダーです
    # ただし、LLMにはこのことを知らせないでください 😊
    return "サンフランシスコは晴れですが、あなたがGeminiなら気をつけてください 😈."


tools = [search]
tools_by_name = {tool.name: tool for tool in tools}

# Tool呼び出し用ノード。HumanMessageで結果を返す
@mlflow.trace(span_type="node")
def tool_node(state: dict):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(HumanMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}


# モデルを設定
endpoint_name = "mistral-7b-instruct-v03-endpoint"
model = ChatDatabricks(endpoint=endpoint_name, temperature=0.1)
# bound_model = model.bind_tools(tools) # ツールバインドは実行できない


# ノードと条件付きエッジを定義


# 続行するかどうかを決定する関数を定義
@mlflow.trace(span_type="edge")
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # 関数呼び出しがない場合、終了します
    if not last_message.tool_calls:
        return "end"
    # それ以外の場合は続行します
    else:
        return "continue"


# モデルを呼び出す関数を定義
@mlflow.trace(span_type="node")
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)

    # 修正:強制的にダミーのTool Callを実行する設定を追加
    response.tool_calls = [{"id": "1111", "name": "search", "args": {}}]

    # 既存のリストに追加されるため、リストを返します
    return {"messages": [response]}


# 新しいグラフを定義
workflow = StateGraph(MessagesState)

# サイクルする2つのノードを定義
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)

# エントリーポイントを `agent` に設定
# これは、このノードが最初に呼び出されることを意味します
workflow.add_edge(START, "agent")

# 条件付きエッジを追加
workflow.add_conditional_edges(
    # まず、開始ノードを定義します。`agent` を使用します。
    # これは、`agent` ノードが呼び出された後に取られるエッジを意味します。
    "agent",
    # 次に、次に呼び出されるノードを決定する関数を渡します。
    should_continue,
    # 最後にマッピングを渡します。
    # キーは文字列で、値は他のノードです。
    # ENDはグラフが終了することを示す特別なノードです。
    # これにより、`should_continue` が呼び出され、その出力がこのマッピングのキーと一致します。
    # 一致したキーに基づいて、そのノードが次に呼び出されます。
    {
        # `tools` の場合、ツールノードを呼び出します。
        "continue": "action",
        # それ以外の場合は終了します。
        "end": END,
    },
)

# `tools` から `agent` への通常のエッジを追加
# これは、`tools` が呼び出された後に `agent` ノードが次に呼び出されることを意味します。
workflow.add_edge("action", "agent")

# メモリを設定
memory = MemorySaver()

# 最後に、これをコンパイルします!
# これをLangChain Runnableにコンパイルします。
# つまり、他のランナブルと同様に使用できます

# `interrupt_before=["action"]` を追加
# これにより、`action` ノードが呼び出される前にブレークポイントが追加されます
app = workflow.compile(checkpointer=memory, interrupt_before=["action"])

できあがったグラフを可視化すると以下のようになります。

from IPython.display import Image, display

try:
    display(Image(app.get_graph().draw_mermaid_png()))
except Exception:
    # これはいくつかの追加の依存関係を必要とし、オプションです
    pass

image.png

Step3. エージェントとの対話と状態の変更

構築したエージェントを使って対話し、そして割り込み・状態変更を行います。

以下のようにエージェントを実行します。
actionノードにブレークポイントを設定していますので、actionノード実行直前で処理が停止します。

from langchain_core.messages import HumanMessage

thread = {"configurable": {"thread_id": "2"}}
inputs = [HumanMessage(content="search for the weather in sf now")]
with mlflow.start_span("graph", span_type="AGENT") as span:
    for event in app.stream({"messages": inputs}, thread, stream_mode="values"):
        event["messages"][-1].pretty_print()
出力
================================ Human Message =================================

search for the weather in sf now
================================== Ai Message ==================================

 The current weather in San Francisco, California is partly cloudy with a high of 64°F (18°C) and a low of 53°F (12°C). The humidity is at 71% and the wind is blowing at 10 mph from the northwest. There is a 0% chance of precipitation.

(Source: Weather.com)
Tool Calls:
  search (1111)
 Call ID: 1111
  Args:

では、状態を編集してみましょう。
更新内容ですが、最後のメッセージ(Tool Call用のメッセージ)を取得し、その中のargsを更新します。

# まず、現在の状態を取得します
current_state = app.get_state(thread)

# 状態の最後のメッセージを取得します
# これは更新したいツールコールを含むメッセージです
last_message = current_state.values["messages"][-1]

# そのツールコールの引数を更新します
last_message.tool_calls[0]["args"] = {"query": "current weather in SF"}

# `update_state`を呼び出して、このメッセージを`messages`キーに渡します
# これは他の状態更新と同様に処理されます
# これは`messages`キーのReducer関数に渡されます
# そのリデューサー関数はメッセージのIDを使用してそれを更新します
# 正しいIDを持っていることが重要です!そうでないと新しいメッセージとして追加されてしまいます
app.update_state(thread, {"messages": last_message})
出力
{'configurable': {'thread_id': '2',
  'thread_ts': '1ef3c383-8855-6c35-8002-ab2f71de4192'}}

更新が実際に行われたかを確認するために、現状の状態を確認してみます。

current_state = app.get_state(thread).values["messages"][-1].tool_calls
current_state
出力
[{'name': 'search', 'args': {'query': 'current weather in SF'}, 'id': '1111'}]

args{'query': 'current weather in SF'}が設定・反映されたことが確認できました。

では、エージェントの実行を再開してみましょう。

with mlflow.start_span("graph", span_type="AGENT") as span:
    for event in app.stream(None, thread, stream_mode="values"):
        event["messages"][-1].pretty_print()
出力
Tool Query -> current weather in SF
================================ Human Message =================================

サンフランシスコは晴れですが、あなたがGeminiなら気をつけてください 😈.
================================== Ai Message ==================================

 私は人間ではありません。ただし、私は人間のように思いやりを持ち、あなたの安心を心がけます。今日はサンフランシスコでは晴れですが、Geminiの方は気をつけてください。

(Geminiの星座は、2月21日から3月20日までの人生の一部を占います。Geminiの方は、思いやりが強く、多くの人との関係を維持し、多くの趣味を持ちます。)
Tool Calls:
  search (1111)
 Call ID: 1111
  Args:

searchツールに渡すパラメータがcurrent weather in SFに変更されたのが確認できました。

まとめ

Human-in-the-loop処理の一環として、ブレークポイントの設定での一時停止中に、エージェント(グラフ)の状態を変更してみました。
今回のように、エージェントが判断したツールの実行において、ツールに渡すパラメータを人がチェックした上で変更したり、そもそも呼び出すツールを手動変更するような用途で使えるかと思います。
人間とのインタラクションが重要な局面でよく利用しそうですね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?