概要
Chailitで作成したアプリのLLMOpsツールとしてLangfuseを導入しようとしたところ、公式のドキュメントやその他のサンプルコードが全く見つからかったのですが、試行錯誤の結果動作することがわかったためサンプルコードを残します。
同様の壁にぶつかった人の参考になれば幸いです。
導入手順
1. ChainlitのコードにLangfuseを追加する
LangChainにLangfuseを追加する手順と同様にChainlitのコード(ex: main.py
, app.py
etc...)に以下のコードを追加し、Langfuseを初期化します。
from langfuse.callback import CallbackHandler
langfuse_handler = CallbackHandler(
secret_key="sk-lf-...",
public_key="pk-lf-...",
host="https://cloud.langfuse.com"
)
2. Chainlitの@cl.on_message
にLangfuseのハンドラーを追加します
# メッセージ受信時の処理
@cl.on_message
async def on_message(message: cl.Message):
runnable = cast(Runnable, cl.user_session.get("runnable"))
msg = cl.Message(content="")
async for chunk in runnable.astream(
{"question": message.content},
config=RunnableConfig(callbacks=[
cl.LangchainCallbackHandler(),
langfuse_handler, # ここにハンドラーを追加します
]),
):
await msg.stream_token(chunk)
await msg.send()
結論
Langfuseの公式ドキュメントのinvoke
の例のように、Runnable
の引数のconfig(RunnableConfig)
のcallbacks
に指定してあげれば良いようです。
chain.invoke({"input": "<user_input>"}, config={"callbacks": [langfuse_handler]})
参考