1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

生成AIに関する記事を書こう!
Qiita Engineer Festa20242024年7月17日まで開催中!

DatabricksとLangGraphで学ぶAgenticアプローチ: ノードの並列実行

Posted at

導入

LangGraphのHow-to Guideウォークスルーの2回目です。

今回は、こちらの内容である「ノードを並列実行するためのブランチを作成」をしてみます。

検証はDatabricks on AWS、DBRは15.3MLを使っています。

ノードの並列実行とは

ノードの並列実行は、文字通りグラフの流れを複数に分岐させ、複数のノードを並列に実行する機能です。
例えば、RAGで複数バリエーションの検索を行うようなユースケースで、検索を並列実行することでレイテンシの削減など、効率的に処理を実行することができます。

今回はそのやり方を中心とした内容です。

以下、公式ドキュメントの序文を邦訳。

ノードを並列実行するためのブランチを作成する方法

LangGraphは、通常のエッジまたはconditional_edgesを使用したファンアウトとファンインをネイティブにサポートしています。

これにより、ノードを並列に実行して、グラフ全体の実行を高速化できます。

以下は、自分に合った分岐データフローの作成を追加する方法を示す例です。

次より、公式ドキュメントで示されている例をDatabricks上で実装・実行します。
なお、LangSmithでのトレーシングをする部分などは省略し、逆にMLflow Tracingが使えるように変更しています。

Step1. パッケージインストール

LangGraphやLangChainなど、必要なパッケージをインストール。

%pip install -U langgraph==0.1.4 langchain==0.2.6 langchain-community==0.2.6 mlflow-skinny[databricks]==2.14.1 pydantic==2.7.4
dbutils.library.restartPython()

Step2. 並列ノードのファンアウトとファンイン

まず、単純にグラフの流れを分岐(ファンアウト)し、二つのノードを並列実行した上で、また一つの流れに戻す(ファンイン)のサンプルを実行します。

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph

import mlflow


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    @mlflow.trace(name="ReturnNodeValue", span_type="node")
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.set_entry_point("a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.set_finish_point("d")
graph = builder.compile()

作成したグラフを可視化します。

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

image.png

では、実行してみましょう。

with mlflow.start_span("graph", span_type="AGENT") as span:
    result = graph.invoke(
        {
            "aggregate": [],
        },
        {
            "configurable": {"thread_id": "foo"},
        },
    )

    print()
    print("--- 結果 ---")
    print(result)
出力
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]

--- 結果 ---
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm D"]}

エッジを単純につなげるだけで並列実行ができるようです。
かなり容易ですね。

例外処理について、ドキュメントの中で補足されていますので、邦訳して記載します

LangGraphはノードを「スーパーステップ」内で実行するため、並列分岐は並列に実行されますが、スーパーステップ全体はトランザクションです。これらの分岐のいずれかで例外が発生した場合、更新は状態には適用されません

エラーが発生しやすい場合(不安定なAPI呼び出しを処理したい場合)、LangGraphはこれに対処するための2つの方法を提供します。

  1. ノード内に通常のPythonコードを記述して、例外をキャッチして処理できます。
  2. retry_policyを設定して、特定の種類の例外を発生させるノードを再試行するようにグラフに指示できます。失敗したブランチのみが再試行されるため、冗長な作業の実行について心配する必要はありません。

これらを組み合わせることで、並列実行を実行し、例外処理を完全に制御できます。

Step3. 並列ノードのファンアウトとファンイン(追加ステップあり)

ここまででノードの並列実行が出来ました。
一方、公式ドキュメントは、次のように続きます。

上記の例は、各パスが 1 ステップのみの場合にファンアウトおよびファンインする方法を示しています。しかし、1 つのパスに複数のステップがある場合はどうでしょうか。

先ほどは分岐後、各1ノードが実行され、分岐が集約されました。
では、分岐後、ノードを複数実行するとしたら?というケースのサンプルが以下となります。

import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from IPython.display import Image, display

