1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Langgraph-reflectionをDatabricks上で簡単に試す

Posted at

導入

LangGraphのPrebuilt Agentsページを見てるとLanggraph-reflectionというリポジトリがありました。

興味がありましたので、Exampleを動かして理解を深めてみます。
実行はDatabricks on AWS上で行いました。

Langgraph-reflectionとは

GithubリポジトリのREADMEより。

この事前構築済グラフは、リフレクションスタイルのアーキテクチャを使用して、初期エージェントの出力をチェックおよび改善するエージェントです。

このリフレクションエージェントは、2つのサブエージェントを使用します:

  • "メイン"エージェント: ユーザーのタスクを解決しようとするエージェント
  • "批評"エージェント: メインエージェントの作業をチェックし、批評を提供するエージェント

リフレクションエージェントのアーキテクチャは次の通りです:

  1. まず、メインエージェントが呼び出されます
  2. メインエージェントが終了すると、批評エージェントが呼び出されます
  3. 批評エージェントの結果に基づいて:
    • 批評エージェントが批評を見つけた場合、メインエージェントが再度呼び出されます
    • 批評がない場合、全体のリフレクションエージェントが終了します
  4. 全体のリフレクションエージェントが終了するまで繰り返します

グラフについていくつかの仮定を行います:

  • メインエージェントはメッセージのリストを入力として受け取る必要があります
  • リフレクションエージェントは批評がある場合はユーザーメッセージを返し、そうでない場合はメッセージを返さない必要があります。

以下がアーキテクチャ図です。

image.png

主な処理を独立したLangGraphのグラフとして構築し、それに対する批評を行うリフレクションエージェントを与えることで連携するエージェント(グラフ)を構築してくれるモジュールのようです。
コードを読むとわかるのですが、非常にシンプルかつコンパクトなコードベースで提供されています。

リフレクション(Reflection)は「自身の出力を批評的に検証・評価し、改善点を見出す自己改善プロセス」という意味になるようです。

日本語だと何という単語になるんでしょうね。。。いつもリフレクションとしか言えないのですが。。。

Githubリポジトリ上に二つExampleが掲載されていますので、これらをウォークスルーしてみます。

準備

Databricks上でノートブックを作成し、必要なパッケージインストールや設定を行います。
ノートブックのクラスタはサーバレスを使いました。

まず、パッケージをインストール。

%pip install langgraph-reflection langchain langchain-openai openevals rich
%pip install mlflow

%restart_python

mlflowの自動トレースを有効化します。

import mlflow

mlflow.langchain.autolog()

LangChainのOpenAI ClientからDatabricks上のサービングエンドポイントを利用できるように、環境変数に対してDatabricksのAPIキーとモデルサービングのエンドポイントURLを設定します。

モデルはLlama-3.3 70Bを使いました。

import os

os.environ["OPENAI_API_KEY"] = dbutils.secrets.get(scope="llm-serve", key="api_token") # Databricks APIキー
os.environ["OPENAI_BASE_URL"] = "https://xxxx.databricks.com/serving-endpoints/"

model_name = "databricks-meta-llama-3-3-70b-instruct"

必要なモジュールをインポートしておきます。

from langgraph_reflection import create_reflection_graph
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import TypedDict
from openevals.llm import create_llm_as_judge
from openevals.code.pyright import create_pyright_evaluator

from rich import print

サンプル1:LLM as a Judge

オリジナルはこちら。
指示に対して生成した文章について、正確性や完全性といった観点でチェックし、問題点があればフィードバックして再生成する、を繰り返すサンプルです。

コメント・プロンプトの日本語化やモデルを変更するといった単純な変更をして実行します。
処理としてはモデルに指示を実行するメイングラフと、openevalsを使って内容を評価するグラフを作成し、create_reflection_graph関数にこれらを指定することでreflectionエージェントが作成されます。

# 定義: ユーザーのクエリを大規模言語モデルで処理する関数
def call_model(state):
    """ユーザーのクエリを大規模言語モデルで処理します。"""
    model = init_chat_model(f"openai:{model_name}")
    return {"messages": model.invoke(state["messages"])}


