はじめに
Streamlit × LangGraphでHuman-in-the-loopを実現するWebアプリケーションを作りました。
LangGraphでHuman-in-the-loopを実現することは容易ですが、Streamlitで表現するのに手こずったので、同じような方に少しでもお役に立てれば幸いです。
リポジトリを載せます。
LangGraphは処理が抽象化されていてわかりずらい部分があります。
その場合は大変ですがスクラッチで書いてみるのがおすすめです(自戒を込めて)。
想定読者
- Streamlit, LangGraphの概要を理解している方
- StreamlitでHuman-in-the-loopを実現したい方
Human-in-the-loopとは
AIエージェントと会話する中で、AIエージェントが判断に迷った場合や重要な決定が必要な場合に、ユーザーに判断を委ねることができます。
本アプリケーションでは、例えば回答するための情報が不足している場合にアラートを出し、ユーザーに追加情報を促します。
実装のポイント
LangGraphのチュートリアル「Add human-in-the-loop controls」にUIとしてStreamlitを追加する形で作りました。
グラフ構成
シングルエージェント構成のシンプルなものになっています。
※AIエージェントの定義は諸説ありますが、ここではToolの使用と繰り返し処理が出来るものと定義しています
- ユーザーの質問に回答するchatbotエージェント
- エージェントが使うToolはWeb検索をするための
search_web
、人間に問い合わせるためのhuman_assistance
の二つ