class State(TypedDict):
    # operator.addのreducer関数を使用することで、リストに結果を追加していきます
    aggregate: Annotated[list, operator.add]


builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.set_entry_point("a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.set_finish_point("d")
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

image.png

実行してみましょう。

with mlflow.start_span("graph", span_type="AGENT") as span:
    result = graph.invoke(
        {
            "aggregate": [],
        },
        {
            "configurable": {"thread_id": "foo"},
        },
    )

    print()
    print("--- 結果 ---")
    print(result)
出力
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]

--- 結果 ---
{'aggregate': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}

ファンアウト(分岐後)に複数ステップがある場合、最後のファンイン(ノードの集約)のときに、builder.add_edge(["b2", "c"], "d")のような形でエッジを複数のノードから繋げるとよいようです。

Step4. 条件分岐

ここまでは、分岐・並列実行することが確定しているときのやり方でした。
一方で、並列実行のための分岐自体が静的に確定していないときのやり方も公式ドキュメントで解説されています。

ファンアウトが確定的でない場合は、add_conditional_edgesを直接使用できます。

条件分岐が後でルーティングする既知の "シンク" ノードがある場合は、then=<final-node-name>を条件付きエッジの作成時に指定できます。

ということのようです。
add_conditional_edgesを指定して、条件によって並列実行するルートを切り替えてみましょう。

import operator
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.graph import END, START, StateGraph
from IPython.display import Image, display

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    which: str

# b, c, dの3つのうち、条件に応じて実行するノードを変更するグラフを作成
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_node("e", ReturnNodeValue("I'm E"))


@mlflow.trace(span_type="conditional_edge")
def route_bc_or_cd(state: State) -> Sequence[str]:
    """ b, c, dのうち、条件に応じて実行するノードを変更 """
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

# ここがポイント。条件に応じて実行するノードを切り替える。
# 今回は単一のノードが選ばれるわけではなく、複数のノードが選択される
intermediates = ["b", "c", "d"]
builder.add_conditional_edges(
    "a",
    route_bc_or_cd,
    intermediates,
)
for node in intermediates:
    builder.add_edge(node, "e")


builder.add_edge("e", END)
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

image.png

では、条件を変えて実行してみます。

まずは、b,cノードを選択して実行するパターン。

# b, cが選択されるパターン
with mlflow.start_span("graph", span_type="AGENT") as span:
    graph.invoke({"aggregate": [], "which": "bc"})
出力
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm E to ["I'm A", "I'm B", "I'm C"]

b, cが並列で実行され、dは実行されませんでした。

では、c,dノードを選択して実行するパターン。

# c, dが選択されるパターン
with mlflow.start_span("graph", span_type="AGENT") as span:
    graph.invoke({"aggregate": [], "which": "cd"})
出力
Adding I'm A to []
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A"]
Adding I'm E to ["I'm A", "I'm C", "I'm D"]

c, dが実行されていますね。

条件に応じて、並列実行するノードを選択できました。
例えばRAGを構築するときに、検索するデータソースが複数あるときに、その中から有用なものを複数選んで検索するときなどに使えそうですね。

Step5. Stable Sorting(静的ソート)

続けて、公式ドキュメントではStable Sortingについて解説されています。
各ノードの結果を「状態(State)」へ書き込む際に、書き込み順序を制御する方法のようです。

ファンアウトすると、ノードは単一の「スーパーステップ」として並列に実行されます。各スーパーステップからの更新は、スーパーステップが完了すると、すべて順番に状態に適用されます。

並列スーパーステップからの更新の一貫性のある事前定義された順序が必要な場合は、出力を (識別キーと共に) 状態内の別のフィールドに書き込み、各ファンアウト ノードから通常のエッジをランデブー ポイントに追加して、それらを "シンク" ノードで結合する必要があります。

たとえば、並列ステップの出力を「信頼性」で並べ替えるとします。

確かに、並列処理の結果を一つにマージするときに順序が大事になるケースはありえそうですね。
以下がその例となります。

import operator
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from IPython.display import Image, display


