はじめに
前回までに次のことを確認した。
- ChatGPT APIでPDFの内容を問い合わせる方法
- Redmineに埋もれたナレッジをChatGPT APIで問い合わせて再利用する方法
- ChatGPT APIを使った問答をチャットUIを使って実現する方法
現在のチャットUIの問題点
ChatGPT APIが回答を返すまでに時間が掛かる。ユーザーとしては、何もフィードバックがない状態で長時間待たされるのは望まない。本家のChatGPTのUIと同様にChatGPTの回答をストリーミングで表示するUIの実現方法を確認する。
結論
利用していたchatux
がチャット内容のストリーミング表示に未対応であった。そこで、チャットとは別の領域にChatGPTからの回答をストリーミングで表示させ、ChatGPTの回答をストリーミングで表示するUIを実現できることを確認した。
用意するもの
下記で記載した内容については触れない
ChatGPT APIへの問い合わせ処理
前回からの修正箇所を確認する
プログラムの変更点は、オリジナルの言語モデルの出力をストリーミングできるようにする新たなカスタムコールバックハンドラの追加と、そのハンドラを使用するための関数の更新です。新たなMyCustomCallbackHandlerクラス
は、言語モデルが新しいトークンを生成するたびにコールバック
を呼び出す機能を提供します。また、get_chain
とopenai_qa関数
は、新たにコールバック
を受け取り、ChatOpenAIインスタンス作成時にコールバックマネージャとストリーミングを引数として渡すように変更されている。これにより、モデルの出力がリアルタイムに処理され、即時フィードバックが可能になる。
CallbackManager
を利用してGPT
が徐々に回答してくる内容を出力する。
from langchain.callbacks.manager import CallbackManager, BaseCallbackHandler
前回作成した get_chain関数
の引数を追加してコールバック
を登録できるように変更
def get_chain(config, callback_streaming=None):
...
# callback
callback_manager = CallbackManager([MyCustomCallbackHandler(callback_streaming)])
llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo", callback_manager=callback_manager, streaming=True)
...
GPTが新しいトークンを生成するたびにon_llm_new_token
が呼ばれる。初期化時に引数に渡しておいたdummy_callback
が呼び出される。最終的にトークンが標準出力に出力される。
# Streaming対応
# ref: https://python.langchain.com/en/latest/modules/callbacks/getting_started.html
# ref: https://ict-worker.com/ai/langchain-stream.html
from typing import Any, Dict, List, Optional, Union
from langchain.schema import AgentAction, AgentFinish, LLMResult
class MyCustomCallbackHandler(BaseCallbackHandler):
def __init__(self, callback):
self.callback = callback
"""Custom CallbackHandler."""
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> Any:
"""Run when LLM starts running."""
def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
"""Run on new LLM token. Only available when streaming is enabled."""
if self.callback is not None:
self.callback(token)
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
"""Run when LLM ends running."""
def on_llm_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when LLM errors."""
def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> Any:
"""Run when chain starts running."""
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
"""Run when chain ends running."""
def on_chain_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when chain errors."""
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> Any:
"""Run when tool starts running."""
def on_tool_end(self, output: str, **kwargs: Any) -> Any:
"""Run when tool ends running."""
def on_tool_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when tool errors."""
def on_text(self, text: str, **kwargs: Any) -> Any:
"""Run on arbitrary text."""
def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
"""Run on agent action."""
def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
"""Run on agent end."""
MyCustomCallbackHandler
に登録するコールバック
def dummy_callback(token):
print('callback>> \033[36m' + token + '\033[0m')
def openai_qa(query, history=[], callback_streaming=None):
csv_qa = get_chain(load_config(), callback_streaming)
...
def main():
# Question
query = 'ラズパイでスクリーンショットを撮りたい'
result = openai_qa(query, [], dummy_callback)
gpt/openai_chat.py
を実行しChatGPIからの回答をストリーミングでユーザーに示す方法が確認できた
チャットアプリ
概要
利用しているチャットアプリのライブラリchatux
はストリーミングでチャット内容を表示することができない。そこで、チャットのやり取りをしている領域とは別の領域を用意し、そこにChatGPTからの回答をストリーミングで表示することにする。
サーバーの修正箇所
主な修正箇所は @app.route('/listen')
部分の追加
ChatGPTからの回答をストリーミングでチャットのクライアントアプリ側に送信する。
# Ugh!: No end is reached because None is never put into qa_stream.
@app.route('/listen')
def listen():
def stream():
while True:
msg = qa_stream.get()
if msg is None:
break
yield f'data: {msg}\n\n'
response = flask.Response(stream(), mimetype='text/event-stream')
return response
大域変数として用意した qa_stream
という Queue
にChatGPTからの回答を保存しておき、そのQueue
からクライアントアプリに回答を送信している。Queue
が1つしかないため、2つ以上の同時接続があると不具合が発生する。
def dummy_callback(token):
qa_stream.put(token)
print('callback>> \033[36m' + token + '\033[0m')
threaded=True
で /chat
と/listen
に対する同時接続に対応
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s:%(name)s - %(message)s")
app.debug = True
app.run(port='5001', threaded=True)
クライアントの修正箇所
/listen
を監視しストリーミングでチャットサーバーからChatGPTの回答を受信する。受信したデータを <div id="streaming_msg">
の領域に書き込む。
<div id="streaming_msg" class="cool-box">
<h1>Streaming message from GPT</h1>
</div>
<script>
const messagesDiv = document.getElementById("streaming_msg");
// receiving ChatGPT streaming responses from /listen
const eventSource = new EventSource("/listen");
eventSource.onmessage = function (event) {
const data = event.data;
messagesDiv.innerHTML += data;
};
eventSource.onerror = function(event) {
console.error("Connection error:", event);
// no automatic re-connection
eventSource.close();
};
</script>
参考資料