イケてるチャットボットにはチャット履歴が必須では?
お疲れ様です。最近LanGraphとChainlitがマイブームのアスカです。
つい最近、LangGraphとChainlitの導入記事を書きました。
参考:AIマルチエージェントのアプリをChainlitで爆速開発しよう
要約すると、「Chainlit最高!UIは簡単に済ませてエージェント開発に注力しようぜ」みたいな話なんですが、以前の記事ではチャット履歴の保存まで書けていなかったです。
「イケてるチャットボットにはチャット履歴が必須では?」
この記事を読んでいる皆さんなら思ったはず...私も思います。という事での続編です。
※一部ドキュメント通りにできない箇所もあったので、直近実装してみたい人は必見です。
Chainlitでのチャット履歴保存方法
以下のページに概要説明があります。
データの永続化
前提条件
ユーザ認証がされている必要があります。これもいくつか選択肢がありますが、どれでもOKかと思います。
- パスワード
- ヘッダー
- OAuth
- Amazon Cognito
- ...
参考:認証設定の概要
今回は見慣れたGoogle認証を付けました。手順のみ記載しておきます。
- Google OAuth appの新しいOAuthクライアントを作成して、クライアントIDとシークレットキーを発行。
- それぞれ環境変数OAUTH_GOOGLE_CLIENT_ID、OAUTH_GOOGLE_CLIENT_SECRETとして登録。
- Chainlitに以下のコードを追加。
@cl.oauth_callback
def oauth_callback(
provider_id: str,
token: str,
raw_user_data: dict[str, str],
default_user: cl.User,
) -> cl.User | None:
return default_user
そうすると、こんなログイン画面が作成されます。簡単で感嘆ですよね。
チャット履歴の保存方法
データの永続化、いわゆるチャット履歴を保存するにはいくつか選択肢があります。
- Literal AI
- オープンソースデータレイヤー
- Official Data Layer
- SQLAlchemy Data Layer
- DynamoDB Data Layer
AWSに少し慣れているという事もあるので、DynamoDB Data Layerを使う事にしました。
チャット履歴保存の流れ
基本的にはドキュメントの流れになります。
参考:DynamoDB Data Layerの作り方
- AWSリソースの作成
- 認証情報の設定
- Chainlitでクライアントを設定
まずboto3が入っていなかったらインストールしましょう。
(例)
pip install boto3
1. AWSリソースの作成
ドキュメントを参考にCloudFormationで以下のDynamoDBテーブルとS3バケットを作成します。
※テーブル名とバケット名は適宜変更してください。
AWSTemplateFormatVersion: '2010-09-09'
Resources:
DynamoDBTable:
Type: 'AWS::DynamoDB::Table'
Properties:
TableName: 'your-table-name'
AttributeDefinitions:
- AttributeName: PK
AttributeType: S
- AttributeName: SK
AttributeType: S
- AttributeName: UserThreadPK
AttributeType: S
- AttributeName: UserThreadSK
AttributeType: S
KeySchema:
- AttributeName: PK
KeyType: HASH
- AttributeName: SK
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: UserThread
KeySchema:
- AttributeName: UserThreadPK
KeyType: HASH
- AttributeName: UserThreadSK
KeyType: RANGE
Projection:
ProjectionType: INCLUDE
NonKeyAttributes:
- id
- name
BillingMode: PAY_PER_REQUEST
S3Bucket:
Type: 'AWS::S3::Bucket'
Properties:
AccessControl: Private
BucketName: 'your-bucket-name'
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
2.認証情報の設定
aws cliのデフォルトの認証情報を必要な権限のものに設定しましょう。
3.Chainlitでクライアントを設定
ここでChainlitのコードを編集していきます。2025/1/13時点ではドキュメント通りにいかない部分があるので注意してください。
以下のコードを追記します。
import chainlit.data as cl_data
from chainlit.data.dynamodb import DynamoDBDataLayer
from chainlit.data.storage_clients.s3 import S3StorageClient
storage_client = S3StorageClient(bucket="<Your Bucket>")
cl_data._data_layer = DynamoDBDataLayer(table_name="<Your Table>", storage_provider=storage_client)
ドキュメント上は以下のようになっていますがインポートエラーが発生します。
20251/13時点でレポジトリを確認すると上記が正しいです。
from chainlit.data.storage_clients import S3StorageClient
次に環境変数AWS_REGIONにAWSリソースを作成したリージョンを設定します。
この設定がないとDynamoDBDataLayerはデフォルトでus-east-1を確認するので、テーブルが見つからないエラーになります。
※詳しくはおまけを参考
これで設定完了です。以下のようにチャット履歴を保存できるようになりました。
応答に対して人間によるフィードバック機能が自動で付くのが素敵です。
まとめ
以上のように、ある程度AWSに慣れていれば比較的簡単にチャット履歴を実装する事ができました。ドキュメント通りに実装できない部分もあったので、これからやってみる人の助けになれば嬉しいです。
まだRAG用のデータベースを登録したり、少し応用的な部分もあるかと思いますが、折を見て記事にしていきます。
おまけ
Literal AIを使ったデータレイヤーが何故か上手くいかない。
実行中に以下のようなエラーが表示されるため、ここに原因があるのではないかと思うんですが、cl.LangchainCallbackHandler()、最悪LangChain側のソースコードまで降りる必要がありそうだったので、一旦退避しました。
2025-01-13 10:10:08 - Error in callback coroutine: NotImplementedError('Chat model tracing is not supported in for original format.')
チャットモデルの出力フォーマットの話?知見のある方がいればコメントで教えてください。
DynamoDBDataLayerについて
このクラスはGitHubのレポジトリを確認すると以下のように定義されていました。
class DynamoDBDataLayer(BaseDataLayer):
def __init__(
self,
table_name: str,
client: Optional["DynamoDBClient"] = None,
storage_provider: Optional[BaseStorageClient] = None,
user_thread_limit: int = 10,
):
if client:
self.client = client
else:
region_name = os.environ.get("AWS_REGION", "us-east-1")
self.client = boto3.client("dynamodb", region_name=region_name) # type: ignore
self.table_name = table_name
self.storage_provider = storage_provider
self.user_thread_limit = user_thread_limit
self._type_deserializer = TypeDeserializer()
self._type_serializer = TypeSerializer()
(以下省略)
今回はtable_nameとstorage_providerを使ってDynamoDBDataLayerを定義しました。その場合はregion_nameを環境変数から読み取ってboto3を使ってクライアント作成するみたいですね。
先にboto3を使ってクライアントを作成してからDynamoDBDataLayerに渡す事も可能なようなのでお好みで
コード全文
あまり長くないので、コード全文を載せておきます。
必要なパッケージをインストールして環境変数の設定をしてもらったら
chainlit run app.py
で動作します
from typing import Annotated, Literal, TypedDict
import chainlit as cl
import chainlit.data as cl_data
from chainlit.data.dynamodb import DynamoDBDataLayer
from chainlit.data.storage_clients.s3 import S3StorageClient
from dotenv import load_dotenv
from langchain.schema.runnable.config import RunnableConfig
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
storage_client = S3StorageClient(bucket="chainlit-datalayer-bucket-test")
cl_data._data_layer = DynamoDBDataLayer(
table_name="chainlit-datalayer-table", storage_provider=storage_client
)
load_dotenv()
tavily_tool = TavilySearchResults(max_results=2)
# This executes code locally, which can be unsafe
repl = PythonREPL()
@tool
def python_repl_tool(
code: Annotated[str, "The python code to execute to generate your chart."],
):
"""Use this to execute python code and do math. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user.
"""
try:
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {e!r}"
result_str = (
f"Successfully executed:\n\\`\\`\\`python\n{code}\n\\`\\`\\`\nStdout: {result}"
)
return result_str
model = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)
llm = model
# functions = [format_tool_to_openai_function(t) for t in tools]
# model = model.bind_functions(functions)
members = ["researcher", "coder"]
# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = members + ["FINISH"]
system_prompt = (
"You are a supervisor tasked with managing a conversation between the"
f" following workers: {members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH."
)
class Router(TypedDict):
"""Worker to route to next. If no workers needed, route to FINISH."""
next: Literal[*options]
def supervisor_node(state: MessagesState) -> Command[Literal[*members, "__end__"]]:
messages = [
{"role": "system", "content": system_prompt},
] + state["messages"]
response = llm.with_structured_output(Router).invoke(messages)
goto = response["next"]
if goto == "FINISH":
goto = END
return Command(goto=goto)
research_agent = create_react_agent(
llm, tools=[tavily_tool], state_modifier="You are a researcher. DO NOT do any math."
)
def research_node(state: MessagesState) -> Command[Literal["supervisor"]]:
result = research_agent.invoke(state)
return Command(
update={
"messages": [
HumanMessage(content=result["messages"][-1].content, name="researcher")
]
},
goto="supervisor",
)
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
code_agent = create_react_agent(llm, tools=[python_repl_tool])
def code_node(state: MessagesState) -> Command[Literal["supervisor"]]:
result = code_agent.invoke(state)
return Command(
update={
"messages": [
HumanMessage(content=result["messages"][-1].content, name="coder")
]
},
goto="supervisor",
)
builder = StateGraph(MessagesState)
builder.add_edge(START, "supervisor")
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", research_node)
builder.add_node("coder", code_node)
app = builder.compile()
@cl.oauth_callback
def oauth_callback(
provider_id: str,
token: str,
raw_user_data: dict[str, str],
default_user: cl.User,
) -> cl.User | None:
return default_user
@cl.on_message
async def on_message(msg: cl.Message):
config = {"configurable": {"thread_id": cl.context.session.id}}
cb = cl.LangchainCallbackHandler()
# cb = cl.AsyncLangchainCallbackHandler()
final_answer = cl.Message(content="")
for msg, metadata in app.stream(
{"messages": [HumanMessage(content=msg.content)]},
stream_mode="messages",
config=RunnableConfig(callbacks=[cb], **config),
):
if (
msg.content
and not isinstance(msg, HumanMessage)
and metadata["langgraph_node"] == "agent"
):
await final_answer.stream_token(msg.content)
await final_answer.send()
@cl.on_chat_resume
async def on_chat_resume(thread):
pass