はじめに
株式会社NTTデータ デジタルサクセスコンサルティング事業部の@yamato0811です。
データサイエンス・AI開発の現場において、Streamlitはその手軽さからプロトタイピングやデモンストレーションに広く活用されています。また、LangGraphはLLMを活用したAgentフローを容易に構築するフレームワークとして注目を集めています。
この度、StreamlitとLangGraphを利用してHuman-in-the-loopアプリケーションを実装する機会がありました。実装を進める中で、Agentフローの構築およびStreamlitとの連携が難しく、工夫が必要となる場面に遭遇しました。特に、StreamlitとLangGraphによるHuman-in-the-loopの実装は事例が少なく、参考にできる文献がほとんど存在しません。
本記事では、広告コピー文を生成する簡単なAgentアプリケーションの作成例を紹介しています。具体的な実装例を共有することで、同様のAgent開発やアプリケーション開発に挑戦するエンジニアの皆様にとって、少しでもお役に立てれば幸いです。
リポジトリは、以下のリンクからご確認いただけます。
想定読者
- Streamlitを用いたアプリケーション開発経験がある方
- LangGraphの概要を理解している方
- StreamlitとLangGraphを連携させたアプリケーション開発に興味がある方
- LangGraphでのHuman-in-the-loop実装に興味がある方
Human-in-the-loopとは
Human-in-the-loopは、エージェントシステムにおいて人間の介入を通じてユーザー体験を向上させる手法です。この手法では、ユーザーがエージェントの動作を監視し、必要に応じて介入することで、より正確で効果的な結果を得ることができます。
LangGraphでのHuman-in-the-loopの実装
LangGraphでは、breakpointを用いてグラフの実行を特定のステップで一時停止し、人間の承認や意見を求めることができます。以下には特に承認を求める際の流れを示します。
- ブレークポイント: LangGraphのbreakpointは、エージェントが特定のノードの実行前に一時停止し、人間の承認を待つことを可能にする
- 承認を待つ:グラフ状態を保存するチェックポイントを作成し、人間からの承認を待つ。チェックポイントはスレッドに保存され、あとからグラフを再開することが可能
- グラフの再開:人間からの承認が得られたら、グラフの実行を再開する
Human-in-the-loopのサンプルコードを以下に示します。
# node2の前で一時停止を行うブレイクポイントを設定してグラフをコンパイル
graph = builder.compile(checkpointer=checkpoitner, interrupt_before=["node_2"])
# ブレークポイントまでグラフを実行する
for event in graph.stream(inputs, thread, stream_mode="values"):
print(event)
# ... 人間の承認を得る ...
# 承認されたら、最後に保存されたチェックポイントからグラフの実行を再開する
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
builder.compile
メソッドのinterrupt_before
引数に、ブレークポイントを設定したいノードのID(この場合はnode_2
)を指定します。これにより、グラフの実行時に、指定したノードの前で自動的に一時停止するようになります。
グラフの実行は、graph.stream
メソッドを使用します。最初の実行時には、入力データinputs
とスレッド情報thread
を渡します。
再開時には、graph.stream(None, thread, stream_mode="values")
のように、最初の引数にNone
を渡すことで、保存されたチェックポイントから実行が再開されます。
図およびサンプルコードはLangGraphのHuman-in-the-loop How-toページより引用しています。
アプリケーションの概要
商品情報を入力すると、広告コピー文を自動生成するAgentアプリケーションを開発しました。 本アプリケーションはHuman-in-the-loopを実装しており、生成されたコピーにユーザーが満足しなければ、フローの途中でAgentにコピーの再検討(再生成)を指示できます。加えて、Agentは改善に必要な追加情報をユーザーへインタラクティブに尋ねることができ、より精度の高いコピー文を生成できます。
python実装は以下に公開しています。Agentとして利用するLLMには、Amazon BedrockのClaude 3.5 Sonnet v2 および、Azure OpenAI gpt-4o を選択可能です。
LangGraphのグラフ構造
以下に、今回作成したAgentアプリケーションのグラフ構造を以下に示します。
アプリ実行の流れ
-
キャッチコピー生成の開始:
アプリを開始すると、最初のノード(__start__
)に到達する。このノードは、アプリの開始を表す。 -
キャッチコピー生成:
generate_copy
ノードに到達すると、AgentはLLM(今回はOpenAI GPT-4o)を用いてキャッチコピーを生成する。 -
ユーザーの選択:
user_select_copy
ノードに到達すると、ユーザーに生成されたキャッチコピーを表示し、ユーザーに選択を促す。
もし、ユーザーが「再検討」を選択した場合は、reflect_copy
ノードに遷移する。 -
キャッチコピーの改善:
reflect_copy
ノードに到達すると、Agentは生成したキャッチコピーの改善点と改善に必要な追加情報を思考する。 -
ユーザーの追加情報の入力:
user_input_additioal_info_copy
ノードに到達すると、ユーザーに追加情報の入力を促す。 -
キャッチコピーの再生成:
ユーザーが追加情報を入力すると、generate_copy
ノードに戻り、Agentはキャッチコピーの再生成を行う。 -
キャッチコピー生成の終了:
ユーザーが生成されたキャッチコピーを承認すると、dummy_end
ノードに到達し、キャッチコピー生成が終了する。
※ 2~6の手順は、ユーザーが生成されたキャッチコピーを承認するまで繰り返されます(いわゆるAgent Loop)。
LangGraphでAgentグラフの実装
本章では、LangGraphを使用したAgentグラフの実装方法について、具体的なコード例を交えて詳しく説明します。Stateの定義方法、各Nodeの役割とその実装手順、およびAgentグラフの効果的な構築・管理方法について解説します。さらに、Structured Outputを利用して出力形式を保証する方法についても説明します。
LangGraphのAgentグラフの実装はagent
ディレクトリ配下にまとめています。以下では、実装の詳細について説明します。
※ プログラムの一部のみ抜粋していますので、適宜githubコードを参照ください。
Stateの定義
グラフのノード間を遷移するState情報はagent/state.py
に以下のように定義しています。また、DisplayMessageDict
は、Streamlitに表示するメッセージのタイトル、アイコン、本文を定義しています。
from typing import Annotated, List
from typing_extensions import TypedDict
class DisplayMessageDict(TypedDict):
title: Annotated[str, "表示用のタイトル"]
icon: Annotated[str, "アイコン"]
message_text: Annotated[str, "表示用のメッセージ"]
class State(TypedDict):
# ================
# Input
# ================
# Initial
product_info: Annotated[str, "商品情報"]
# Stremlit上でのuser input
additional_info_input: Annotated[str, "入力された追加情報"]
selected_copy: Annotated[dict, "選択されたキャッチコピー"]
# ================
# Output
# ================
copies: Annotated[list[dict], "キャッチコピーのリスト"]
additional_info: Annotated[str, "必要な追加情報"]
is_rethink: Annotated[bool, "再検討を行うか"]
# ================
# 処理用State
# ================
iteration_count: Annotated[int, "現在の反復回数"]
is_finish: Annotated[bool, "終了判定フラグ"]
display_message_dict: Annotated[DisplayMessageDict, "表示用のメッセージ"]
# 履歴管理用
messages: Annotated[list, "会話履歴のリスト"]
Nodeの定義
以下に、agent/node.py
の主要な部分の説明を行います。
まず、LangGraphのノード管理を簡素化するため、独自にデータクラスNodeType
を定義しています。これと後述するGraphBuilder
クラスと組み合わせて使用することで、ノード名のハードコードを避け、可読性と保守性の向上が期待できます。
@dataclass
class NodeType:
name: str # ノードの名前
func: Callable # ノードで実行する関数
Node
クラスでは、LangGraphの各ノードの処理を定義します。
各ノードはNodeType
を使って定義し、ノード名と対応する処理関数を結び付けます。例えば、self.generate_copy
は"generate_copy"
という名前を持ち、self._generate_copy
関数を実行するノードとして定義します。
class Node:
def __init__(
self,
llm: LLM,
prompt: Dict[str, Dict[str, str]],
) -> None:
self.llm = llm
self.prompt = prompt
# ================
# Define Node
# ================
self.generate_copy = NodeType("generate_copy", self._generate_copy)
self.user_select_copy = NodeType("user_select_copy", self._user_input)
self.reflect_copy = NodeType("reflect_copy", self._reflect_copy)
self.user_input_additioal_info_copy = NodeType(
"user_input_additioal_info_copy", self._user_input
)
self.end = NodeType("dummy_end", self._end_node)
genarate_copyノード
genarate_copy
ノードの処理関数は以下のように記載しています。
初回のコンテンツ生成(state["iteration_count"] == 0
)と2回目以降(else
ブロック)で処理が異なります。初回は商品情報を入力したプロンプトを、2回目以降はユーザーの追加情報を考慮したユーザープロンプトを使用してLLMを実行します。
また、今回は所望の形式で出力を得るため、LLM実行にlangchainのStructured Outputを用いています。(具体的な使用については後述します。)
LLMの出力結果は整形してStateに格納します。display_message_dict
には、Streamlitに表示するためのメッセージを格納します。
def _generate_copy(self, state: State) -> State:
print("Node: generate_copy")
product_info = state["product_info"]
# 初回コンテンツ生成
if state["iteration_count"] == 0:
system_prompt = SystemMessage(
content=self.prompt["generate_copy"]["system"]
)
human_prompt = HumanMessagePromptTemplate.from_template(
self.prompt["generate_copy"]["user_first"]
).format(
product_info=product_info,
output_format_instruction=get_output_format_instructions(Copies),
)
state["messages"] = [system_prompt, human_prompt]
# 2回目以降のコンテンツ生成
else:
human_prompt = HumanMessagePromptTemplate.from_template(
self.prompt["generate_copy"]["user_second"]
).format(
product_info=product_info,
additional_info=state["additional_info"],
additional_info_input=state["additional_info_input"],
output_format_instruction=get_output_format_instructions(Copies),
state=state,
)
# 履歴にユーザーの入力を追加
state["messages"].append(human_prompt)
# invoke
ai_message = self.llm((state["messages"]), Copies)
# 履歴にAIの出力を追加
state["messages"].append(AIMessage(ai_message.model_dump_json()))
# AIの出力をリストに変換
output_list = ai_message.model_dump()["copies"]
# streamlit表示用のメッセージ
message_text = ""
for output in output_list:
# avoid to break markdown format
output["copy_text"] = output["copy_text"].replace("\n", "")
# markdown改行のため空白スペース(\u0020)が2つ必要
message_text += f"""
**【{output["title"]}】**\u0020\u0020
**キャッチコピー**:{output["copy_text"]}\u0020\u0020
**理由**:{output["reason"]}
"""
display_message_dict = {
"title": f"**キャッチコピーの作成** {state['iteration_count'] + 1}回目",
"icon": "📝",
"message_text": message_text,
}
# 'reason'キーのみを削除した新しいリストを生成
filtered_list = filter_key_from_list(output_list, "reason")
# 状態の更新
state["copies"] = filtered_list
state["display_message_dict"] = display_message_dict
return state
genarate_copy
ノードで使用してるプロンプトは別ファイルで管理しています。
今回はXMLタグを使ってプロンプトの構造化を行っています。
generate_copy:
system: |
あなたはプロのコピーライターです。
user_first: |
<instruction>
以下のproductタグ内の情報を基に、商品のキャッチコピーを3つ生成して下さい。
なぜその出力にしたかの理由も考えて下さい。
</instruction>
<product>
{product_info}
</product>
<output>
{output_format_instruction}
</output>
user_second: |
<instruction>
以下のadditional_infoタグ内のユーザーからの追加情報を考慮した上で、productタグ内の情報を基に、商品のキャッチコピー3つ生成して下さい。
なぜその出力にしたかの理由も考えて下さい。
</instruction>
<additional_info>
{additional_info}: {additional_info_input}
</additional_info>
<output>
{output_format_instruction}
</output>
reflect_copyノード
reflect_copy
ノードも同様にして定義しています。
genarate_copy
ノードで初回実行かどうかを判定するため、処理の最後でカウントアップ(state["iteration_count"] += 1
)している点には注意してください。
def _reflect_copy(self, state: State) -> State:
print("Node: reflect_copy")
copies = state["copies"]
human_prompt = HumanMessagePromptTemplate.from_template(
self.prompt["reflect_copy"]["user"]
).format(
copies=copies,
output_format_instruction=get_output_format_instructions(ReflectDetails),
)
# 履歴にユーザーの入力を追加
state["messages"].append(human_prompt)
# invoke
ai_message = self.llm((state["messages"]), ReflectDetails)
# 履歴にAIの出力を追加
state["messages"].append(AIMessage(ai_message.model_dump_json()))
# 文字列をPythonの辞書に変換
data = ai_message.model_dump()
display_message_dict = {
"title": f"**キャッチコピーの改善** {state['iteration_count'] + 1}回目",
"icon": "🔄",
"message_text": f"""
**改善点**:{data["improvement_point"]}\u0020\u0020
**必要な追加情報**:{data["additional_info"]}\u0020\u0020
**理由**:{data["reason"]}
""",
}
# 状態の更新
state["additional_info"] = data["additional_info"]
state["display_message_dict"] = display_message_dict
# カウントアップ
state["iteration_count"] += 1
return state
reflect_copy
ノードで使用しているプロンプトは以下です。
reflect_copy:
user: |
<instruction>
copyタグ内の複数のキャッチコピーを評価し、改善点を考えて下さい。
また、改善点を実現するために必要な「ユーザーからの追加情報」をとても簡潔に1つだけ考えて下さい。
なぜその出力にしたかの理由も考えて下さい。
</instruction>
<copy>
{copies}
</copy>
<output>
{output_format_instruction}
</output>
その他のノード
user_input
ノードは、一時停止を行うためのダミーノードなので内部処理は記載していません。また、Streamlit実装で必要となるダミーのend_node
も定義しておきます。
def _user_input(self, state: State):
pass
def _end_node(self, state: State):
print("Node: end_node")
return {"is_finish": True, "display_message_dict": None}
分岐関数
Conditional edge(条件分岐)を実現するための分岐関数も定義しています。
stateのis_rethink
がTrueの場合に"reflect"
へ、Falseの場合に"end"
へルーティングします。
def should_rethink(self, state: State) -> Literal["reflect", "end"]:
if state["is_rethink"]:
return "reflect"
else:
return "end"
(補足)Structured Output
LLMに事前定義した形式で出力させるように、LangChainのStructured Outputを使用しています。
出力のスキーマ形式は、agent/output_structure.py
に定義してあります。
from typing import Optional
from pydantic import BaseModel, Field
# ================
# Copy Generation
# ================
class Copy(BaseModel):
"""コピー"""
title: str = Field(description="タイトル(例:案1, 案2, ..)")
reason: str = Field(description="回答の理由")
copy_text: str = Field(description="キャッチコピー")
class Copies(BaseModel):
"""コピーの出力形式"""
copies: list[Copy] = Field(description="キャッチコピーのリスト")
# ================
# Reflect
# ================
class ReflectDetails(BaseModel):
"""ユーザーからのフィードバック情報"""
reason: str = Field(description="回答の理由")
improvement_point: str = Field(description="改善点")
additional_info: Optional[str] = Field(
default=None,
description="ユーザーに求める追加情報の内容(体言止め)",
)
LLMを呼び出す際に、以下のようにpydanticで定義したスキーマを渡すことで、スキーマに従った出力を取得できます。
# invoke
ai_message = self.llm((state["messages"]), Copies)
json_data = ai_message.model_dump()
このときの出力例は以下です。
[
{
"title": "案1",
"reason": "『保湿』という商品の機能を直接的にアピールしつつ、つるつるの肌を想像させるため。",
"copy_text": "しっとり肌へ、うるおいの魔法を。"
},
{
"title": "案2",
"reason": "季節や環境に左右されず保湿できることを強調し、毎日使える定番アイテムであることを伝えています。",
"copy_text": "毎日の潤いチャージ、これ一本で。"
},
{
"title": "案3",
"reason": "肌に優しいイメージを持たせ、安心して使える保湿クリームであることを伝えています。",
"copy_text": "肌に優しさ、保湿の贈り物。"
}
]
また、LangChainのOutputParserを使うと、LLM出力のフォーマット指示プロンプトを取得することも可能です。
def get_output_format_instructions(model: BaseModel) -> str:
"""出力フォーマットの指示を取得する"""
parser = PydanticOutputParser(pydantic_object=model)
output_format_instruction = parser.get_format_instructions()
return output_format_instruction
例えば、上記の関数を使用してCopies
のフォーマット指示を取得すると、以下のような指示が得られます。このフォーマット指示を、出力形式を指定したいプロンプトに付加することで、出力形式の指示を与えることができます。
The output should be formatted as a JSON instance that conforms to the JSON schema below.
As an example, for the schema {"properties": {"foo": {"title": "Foo", "description": "a list of strings", "type": "array", "items": {"type": "string"}}}, "required": ["foo"]}
the object {"foo": ["bar", "baz"]} is a well-formatted instance of the schema. The object {"properties": {"foo": ["bar", "baz"]}} is not well-formatted.
Here is the output schema:
{"$defs": {"Copy": {"description": "コピー", "properties": {"title": {"description": "タイトル(例:案1, 案2, ..)", "title": "Title", "type": "string"}, "reason": {"description": "回答の理由", "title": "Reason", "type": "string"}, "copy_text": {"description": "キャッチコピー", "title": "Copy Text", "type": "string"}}, "required": ["title", "reason", "copy_text"], "title": "Copy", "type": "object"}}, "description": "コピーの出力形式", "properties": {"copies": {"description": "キャッチコピーのリスト", "items": {"$ref": "#/$defs/Copy"}, "title": "Copies", "type": "array"}}, "required": ["copies"]}
この仕組みを利用することで、プロンプトを意識することなく、スキーマ形式を変更するだけで出力形式を変更することが可能となります。
GraphBuliderの定義
LangGraph のノード管理を簡素化するため、グラフ処理のラッパークラスGraphBuilder
を独自に作成しています。このクラスを利用することで、ノード名をハードコードすることなく、グラフ構造を定義できるため、コードの保守性と再利用性が向上します。
class GraphBuilder:
def __init__(self, state: State) -> None:
self.work_flow: StateGraph = StateGraph(state)
def add_node(self, node: NodeType) -> None:
self.work_flow.add_node(node.name, node.func)
def add_edge(self, from_node: NodeType, to_node: NodeType) -> None:
self.work_flow.add_edge(from_node.name, to_node.name)
def set_finish_point(self, end_node: NodeType) -> None:
self.work_flow.set_finish_point(end_node.name)
def set_entry_point(self, node: NodeType) -> None:
self.work_flow.set_entry_point(node.name)
def add_conditional_edges(
self,
from_node: NodeType,
condition_func: Callable,
path_map: Dict[str, Union[str, Any]],
) -> None:
self.work_flow.add_conditional_edges(from_node.name, condition_func, path_map)
def compile_flow(self) -> CompiledStateGraph:
return self.work_flow.compile()
Graphの定義
agent/agent.py
でグラフ構造を構築しています。
graph_builder
を使用して、ノードとエッジをグラフに追加していきます。add_node
でノードを、add_edge
でノード間の接続を定義します。また、add_conditional_edges
では、条件分岐の接続を定義しています。
set_entry_point
とset_finish_point
で開始ノードと終了ノードを指定します。
その後、interrupt_before
に一時停止を行うノードを指定し、グラフのコンパイルを行っています。
class Agent:
def __init__(
self,
llm: LLM,
prompt: Dict[str, Dict[str, str]],
) -> None:
# ================
# Init
# ================
graph_builder = GraphBuilder(State)
self.node = Node(llm, prompt)
# ================
# Build Graph
# ================
# Add nodes
graph_builder.add_node(self.node.generate_copy)
graph_builder.add_node(self.node.user_select_copy)
graph_builder.add_node(self.node.reflect_copy)
graph_builder.add_node(self.node.user_input_additioal_info_copy)
graph_builder.add_node(self.node.end)
# Add edges
graph_builder.add_edge(self.node.generate_copy, self.node.user_select_copy)
graph_builder.add_conditional_edges(
self.node.user_select_copy,
self.node.should_rethink,
{
"reflect": self.node.reflect_copy.name,
"end": self.node.end.name,
},
)
graph_builder.add_edge(
self.node.reflect_copy, self.node.user_input_additioal_info_copy
)
graph_builder.add_edge(
self.node.user_input_additioal_info_copy, self.node.generate_copy
)
# Set entry and finish point
graph_builder.set_entry_point(self.node.generate_copy)
graph_builder.set_finish_point(self.node.end)
# Set up memory
self.memory = MemorySaver()
self.graph = graph_builder.work_flow.compile(
checkpointer=self.memory,
interrupt_before=[
self.node.user_select_copy.name,
self.node.user_input_additioal_info_copy.name,
],
)
interrupt_before
についての補足
LangGraphでは、グラフコンパイル時の引数interrupt_before
で、グラフの実行を一時停止するブレークポイントを設定することができます。ブレークポイントとは、指定したノードの実行前、または実行後にグラフの実行を一時停止する機能であり、停止のタイミングはグラフコンパイル時の引数interrupt_before
または interrupt_after
で制御できます。
ブレークポイントを使用する場合は、必ずチェックポインターを利用し、グラフの状態を永続化しておく必要があります。グラフの実行再開時には、以下のようにグラフの入力にNone
を指定するだけで、停止直前のグラフの状態を引き継いで一時停止したノードから実行を再開できます。
agent.graph.stream(None, thread, stream_mode="values")
https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/breakpoints/#simple-usage
StreamlitでHuman-in-the-loopの実装
本章では、StreamlitとLangGraphを連携させる具体的な実装方法について説明します。LangGraphと連携するためには、グラフの状態を取得しつつ、グラフの実行(ブレイクポイントからのグラフ再実行)を行うことが重要です。そこで、グラフの状態を取得しながら各ノードに応じた適切な処理を実装する方法について、具体的なコードを用いながら詳しく解説します。
※ プログラムの一部のみ抜粋していますので、適宜githubコードを参照ください。
メイン処理実行準備
まず、LangGraphフロー実行に必要なStreamlitの諸々の設定を行います。
以下のコードには、次の処理が含まれています。
- ページconfigの設定
- LLMとプロンプトの初期化
- セッション管理の初期化
- Streamlit Session Stateは別ファイルに切り出して管理しています
- ユーザー入力フォームの表示
- 初期表示される入力UIも別ファイルに切り出しています
- 過去のメッセージ履歴を表示
- Streamlitではユーザーによる入力やボタンのクリックなどのイベントが発生すると、アプリケーション全体が再実行され、変数がリセットされます
- そのため、Session Stateに保存したメッセージを表示する必要があります
MODEL = "claude-3-5" # specify "gpt-4o" or "claude-3-5"
THREAD_ID = "1"
PROMPT_PATH = "agent/prompt/prompt_templates.yaml"
TEMPERATURE = 1.0
def main() -> None:
# ================
# Page Config
# ================
st.set_page_config(
page_title="Streamlit×LangGraph コピー生成",
page_icon="🤖",
initial_sidebar_state="auto",
)
st.title("Streamlit×LangGraph コピー生成")
# ================
# Init Actor
# ================
llm = LLM(MODEL, TEMPERATURE)
prompt: Dict[str, Dict[str, str]] = load_yaml(PROMPT_PATH)
# ================
# Streamlit Session State
# ================
session_manager = SessionManager(llm=llm, prompt=prompt)
agent = session_manager.get_agent()
# ================
# Input
# ================
product_info = input_form()
# ================
# Display
# ================
display_history(session_manager.get_messages())
グラフ実行部分
次に、LangGraphのグラフ実行部分について説明します。
まず、グラフ実行に必要なthread
とinitial_input
を定義し、これらを用いてグラフの実行準備をします。initial_input
にはStateの初期値を設定します。
thread = {"configurable": {"thread_id": THREAD_ID}}
initial_input = {
"product_info": product_info,
"iteration_count": 0,
"is_finish": False,
"display_message_dict": None,
"messages": [],
"additional_info_input": "",
}
while True
のループがLang Graphのグラフ実行部分です。
グラフが終了ノードに到達するまではグラフの実行を続ける(break pointで止まったとしても、ユーザー入力があればグラフの実行を続ける)必要があるため、最終ノードでのみbreak
するループを作成します。
ループの中では、グラフの状態(位置)によって処理を分けます。
条件 | 処理 |
---|---|
開始ノードの場合 |
initial_input を設定してグラフ実行 |
次のノードが存在する場合 (開始ノード以外で) |
次のノード名に応じて処理を分岐 |
終了ノードの場合 | 生成されたコピーを表示し、while Trueループを抜ける |
while True:
# 開始ノードの場合
if agent.is_start_node(thread):
# グラフの実行
stream_graph(agent, initial_input, thread, session_manager)
# 次のノードがある場合
next_graph: tuple[str, ...] | Any = agent.get_next_node(thread)
if next_graph:
if next_graph[0] == agent.node.user_select_copy.name:
select_item(
agent=agent,
thread=thread,
state_key="copies",
selectbox_message="お気に入りのキャッチコピーを選択してください",
state_update_key="selected_copy",
as_node=next_graph[0],
)
elif next_graph[0] == agent.node.user_input_additioal_info_copy.name:
input_additional_info(
agent=agent,
thread=thread,
as_node=next_graph[0],
)
# グラフの実行
stream_graph(agent, None, thread, session_manager)
# 終了ノードの場合
if agent.is_end_node(thread):
selected_copy = agent.get_state_value(thread, "selected_copy")
st.success(f"生成したコピー: {selected_copy["copy_text"]}")
break
ここで、開始・終了ノードの判定(agent.is_start_node()
, agent.is_end_node()
)や、次のノード名取得(agent.get_next_node()
)は以下のHelper関数を作成し、使用しています。また、get_state_value
はStateを取得する関数です。
def is_start_node(self, thread: dict) -> bool:
return self.graph.get_state(thread).created_at is None
def is_end_node(self, thread: dict) -> bool:
return self.get_state_value(thread, "is_finish")
def get_next_node(self, thread: dict) -> tuple[str, ...]:
return self.graph.get_state(thread).next
def get_state_value(
self, thread: dict, name: str
) -> Union[dict[str, Any], Any, None]:
state = self.graph.get_state(thread)
if state and name in state.values:
return state.values.get(name)
return None
dammy_end
ノードを作成していたのは、アプリケーションの終了判定を行うためです。
dammy_end
ノード内で、is_finish
StateをTrue
に更新しています。
グラフの実行、再実行には事前適宜した関数stream_graph
を使用しています。
def stream_graph(
agent: Agent,
input: Dict | None,
thread: Dict,
session_manager: SessionManager,
) -> None:
"""
グラフのストリーミングを行う
"""
events = agent.graph.stream(input, thread, stream_mode="values")
for event in events: # event is state in each node.
# get synchronized result. You should not get state from thread before update completely.
if display_message_dict := event.get("display_message_dict"):
# 表示
_display_message(display_message_dict)
# messageの保存
session_manager.save_message_to_session_state(display_message_dict)
グラフのstream実行の際に、agent.get_state_value
によってStateを取得すると、Stateの一部の要素が空になって返ってくることがありました。これは恐らく、Iteratorから取得したevent(State)と、threadから取得するStateの同期に遅延が発生しているためです。
events = agent.graph.stream(input, thread, stream_mode="values")
for event in events:
if display_message_dict := agent.get_state_value(thread, 'display_message_dict')
そこで、完全に更新されたグラフ結果を得るため、event.get()
を利用するようにしました。この修正により、Stateが空になる問題が解消され、各ステップの実行結果を正確に取得できるようになりました。
events = agent.graph.stream(input, thread, stream_mode="values")
for event in events: # event is state in each node.
# get synchronized result. You should not get state from thread before update completely.
if display_message_dict := event.get("display_message_dict"):
各ノードに応じた処理分岐
上述の通り、次のノードが存在する場合は、以下のように次のノード名に応じて処理を分岐させます。
if next_graph[0] == agent.node.user_select_copy.name:
コードの肥大化を防ぐために、処理は別ファイルに切り出しています。
ここでは、次のノードがuser_input_additioal_info_copy
の場合の処理を説明します。
elif next_graph[0] == agent.node.user_input_additioal_info_copy.name:
input_additional_info(
agent=agent,
thread=thread,
as_node=next_graph[0],
)
input_additional_info
関数では、ユーザーに追加情報を求めるための入力フィールドを表示し、その情報をStateに保存します。
def input_additional_info(agent: Agent, thread: dict, as_node: str) -> None:
"""
追加情報入力関数。
Args:
agent (Agent): エージェントオブジェクト
thread (dict): 現在のスレッドの辞書
as_node (str): ステート更新ノード名
"""
additional_info = agent.get_state_value(thread, "additional_info")
# ユーザー情報入力
additional_info_input = st.text_input(f"「{additional_info}」を入力してください")
if not st.button(
"次へ",
disabled=not bool(additional_info_input),
key=as_node,
):
print("User Input Stop")
st.stop() # この時点で処理が停止
print("Entered User Input: ", additional_info_input)
agent.graph.update_state(
thread,
{
"additional_info_input": additional_info_input,
"display_message_dict": None,
},
as_node=as_node,
)
「次へ」ボタンを押下されていない場合には次の処理を実行しないように、st.stop()
を使用しています。st.stop()
の使い方はStreamlitのドキュメントを参照してください。
まとめ
本記事では、StreamlitとLangGraphを連携させてHuman-in-the-loopな広告コピー生成アプリケーションを実装する方法を解説しました。本記事を参考にぜひ、Streamlit×LangGraphのアプリケーションを実装してみてください!
謝辞
本実装は、同じチームの@ren8kさんと共に取り組みました。
また、アプリケーションに関しては、同チームの藤田さんから多くのアドバイスをいただきました。
この場を借りて、心より感謝申し上げます。
仲間募集
NTTデータ テクノロジーコンサルティング事業本部 では、以下の職種を募集しています。
1. クラウド技術を活用したデータ分析プラットフォームの開発・構築(ITアーキテクト/クラウドエンジニア)
クラウド/プラットフォーム技術の知見に基づき、DWH、BI、ETL領域におけるソリューション開発を推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/cloud_engineer
2. データサイエンス領域(データサイエンティスト/データアナリスト)
データ活用/情報処理/AI/BI/統計学などの情報科学を活用し、よりデータサイエンスの観点から、データ分析プロジェクトのリーダーとしてお客様のDX/デジタルサクセスを推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/datascientist
3.お客様のAI活用の成功を推進するAIサクセスマネージャー
DataRobotをはじめとしたAIソリューションやサービスを使って、
お客様のAIプロジェクトを成功させ、ビジネス価値を創出するための活動を実施し、
お客様内でのAI活用を拡大、NTTデータが提供するAIソリューションの利用継続を推進していただく人材を募集しています。
https://nttdata.jposting.net/u/job.phtml?job_code=804
4.DX/デジタルサクセスを推進するデータサイエンティスト《管理職/管理職候補》
データ分析プロジェクトのリーダとして、正確な課題の把握、適切な評価指標の設定、分析計画策定や適切な分析手法や技術の評価・選定といったデータ活用の具現化、高度化を行い分析結果の見える化・お客様の納得感醸成を行うことで、ビジネス成果・価値を出すアクションへとつなげることができるデータサイエンティスト人材を募集しています。ソリューション紹介
Trusted Data Foundationについて
~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
https://www.nttdata.com/jp/ja/lineup/tdf/
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。
TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について
~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~
https://www.nttdata.com/jp/ja/lineup/tdf_am/
TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。
NTTデータとDatabricksについて
NTTデータは、お客様企業のデジタル変革・DXの成功に向けて、「databricks」のソリューションの提供に加え、情報活用戦略の立案から、AI技術の活用も含めたアナリティクス、分析基盤構築・運用、分析業務のアウトソースまで、ワンストップの支援を提供いたします。NTTデータとTableauについて
ビジュアル分析プラットフォームのTableauと2014年にパートナー契約を締結し、自社の経営ダッシュボード基盤への採用や独自のコンピテンシーセンターの設置などの取り組みを進めてきました。さらに2019年度にはSalesforceとワンストップでのサービスを提供開始するなど、積極的にビジネスを展開しています。
これまでPartner of the Year, Japanを4年連続で受賞しており、2021年にはアジア太平洋地域で最もビジネスに貢献したパートナーとして表彰されました。
また、2020年度からは、Tableauを活用したデータ活用促進のコンサルティングや導入サービスの他、AI活用やデータマネジメント整備など、お客さまの企業全体のデータ活用民主化を成功させるためのノウハウ・方法論を体系化した「デジタルサクセス」プログラムを提供開始しています。
https://www.nttdata.com/jp/ja/lineup/tableau/
NTTデータとAlteryxについて
Alteryx導入の豊富な実績を持つNTTデータは、最高位にあたるAlteryx Premiumパートナーとしてお客さまをご支援します。
導入時のプロフェッショナル支援など独自メニューを整備し、特定の業種によらない多くのお客さまに、Alteryxを活用したサービスの強化・拡充を提供します。
NTTデータとDataRobotについて
NTTデータはDataRobot社と戦略的資本業務提携を行い、経験豊富なデータサイエンティストがAI・データ活用を起点にお客様のビジネスにおける価値創出をご支援します。
NTTデータとInformaticaについて
データ連携や処理方式を専門領域として10年以上取り組んできたプロ集団であるNTTデータは、データマネジメント領域でグローバルでの高い評価を得ているInformatica社とパートナーシップを結び、サービス強化を推進しています。
https://www.nttdata.com/jp/ja/lineup/informatica/
NTTデータとSnowflakeについて
NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Snowflakeは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。