2024/04/09 続編書きました。
LangChainを使って色々LLMアプリを作って遊んでいます。
- 体感速度が遅いけど、どこが遅いかわからない
- サンプルソースをコピペして作ったので、実は中身のことをわかってない
- 入力と出力だけじゃなくて、中間の状態も知りたい
みたいなことってありませんか?そんなときに使えるツールを見つけましたのでご紹介します。
Langfuseとは
LangfuseはLLMエンジニアリングプラットフォームです。LLMアプリからメトリクスやトレースを取得し可視化できます。また、評価、プロンプトの管理、データセットの作成なども行えます
OSS開発が進められており、開発の主導はFinto Technologies GmbHというドイツの企業のようです。
主要機能(公式サイトより)
LangSmithと類似したツールですが、OSSなのでセルフホストできる点がポイントです。クラウドサービスも提供されています。
Langfuseサーバーの起動
公式ドキュメントを参考にしました。
# Clone the Langfuse repository
git clone https://github.com/langfuse/langfuse.git
cd langfuse
docker-compose.ymlが用意されています。
詳細は不明ですが、Postgresが起動直後にシャットダウンしてしまう現象が発生。
そのためPostgresのバージョンを16.1
にダウングレードしました。(本日時点でlatestは16.2
)
services:
langfuse-server:
image: ghcr.io/langfuse/langfuse:latest
depends_on:
- db
ports:
- "3000:3000"
environment:
- DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres
- NEXTAUTH_SECRET=mysecret
- SALT=mysalt
- NEXTAUTH_URL=http://localhost:3000
- TELEMETRY_ENABLED=${TELEMETRY_ENABLED:-true}
- NEXT_PUBLIC_SIGN_UP_DISABLED=${NEXT_PUBLIC_SIGN_UP_DISABLED:-false}
- LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES=${LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES:-false}
db:
- image: postgres
+ image: postgres:16.1
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=postgres
ports:
- 5432:5432
volumes:
- database_data:/var/lib/postgresql/data
volumes:
database_data:
driver: local
起動します。
# Start the server and database
docker compose up
上記方法はあくまでローカル環境向けとなります。プロダクション利用の際はSelf-Hosting Guideを参照し、環境を構築しましょう。
Langfuseの初期設定
Web画面で設定を行います。http://localhost:3000
にアクセスします。Sign up
のリンクからアカウントを作成し、サインインします。
サインイン直後の画面です。Create new project
からプロジェクトを作成します。
プロジェクトの設定画面です。
API keys
の部分でAPIキー(Secret KeyとPublic Key)が発行できます。後で使いますのでメモしておいてください。
最低限必要な設定は以上です。
検証用LLMアプリを作成
簡単にRAGを作成しました。
ナレッジDBを作成
iPhone15のリリース記事を持ってきました。
FaissのベクトルDBに格納し、ファイルに保存します。
from langchain_community.embeddings.bedrock import BedrockEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import HTMLHeaderTextSplitter
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
]
url = (
"https://www.apple.com/newsroom/2023/09/apple-debuts-iphone-15-and-iphone-15-plus/"
)
html_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
docs = html_splitter.split_text_from_url(url)
embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1")
db = FAISS.from_documents(docs, embeddings)
db.save_local("faiss_index")
Streamlitアプリを作成
チャット履歴付きのRAGをStreamlitで作成します。長々と書いてますが、サンプルをほぼそのまま流用です。BedrockにしてStreamlitと組み合わせたぐらいです。
参考:Add chat history | 🦜️🔗 LangChain
import uuid
import streamlit as st
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_community.chat_models.bedrock import BedrockChat
from langchain_community.embeddings.bedrock import BedrockEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
st.title("Bedrock Claude Haiku チャット")
if "retriever" not in st.session_state:
embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1")
db = FAISS.load_local(
"faiss_index", embeddings, allow_dangerous_deserialization=True
)
retriever = db.as_retriever()
st.session_state["retriever"] = retriever
retriever = st.session_state["retriever"]
if "llm" not in st.session_state:
st.session_state["llm"] = BedrockChat(
model_id="anthropic.claude-3-haiku-20240307-v1:0"
)
llm = st.session_state["llm"]
if "session_id" not in st.session_state:
st.session_state["session_id"] = str(uuid.uuid4())[:8]
session_id = st.session_state["session_id"]
if "history" not in st.session_state:
st.session_state["history"] = ChatMessageHistory()
history = st.session_state["history"]
def get_session_history(session_id: str):
return history
def create_chain():
### Contextualize question ###
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
[
("system", contextualize_q_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)
history_aware_retriever = create_history_aware_retriever(
llm, retriever, contextualize_q_prompt
)
### Answer question ###
qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\
{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
[
("system", qa_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
conversational_rag_chain = RunnableWithMessageHistory(
rag_chain,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
return conversational_rag_chain
if "chain" not in st.session_state:
st.session_state["chain"] = create_chain()
chain = st.session_state["chain"]
for h in history:
for message in h[1]:
if isinstance(message, AIMessage):
with st.chat_message("AI"):
st.write(message.content)
if isinstance(message, HumanMessage):
with st.chat_message("Human"):
st.write(message.content)
if prompt := st.chat_input("質問を入力"):
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
stream = chain.stream(
{"input": prompt},
config={
"configurable": {"session_id": session_id},
},
)
def s():
for chunk in stream:
if "answer" in chunk:
yield chunk["answer"]
st.write_stream(s)
こんな感じになります。
チャット履歴付きのRAGが完成しました。
検証用LLMアプリにLangfuseを導入
LLMアプリからLangfuseサーバー側にトレース情報を送信する設定を行います。
LangfuseにはLangChainなどのフレームワークと簡単にインテグレーションする機能があるので、これを使用します。他にはLlamaIndex統合などもあります。
-
Import
+ # Initialize Langfuse handler + from langfuse.callback import CallbackHandler
-
CallbackHandlerを生成
Streamlit対応のためsession_stateにセットしています。session_idは一連のチャットごとに採番するイメージです。+ if "langfuse_handler" not in st.session_state: + st.session_state["langfuse_handler"] = CallbackHandler( + secret_key="sk-lf-2118d2b1-fa5e-4e1a-aa08-c1c1ab8671d9", + public_key="pk-lf-ef961176-2578-4a29-8d80-0ec3a1f34d8b", + host="http://localhost:3000", + session_id=session_id, + ) + langfuse_handler = st.session_state["langfuse_handler"]
-
Chain呼び出し時にCallbackHandlerを指定
stream = chain.stream( {"input": prompt}, config={ "configurable": {"session_id": session_id}, + "callbacks": [langfuse_handler], }, )
これで完了です。簡単ですね。
Langfuseのトレースを確認
設定後、検証用LLMアプリでメッセージを送信すると、Langfuseサーバーにトレース情報が送られます。Langfuseの画面のTracesメニューで確認できます。
適当にコピペして作ったので中身を把握してなかったのですが、こんなにいろいろ処理をしてたんですね😅
Sessionメニューでは1つのチャットしてのやり取りがまとめて確認できます。個々のトレースを確認したくなったら、右側のTrace IDをクリックすると画面遷移します。
Generationsメニューでは、LLM呼び出し部分だけをピックアップして一覧表示されます。レイテンシーを一覧で見れていいですね。
まとめ
Langfuseいいですね。まだ紹介できていない機能がたくさんあるので、次回紹介したいと思います。
いいねとストックをしてお待ち下さい(笑)
2024/04/09 続編書きました。