@mlflow.trace(span_type="operator")
def reduce_fanouts(left, right):
    if left is None:
        left = []
    if not right:
        # Overwrite
        return []
    return left + right


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]
    fanout_values: Annotated[list, reduce_fanouts]
    which: str


builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.set_entry_point("a")


class ParallelReturnNodeValue:
    def __init__(
        self,
        node_secret: str,
        reliability: float,
    ):
        self._value = node_secret
        self._reliability = reliability

    @mlflow.trace(name="ParallelReturnNodeValue", span_type="node")
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']} in parallel.")
        return {
            "fanout_values": [
                {
                    "value": [self._value],
                    "reliability": self._reliability,
                }
            ]
        }

builder.add_node("b", ParallelReturnNodeValue("I'm B", reliability=0.9))
builder.add_node("c", ParallelReturnNodeValue("I'm C", reliability=0.1))
builder.add_node("d", ParallelReturnNodeValue("I'm D", reliability=0.3))

@mlflow.trace(span_type="node")
def aggregate_fanout_values(state: State) -> Any:
    # reliability(信頼性)の高い順に並び変える
    ranked_values = sorted(
        state["fanout_values"], key=lambda x: x["reliability"], reverse=True
    )
    return {
        "aggregate": [x["value"] for x in ranked_values] + ["I'm E"],
        "fanout_values": [],
    }

# ノードeにおいて、並び替えを実行
builder.add_node("e", aggregate_fanout_values)

# 条件分岐
@mlflow.trace(span_type="conditional_edge")
def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]


intermediates = ["b", "c", "d"]
builder.add_conditional_edges("a", route_bc_or_cd, intermediates)

for node in intermediates:
    builder.add_edge(node, "e")

builder.set_finish_point("e")
graph = builder.compile()

display(Image(graph.get_graph().draw_mermaid_png()))

image.png

ノードb, c, dは出力としてReliability(信頼性)という値を持ちます。
今回の設定としては、b, d, cの順でReliabilityが高くなっています。

では、実行してみましょう。

まず、ノードb, cが選択されるケースです。

# b, cが選択されるパターン
# bの方がreliabilityが高いため、状態aggregateには先に追加される
with mlflow.start_span("graph", span_type="AGENT") as span:
    result = graph.invoke({"aggregate": [], "which": "bc", "fanout_values": []})
    print(result)
出力
Adding I'm A to []
Adding I'm B to ["I'm A"] in parallel.
Adding I'm C to ["I'm A"] in parallel.
{'aggregate': ["I'm A", ["I'm B"], ["I'm C"], "I'm E"], 'fanout_values': [], 'which': 'bc'}

実行結果として、aggregateへはbノードの出力が先に記録され、次にcノードの出力が記録されています。

では、c, dノードが選択されるパターンを実行してみます。

# c, dが選択されるパターン
# dの方がreliabilityが高いため、状態aggregateには先に追加される
with mlflow.start_span("graph", span_type="AGENT") as span:
    result = graph.invoke({"aggregate": [], "which": "cd"})
    print(result)
出力
Adding I'm A to []
Adding I'm C to ["I'm A"] in parallel.
Adding I'm D to ["I'm A"] in parallel.
{'aggregate': ["I'm A", ["I'm D"], ["I'm C"], "I'm E"], 'fanout_values': [], 'which': 'cd'}

こちらははdノードの出力が先に記録され、次にcノードの出力が記録されています。
これはReliabilityの順番でちゃんと記録されているということになります。

このように、並列動作時の状態記録の順番を制御することもできるようです。

あまり例が思いつかないのですが、チャット履歴を複数のノードから集めてくるときに、より重要なものを先にメッセージとして追加するようなケースが想定されるのでしょうか。

まとめ

LangGraphの並列実行ノードの作成について、公式ドキュメントの内容をウォークスルーしてみました。

ノードを並列で実行できるんだ、程度の理解を最初に持っていましたが、状態制御含めて奥が深いですね。
もう少し具体的なユースケースを理解しながら活用してみたいと思います。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?