プロジェクト構成
.
├── main.py # エントリーポイント & UI
├── src/
│ ├── agent.py # AIエージェントの実装
│ └── setup.py # 設定とロギングのセットアップ
├── config.yaml # モデルとAPIの設定ファイル
...
main.py
エントリーポイントとなり、こちらにStreamlitの実装を集約しています。
Streamlitはボタンクリックなどのイベントが起きると状態がリセットされるため、状態を保持したい値はst.session_state
に入れます。
AIエージェントの実装はagent.py
でAgentクラスにまとめており、そのインスタンスをst.session_state["agent"]
に入れることで、状態管理する値をなるべく減らすようにしています。
thread_idを用いて会話履歴を管理するためにst.session_state["config"]
を定義します。
interruptを捕捉するためにst.session_state["interrupted"]
, st.session_state["interrupt_value"]
を定義します(後述)。
if "agent" not in st.session_state:
st.session_state["agent"] = Agent()
if "messages" not in st.session_state:
st.session_state["messages"] = []
if "graph" not in st.session_state:
st.session_state["graph"] = st.session_state["agent"].graph
st.session_state["graph_image"] = st.session_state["agent"].graph_image
st.session_state["config"] = st.session_state["agent"].config
st.session_state["interrupted"] = False
st.session_state["interrupt_value"] = None
LangGraphのHuman-in-the-loopはinterrupt
とCommand
で実現します。
interrupt(中断) → Command(再開)の流れです。
- interrupt: 特定の時点で実行を一時停止し、人間によるレビュー用に情報を提示
- Command: 人間が指定した値で実行を再開するために使用
※類似の機能でinterrupt_before
がありますが、より簡素にHuman-in-the-loopを実現するためにinterrupt
が導入されています
interruptはToolのhuman_assistance
で定義します(後述)。
Commandはグラフ実行(st.session_state["graph"].stream()
)の際に定義する必要があり、通常のグラフ実行と区別するために、interruptしているかどうかを判断する必要があります。
def process_stream(user_input: dict[str, list[dict[str, str]]] | Command) -> None:
# Get events generator from graph.stream
# events is a generator that yields each event in the conversation flow
events = st.session_state["graph"].stream(
user_input, # ← Commandの際はCommand(resume={"data": prompt})となる
config=st.session_state["config"],
stream_mode="values",
)
# Convert generator to list to process the last event
list_events = list(events)
last_event = list_events[-1]
# DEBUG
# st.write("list_events: ", list_events)
st.write("last_event: ", last_event)
...
def main() -> None:
...
if prompt:
st.session_state["messages"].append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
user_input: dict[str, list[dict[str, str]]] | Command
if not st.session_state["interrupted"]:
user_input = {"messages": [{"role": "user", "content": prompt}]}
process_stream(user_input)
else:
# If interrupted, prompt input from the user
user_input = Command(resume={"data": prompt})
process_stream(user_input)
# 質問: エラーが出ます。どうすればいいですか?
# st.write("last_event: ", last_event)の結果
last_event:
{
"__interrupt__":[
0:"Interrupt(value={'query': '何のエラーが出ているのか、具体的なメッセージや状況を教えてください。どの操作やシステムでエラーが出ているのかも教えてください。」}】'}, resumable=True, ns=['tools:8bc5e43d-1179-b6fb-095a-355846e81f80'])"
]
}
上記の通り、グラフ実行の結果であるeventsの最後の要素を見ると、interruptしている場合は"__interrupt__"
となるため、こちらで条件分岐します。
st.session_state["interrupted"]
をtrueにし、次の実行、つまりCommandによるグラフ実行(再開)ができるようにします。
st.session_state["interrupt_value"]
にeventsの最後の要素のクエリを入れて、UIに表示し、人間によるレビューを促します。
if "__interrupt__" in last_event:
st.session_state["interrupted"] = True
st.session_state["interrupt_value"] = last_event["__interrupt__"][0].value[
"query"
]
# If necessary, add a custom message
content = (
f"{st.session_state['interrupt_value']}\n\n"
"⚠️ **回答するための情報が不足しています。確認してください。**"
)
st.session_state["messages"].append(
{
"role": "assistant",
"content": content,
}
)
with st.chat_message("assistant"):
st.markdown(content)
elif "messages" in last_event:
st.session_state["interrupted"] = False
st.session_state["interrupt_value"] = None
for message in last_event["messages"]:
if isinstance(message, AIMessage) and message.content:
ai_message = str(message.content)
st.session_state["messages"].append(
{"role": "assistant", "content": ai_message}
)
with st.chat_message("assistant"):
st.markdown(ai_message)
else:
logger.debug(f"Unexpected event type received: {last_event}")
raise Exception("Unexpected event type in stream response")
agent.py
AgentクラスにAIエージェントの実装をまとめています。
chatbotエージェントではユーザーからの質問に対してToolを呼ぶかどうかを判断しますが、同じTool、あるいは別のToolを複数回呼ぶ場合があります。複数回の実装は複雑になるのと、チュートリアルのコードでもassert len(message.tool_calls) <= 1
で複数回の実装は避けられていることから、単数呼び出しとしています。
class Agent:
...
# Nodes
def chatbot(self, state: State) -> dict[str, list]:
# Add system message if not present at the beginning
if not any(isinstance(message, SystemMessage) for message in state["messages"]):
system_prompt = SystemMessage(
content=(
"You are a helpful assistant. "
"IMPORTANT: Do not hesitate to use the human_assistance tool "
"whenever you are uncertain or need more details. It is better "
"to ask for clarification than to make assumptions. "
"Use only one tool per turn, with a single call per tool."
)
)
state["messages"].insert(0, system_prompt)
logger.debug(f"state['messages']: {state['messages']}")
message = self.llm_with_tools.invoke(state["messages"])
logger.debug(f"tool_calls/len: {len(message.tool_calls)}")
logger.debug(f"tool_calls: {message.tool_calls}")
# When multiple tool calls are detected, use only the first one
# to prevent parallel tool execution and ensure sequential processing
if len(message.tool_calls) > 1:
message.tool_calls = [message.tool_calls[0]]
# assert len(message.tool_calls) <= 1
return {"messages": [message]}
人間をレビュアーとするToolを定義します。ここでinterruptを使って処理を中断します。
@tool
def human_assistance(query: str) -> str:
"""Request assistance from a human.
This tool implements LangGraph's Human-in-the-Loop functionality.
It is used when the AI:
- needs help making critical decisions
- is uncertain about the best course of action
- requires human judgment on complex matters
The conversation flow is interrupted to delegate the decision-making
to the user, ensuring better accuracy and reliability.
"""
logger.debug(f"human_assistance/start: {query}")
human_response = interrupt({"query": query})
logger.debug(f"human_assistance/end: {human_response}")
return human_response["data"]
なお、ToolとしてWeb検索をするためのsearch_webを定義しましたが、config.yaml
のsearch_webをfalseにすることで、外部検索をせずに、人間によるレビューをより促すことができます。
おわりに
StreamlitでHuman-in-the-loopを実現しました。これをベースに、ステートの追加やマルチエージェントの構築が出来ればと思います。
参考文献
類似の事例が少ない中で参考にさせていただきました。感謝申し上げます。
Toolの使い方が大変わかりやすかったです。感謝申し上げます。