LoginSignup
6
2

More than 1 year has passed since last update.

ChatGPTを用いたストリーミング回答表示機能の実装

Posted at

はじめに

前回までに次のことを確認した。

  • ChatGPT APIでPDFの内容を問い合わせる方法
  • Redmineに埋もれたナレッジをChatGPT APIで問い合わせて再利用する方法
  • ChatGPT APIを使った問答をチャットUIを使って実現する方法

現在のチャットUIの問題点

ChatGPT APIが回答を返すまでに時間が掛かる。ユーザーとしては、何もフィードバックがない状態で長時間待たされるのは望まない。本家のChatGPTのUIと同様にChatGPTの回答をストリーミングで表示するUIの実現方法を確認する。

結論

利用していたchatuxがチャット内容のストリーミング表示に未対応であった。そこで、チャットとは別の領域にChatGPTからの回答をストリーミングで表示させ、ChatGPTの回答をストリーミングで表示するUIを実現できることを確認した。

streaming_message800.gif

用意するもの

下記で記載した内容については触れない

ChatGPT APIへの問い合わせ処理

前回からの修正箇所を確認する

プログラムの変更点は、オリジナルの言語モデルの出力をストリーミングできるようにする新たなカスタムコールバックハンドラの追加と、そのハンドラを使用するための関数の更新です。新たなMyCustomCallbackHandlerクラスは、言語モデルが新しいトークンを生成するたびにコールバックを呼び出す機能を提供します。また、get_chainopenai_qa関数は、新たにコールバックを受け取り、ChatOpenAIインスタンス作成時にコールバックマネージャとストリーミングを引数として渡すように変更されている。これにより、モデルの出力がリアルタイムに処理され、即時フィードバックが可能になる。

CallbackManagerを利用してGPTが徐々に回答してくる内容を出力する。

gpt/openai_chat.py
from langchain.callbacks.manager import CallbackManager, BaseCallbackHandler

前回作成した get_chain関数 の引数を追加してコールバックを登録できるように変更

gpt/openai_chat.py
def get_chain(config, callback_streaming=None):
    ...
    # callback
    callback_manager = CallbackManager([MyCustomCallbackHandler(callback_streaming)])
    llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo", callback_manager=callback_manager, streaming=True)
    ...

GPTが新しいトークンを生成するたびにon_llm_new_tokenが呼ばれる。初期化時に引数に渡しておいたdummy_callbackが呼び出される。最終的にトークンが標準出力に出力される。

gpt/openai_chat.py
# Streaming対応
# ref: https://python.langchain.com/en/latest/modules/callbacks/getting_started.html
# ref: https://ict-worker.com/ai/langchain-stream.html
from typing import Any, Dict, List, Optional, Union
from langchain.schema import AgentAction, AgentFinish, LLMResult
class MyCustomCallbackHandler(BaseCallbackHandler):
    def __init__(self, callback):
        self.callback = callback
    """Custom CallbackHandler."""
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running."""
    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Run on new LLM token. Only available when streaming is enabled."""
        if self.callback is not None:
            self.callback(token) 
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running."""
    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""
    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        """Run when chain starts running."""
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        """Run when chain ends running."""
    def on_chain_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when chain errors."""
    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        """Run when tool starts running."""
    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Run when tool ends running."""
    def on_tool_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when tool errors."""
    def on_text(self, text: str, **kwargs: Any) -> Any:
        """Run on arbitrary text."""
    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Run on agent action."""
    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Run on agent end."""

MyCustomCallbackHandlerに登録するコールバック

gpt/openai_chat.py
def dummy_callback(token):
    print('callback>> \033[36m' + token + '\033[0m')
gpt/openai_chat.py
def openai_qa(query, history=[], callback_streaming=None):
    csv_qa = get_chain(load_config(), callback_streaming)
    ...
gpt/openai_chat.py
def main():
    # Question
    query = 'ラズパイでスクリーンショットを撮りたい'
    result = openai_qa(query, [], dummy_callback)

gpt/openai_chat.pyを実行しChatGPIからの回答をストリーミングでユーザーに示す方法が確認できた
callback.gif

チャットアプリ

概要

利用しているチャットアプリのライブラリchatuxはストリーミングでチャット内容を表示することができない。そこで、チャットのやり取りをしている領域とは別の領域を用意し、そこにChatGPTからの回答をストリーミングで表示することにする。

サーバーの修正箇所

主な修正箇所は @app.route('/listen') 部分の追加
ChatGPTからの回答をストリーミングでチャットのクライアントアプリ側に送信する。

app.py
# Ugh!: No end is reached because None is never put into qa_stream.
@app.route('/listen')
def listen():
    def stream():
        while True:
            msg = qa_stream.get()
            if msg is None:
                break 
            yield f'data: {msg}\n\n'
    response = flask.Response(stream(), mimetype='text/event-stream')
    return response

大域変数として用意した qa_stream という Queue にChatGPTからの回答を保存しておき、そのQueueからクライアントアプリに回答を送信している。Queueが1つしかないため、2つ以上の同時接続があると不具合が発生する。

app.py
def dummy_callback(token):
    qa_stream.put(token)
    print('callback>> \033[36m' + token + '\033[0m')

threaded=True/chat/listenに対する同時接続に対応

app.py
def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s:%(name)s - %(message)s")   
    app.debug = True
    app.run(port='5001', threaded=True)

クライアントの修正箇所

/listenを監視しストリーミングでチャットサーバーからChatGPTの回答を受信する。受信したデータを <div id="streaming_msg"> の領域に書き込む。

<div id="streaming_msg" class="cool-box">
    <h1>Streaming message from GPT</h1>
</div>
<script>
    const messagesDiv = document.getElementById("streaming_msg");

    //  receiving ChatGPT streaming responses from /listen
    const eventSource = new EventSource("/listen");
    eventSource.onmessage = function (event) {
        const data = event.data;
        messagesDiv.innerHTML += data;
    };
    eventSource.onerror = function(event) {
        console.error("Connection error:", event);
        // no automatic re-connection
        eventSource.close();
    };
</script>

参考資料

6
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
2