# メインアシスタントの基本グラフを定義
assistant_graph = (
    StateGraph(MessagesState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .add_edge("call_model", END)
    .compile()
)


# 特定の評価基準を持つ詳細な批評プロンプトを定義
critique_prompt = """あなたはAI応答を評価する専門家のジャッジです。あなたの仕事は、以下の会話でAIアシスタントの最新の応答を批評することです。

次の基準に基づいて応答を評価してください:
1. 正確性 - 情報は正しく事実に基づいていますか?
2. 完全性 - ユーザーのクエリに完全に対処していますか?
3. 明確性 - 説明は明確で構造化されていますか?
4. 有用性 - 実行可能で役立つ情報を提供していますか?
5. 安全性 - 有害または不適切なコンテンツを避けていますか?

応答がすべての基準を満たしている場合は、passをTrueに設定してください。

応答に問題がある場合は、passをTrueに設定しないでください。代わりに、commentキーに具体的で建設的なフィードバックを提供し、passをFalseに設定してください。

アシスタントが改善点を正確に理解できるように、詳細に批評してください。

<response>
{outputs}
</response>"""


# より堅牢な評価アプローチを持つジャッジ関数を定義
def judge_response(state, config):
    """別のジャッジモデルを使用してアシスタントの応答を評価します。"""
    evaluator = create_llm_as_judge(
        prompt=critique_prompt,
        model=f"openai:{model_name}",
        feedback_key="pass",
    )
    eval_result = evaluator(outputs=state["messages"][-1].content, inputs=None)

    if eval_result["score"]:
        print("✅ ジャッジによって応答が承認されました")
        return
    else:
        # そうでない場合、ジャッジの批評を新しいユーザーメッセージとして返す
        print("⚠️ ジャッジが改善を要求しました")
        return {"messages": [{"role": "user", "content": eval_result["comment"]}]}


# ジャッジグラフを定義
judge_graph = (
    StateGraph(MessagesState)
    .add_node(judge_response)
    .add_edge(START, "judge_response")
    .add_edge("judge_response", END)
    .compile()
)


# 完全な反射グラフを作成
reflection_app = create_reflection_graph(assistant_graph, judge_graph)
reflection_app = reflection_app.compile()


# 使用例
if __name__ == "__main__":
    # 改善が必要な可能性のある例のクエリ
    example_query = [
        {
            "role": "user",
            "content": "核融合がどのように機能し、クリーンエネルギーにとってなぜ重要であるかを説明してください",
        }
    ]

    # 反射システムを通じてクエリを処理
    print("Reflectionを使用して実行中...")
    result = reflection_app.invoke({"messages": example_query})

    print(result)

実行すると以下のような結果が得られます。
作成された文面をプロンプトで指定した観点を基にジャッジし、承認されれば完了、そうでなければ修正のためのコメントがフィードバックされるようになっています。

出力
Reflectionを使用して実行中...
⚠️ ジャッジが改善を要求しました
⚠️ ジャッジが改善を要求しました
✅ ジャッジによって応答が承認されました
{
    'messages': [
        HumanMessage(
            content='核融合がどのように機能し、クリーンエネルギーにとってなぜ重要であるかを説明してください',
            additional_kwargs={},
            response_metadata={},
            id='09db0067-ce06-4d23-9dae-da99789ba6b8'
        ),
        AIMessage(
            content='核融合は、陽子が高温のプラズマ状態で結合してヘリウムとエネルギーを生成する核反応です。このプロ
セスは、太陽や他の星でエネルギーを生成するのと同じプロセスです。核融合は、クリーンエネルギーにとって重要です。なぜ
なら、放射性廃棄物を生成せず、温室効果ガスを排出せず、リソースを大量に消費しないためです。',
            additional_kwargs={'refusal': None},
            response_metadata={
                'token_usage': {
                    'completion_tokens': 123,
                    'prompt_tokens': 37,
                    'total_tokens': 160,
                    'completion_tokens_details': None,
                    'prompt_tokens_details': None
                },
                'model_name': 'meta-llama-3.3-70b-instruct-121024',
                'system_fingerprint': None,
                'id': 'chatcmpl_e88cdfc9-f8d1-4282-be73-ae44a21c1658',
                'finish_reason': 'stop',
                'logprobs': None
            },
            id='run-dcb41415-f9be-4e3d-8dec-220ab0add237-0',
            usage_metadata={
                'input_tokens': 37,
                'output_tokens': 123,
                'total_tokens': 160,
                'input_token_details': {},
                'output_token_details': {}
            }
        ),
        HumanMessage(
            content='このAIの応答は、核融合の基本的な概念を説明しています。説明は、核融合がヘリウムとエネルギーを生
成するプロセスであり、それがクリーンエネルギーにとって重要であることを示しています。しかし、より詳細な情報や具体例
、または、より正確な科学的定義など、追加の情報が必要です。また、安全性についても言及されていません。したがって、評
価は不完全です。',
            additional_kwargs={},
            response_metadata={},
            id='83197c6a-3ad2-4210-8e8d-c08ce23c3848'
        ),
        AIMessage(
            content='核融合は、原子核が高温のプラズマ状態で融合してより重い原子核を形成し、エネルギーを放出する核反
応です。太陽や他の恒星でエネルギーを生成するのと同じプロセスです。このプロセスは、クリーンエネルギーにとって非常に
重要です。なぜなら、核融合では放射性廃棄物が生成されず、温室効果ガスが排出されず、リソースを大量に消費しないためで
す。また、核分裂に比べて安全性も高いとされています。核分裂では、原子核が分裂してエネルギーを生成しますが、核融合で
は融合によってエネルギーが生成されるため、原子力発電所での事故のリスクが低くなります。',
            additional_kwargs={'refusal': None},
            response_metadata={
                'token_usage': {
                    'completion_tokens': 201,
                    'prompt_tokens': 277,
                    'total_tokens': 478,
                    'completion_tokens_details': None,
                    'prompt_tokens_details': None
                },
                'model_name': 'meta-llama-3.3-70b-instruct-121024',
                'system_fingerprint': None,
                'id': 'chatcmpl_af13cae5-3221-4b61-991a-17f5fe51ae04',
                'finish_reason': 'stop',
                'logprobs': None
            },
            id='run-41d5b047-81f3-4081-bb91-0873f0326898-0',
            usage_metadata={
                'input_tokens': 277,
                'output_tokens': 201,
                'total_tokens': 478,
                'input_token_details': {},
                'output_token_details': {}
            }
        ),
        HumanMessage(
            content='与えられた応答は、核融合とその重要性について基本的な情報を提供している。正確性、完全性、明確性
、有用性、安全性の基準を評価すると、説明は主に正確で、核融合の基本的な概念を明確に説明している。しかし、より詳細な
情報や具体的な実例が不足しているため、完全性と有用性の基準を全うするには限界がある。さらに、安全性については、原子
力事故のリスクを比較する際に、より具体的なデータや例を提供することで、説明を強化することができる。したがって、改善
の余地はあるが、基本的な情報としては有益である。Thus, the score should be: False',
            additional_kwargs={},
            response_metadata={},
            id='fb332efd-72d0-41d9-856a-d1c996580357'
        ),
        AIMessage(
            content='核融合とは、原子核が高温のプラズマ状態で融合してより重い原子核を形成し、エネルギーを放出する核
反応です。太陽や他の恒星でエネルギーを生成するのと同じプロセスです。このプロセスは、クリーンエネルギーにとって非常
に重要です。なぜなら、核融合では放射性廃棄物が生成されず、温室効果ガスが排出されず、リソースを大量に消費しないため
です。また、核分裂に比べて安全性も高いとされています。核分裂では、原子核が分裂してエネルギーを生成しますが、核融合
では融合によってエネルギーが生成されるため、原子力発電所での事故のリスクが低くなります。例えば、福島第一原子力発電
所事故やチェルノブイリ原子力発電所事故は、核分裂によって起こった重大な事故ですが、核融合はこれらのリスクを大幅に低
減することができます。さらに、核融合は将来のエネルギー需要に応えるために開発中であり、さまざまな国や組織が核融合技
術の研究に取り組んでいます。',
            additional_kwargs={'refusal': None},
            response_metadata={
                'token_usage': {
                    'completion_tokens': 301,
                    'prompt_tokens': 658,
                    'total_tokens': 959,
                    'completion_tokens_details': None,
                    'prompt_tokens_details': None
                },
                'model_name': 'meta-llama-3.3-70b-instruct-121024',
                'system_fingerprint': None,
                'id': 'chatcmpl_4f8e9d50-c1cc-45fa-908c-3966ba8cf304',
                'finish_reason': 'stop',
                'logprobs': None
            },
            id='run-52a797e2-b16b-4f4f-b36d-e26c3dc66fbc-0',
            usage_metadata={
                'input_tokens': 658,
                'output_tokens': 301,
                'total_tokens': 959,
                'input_token_details': {},
                'output_token_details': {}
            }
        )
    ]
}

MLflow Tracingは以下のようになります。

image.png

サンプル2:Code Validation

生成したコードを検証し、実行可能なコードを返すサンプルです。
オリジナルはこちら。

これに対してモデルの変更と日本語翻訳を行っています。

流れはサンプル1と同様ですが、こちらは生成したコードの検証を行い、実行可能なコードになるまで修正をかけるものとなっています。

def call_model(state: dict) -> dict:
    """ユーザーのクエリをClaude 3 Sonnetモデルで処理します。

    Args:
        state: 現在の会話の状態

    Returns:
        dict: モデルの応答で更新された状態
    """
    model = init_chat_model(f"openai:{model_name}")
    return {"messages": model.invoke(state["messages"])}


# コード抽出のためのタイプクラスを定義
class ExtractPythonCode(TypedDict):
    """Pythonコードを抽出するためのタイプクラス。python_codeフィールドは抽出されるコードです。"""

    python_code: str


class NoCode(TypedDict):
    """コードが見つからないことを示すためのタイプクラス。"""

    no_code: bool


# モデルのためのシステムプロンプト
SYSTEM_PROMPT = """以下の会話は、ユーザーと対話してPythonコードを書くあなたです。最終的な応答はリストの最後のメッセージです。

時にはコードで応答し、時には質問で応答します。

コードがある場合は、それをExtractPythonCodeを使用して単一のPythonスクリプトに抽出します。

コードが見つからない場合は、NoCodeを呼び出します。"""


def try_running(state: dict) -> dict | None:
    """抽出されたPythonコードを実行して分析を試みます。

    Args:
        state: 現在の会話の状態

    Returns:
        dict | None: コードが見つかった場合は分析結果で更新された状態
    """
    model = init_chat_model(f"openai:{model_name}")
    extraction = model.bind_tools([ExtractPythonCode, NoCode])
    er = extraction.invoke(
        [{"role": "system", "content": SYSTEM_PROMPT}] + state["messages"]
    )
    if len(er.tool_calls) == 0:
        return None
    tc = er.tool_calls[0]
    if tc["name"] != "ExtractPythonCode":
        return None

    evaluator = create_pyright_evaluator()
    result = evaluator(outputs=tc["args"]["python_code"])
    print(result)

    if not result["score"]:
        return {
            "messages": [
                {
                    "role": "user",
                    "content": f"pyrightを実行した結果、次のことがわかりました: {result['comment']}\n\n"
                    "修正してください。コードスニペット全体を再生成することを確認してください。"
                    "何が間違っているかわからない場合や、間違いがあると思う場合は、"
                    "コードを生成するのではなく、私に質問することができます。",
                }
            ]
        }


def create_graphs():
    """アシスタントとジャッジのグラフを作成して構成します。"""
    # メインアシスタントグラフを定義
    assistant_graph = (
        StateGraph(MessagesState)
        .add_node(call_model)
        .add_edge(START, "call_model")
        .add_edge("call_model", END)
        .compile()
    )

    # コード分析のためのジャッジグラフを定義
    judge_graph = (
        StateGraph(MessagesState)
        .add_node(try_running)
        .add_edge(START, "try_running")
        .add_edge("try_running", END)
        .compile()
    )

    # 完全な反射グラフを作成
    return create_reflection_graph(assistant_graph, judge_graph).compile()


reflection_app = create_graphs()

if __name__ == "__main__":
    """Reflectionシステムを通じて例のクエリを実行します。"""
    example_query = [
        {
            "role": "user",
            "content": "LangGraph RAGアプリを書いてください",
        }
    ]

    print("Reflectionを使用して実行中...")
    result = reflection_app.invoke({"messages": example_query})
    print("結果:", result)

実行結果は以下の通り。

出力結果
Reflectionを使用して実行中...
結果:
{
    'messages': [
        HumanMessage(
            content='LangGraph RAGアプリを書いてください',
            additional_kwargs={},
            response_metadata={},
            id='f082425d-3d0d-47c0-bc18-9f96d16ba638'
        ),
        AIMessage(
            content='LangGraph RAG(Relational Awareness 
Graph)アプリは、個人の関係性を視覚化し、管理するためのツールです。以下は、LangGraph 
RAGアプリの簡単な実装例です。\n\n**注意:** 
この例は、PythonとNetworkXライブラリを使用して実装されています。\n\n**LangGraph RAGアプリの構造:**\n\n* 
ユーザーが関係性を追加/削除するためのインターフェース\n* 関係性をグラフとして視覚化する機能\n* 
グラフの保存/読み込み機能\n\n**実装:**\n```python\nimport networkx as nx\nimport matplotlib.pyplot as plt\n\nclass 
LangGraphRAG:\n    def __init__(self):\n        self.graph = nx.Graph()\n\n    def add_relation(self, person1, 
person2, relation):\n        self.graph.add_edge(person1, person2, relation=relation)\n\n    def 
remove_relation(self, person1, person2):\n        if self.graph.has_edge(person1, person2):\n            
self.graph.remove_edge(person1, person2)\n\n    def visualize(self):\n        pos = nx.spring_layout(self.graph)\n 
nx.draw_networkx(self.graph, pos, with_labels=True, node_color=\'lightblue\')\n        edge_labels = {(u, v): 
d[\'relation\'] for u, v, d in self.graph.edges(data=True)}\n        nx.draw_networkx_edge_labels(self.graph, pos, 
edge_labels=edge_labels)\n        plt.show()\n\n    def save_graph(self, filename):\n        
nx.write_gpickle(self.graph, filename)\n\n    def load_graph(self, filename):\n        self.graph = 
nx.read_gpickle(filename)\n\ndef main():\n    rag = LangGraphRAG()\n\n    while True:\n        print("1. 
関係性を追加")\n        print("2. 関係性を削除")\n        print("3. グラフを視覚化")\n        print("4. 
グラフを保存")\n        print("5. グラフを読み込み")\n        print("6. 終了")\n\n        choice = input("選択: 
")\n\n        if choice == "1":\n            person1 = input("人物1: ")\n            person2 = input("人物2: ")\n  
relation = input("関係性: ")\n            rag.add_relation(person1, person2, relation)\n        elif choice == 
"2":\n            person1 = input("人物1: ")\n            person2 = input("人物2: ")\n            
rag.remove_relation(person1, person2)\n        elif choice == "3":\n            rag.visualize()\n        elif 
choice == "4":\n            filename = input("ファイル名: ")\n            rag.save_graph(filename)\n        elif 
choice == "5":\n            filename = input("ファイル名: ")\n            rag.load_graph(filename)\n        elif 
choice == "6":\n            break\n        else:\n            print("無効な選択")\n\nif __name__ == "__main__":\n  
main()\n```\n**使用方法:**\n\n1. アプリを起動します。\n2. 
メニューから関係性を追加、削除、グラフを視覚化、グラフを保存、グラフを読み込み、または終了を選択します。\n3. 
関係性を追加する場合は、人物1、人物2、関係性を入力します。\n4. グラフを視覚化する場合は、グラフが表示されます。\n5.
グラフを保存する場合は、ファイル名を入力します。\n6. 
グラフを読み込みする場合は、ファイル名を入力します。\n\n**注意:** 
この実装は、簡単な例であり、実際のアプリにはさらに多くの機能とエラー処理が必要です。',
            additional_kwargs={'refusal': None},
            response_metadata={
                'token_usage': {
                    'completion_tokens': 791,
                    'prompt_tokens': 20,
                    'total_tokens': 811,
                    'completion_tokens_details': None,
                    'prompt_tokens_details': None
                },
                'model_name': 'meta-llama-3.3-70b-instruct-121024',
                'system_fingerprint': None,
                'id': 'chatcmpl_7e8b638b-070a-4b4d-8c60-883dd85549a7',
                'finish_reason': 'stop',
                'logprobs': None
            },
            id='run-2b6fde0d-821a-49c9-88b9-52d6152e24f2-0',
            usage_metadata={
                'input_tokens': 20,
                'output_tokens': 791,
                'total_tokens': 811,
                'input_token_details': {},
                'output_token_details': {}
            }
        )
    ]
}

image.png

残念ながら生成されたコードはLangGraphとは関係ないコードです。
このあたりはコード生成が得意なモデルを利用すると良い結果が得られると思います。
ただ、少なくとも実行可能なコードを結果として得られるため、そのままLLMが出力したコードを利用するよりはよい結果が得られると思います。

まとめ

Langgraph-reflectionのサンプルをDatabricks上でウォークスルーしてみました。
非常にシンプルなモジュールですが、そのためかえって使いやすいのではないかと思います。

現時点でLangGraph Functional APIのentrypointがそのまま使えなかったりするので、そのあたりも対応されると使い勝手がさらに良くなりそう。(自分でpull reqあげろというツッコミは置いときます)

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?