なかなかニッチだと思いつつ、少し進捗が出たのでまとめます。
要件
APIでOpenAIのコンプリーションを取得する際、デフォルトではすべての生成が終わってから返却されます。プロンプトによっては数十秒待つことに。これを改善するために、コンプリーションをストリームすることができます。
OpenAI APIを直接呼び出すのであればシンプルです。こちらの記事に書いているように引数stream
をTrue
に設定するだけです。
ただ、実際にはAPIの呼び出し前や呼び出し後に処理を追加したいケースが多く、その場合には前処理、後処理を含むロジックを一連のパイプラインとしてデプロイする必要が出てきます。
そのため、ここではDatabricksのドライバープロキシーを用いてFlaskに処理のパイプラインをデプロイして、クライアントから呼び出す構成を組みます。その際に生成結果をストリーミングするようにします。
実装
サーバー
import os
os.environ["OPENAI_API_KEY"] = dbutils.secrets.get("demo-token-takaaki.yayoi", "openai_api_key")
import openai
def chat_gpt_helper(prompt):
"""
この関数はOpenAIのcompletions APIを用いてGpt4モデルのレスポンスを返却します
"""
try:
resp = ''
openai.api_key = os.getenv('OPENAI_API_KEY')
for chunk in openai.ChatCompletion.create(
model="gpt-4",
messages=[{
"role": "user",
"content":prompt
}],
stream=True,
):
content = chunk["choices"][0].get("delta", {}).get("content")
if content is not None:
yield content
except Exception as e:
print(e)
return str(e)
Flask側でもMIMEタイプtext/event-stream
でストリームを搬出しています。
from flask import Flask, jsonify, request, Response, stream_with_context
app = Flask("openai-streaming")
@app.route('/', methods=['POST'])
def stream_chat_gpt():
"""
ChatGPTのレスポンスをストリーミングします
"""
prompt = request.get_json(force = True).get('prompt','')
return Response(stream_with_context(chat_gpt_helper(prompt)),
mimetype='text/event-stream')
こちらはサーバーのURLやポート番号を確認するためのものです。
from dbruntime.databricks_repl_context import get_context
ctx = get_context()
port = "7777"
driver_proxy_api = f"https://{ctx.browserHostName}/driver-proxy-api/o/0/{ctx.clusterId}/{port}"
print(f"""
driver_proxy_api = '{driver_proxy_api}'
cluster_id = '{ctx.clusterId}'
port = {port}
""")
driver_proxy_api = 'https://xxxxxx.databricks.com/driver-proxy-api/o/0/0713-082554-nqq9eafd/7777'
cluster_id = '0713-082554-nqq9eafd'
port = 7777
以下のコマンドでドライバープロキシーを起動します。明示的に停止しない限り実行され続けるので注意してください。
app.run(host="0.0.0.0", port=port, debug=True, use_reloader=False, threaded=True)
クライアント
今回は同じDatabricksワークスペースの同じクラスターに別のノートブックをアタッチします。
クライアント側もtext/event-stream
を受け付けるようにしています。
import requests
import json
def get_stream(data):
headers = {"Accept": "text/event-stream"}
output_text = []
s = requests.Session()
with s.post("http://127.0.0.1:7777/", headers=headers, json=data, stream=True) as resp:
for line in resp.iter_content(decode_unicode=True):
if line:
print(line, end="", flush=True)
output_text.append(line)
return "".join(output_text)
プロンプトを入力してみます。
data = {"prompt": "猫を讃える歌を歌ってください"}
get_stream(data)
動きました!
参考資料
色々参考にさせていただいています。
- openai-cookbook/examples/How_to_stream_completions.ipynb at main · openai/openai-cookbook
- Streaming-ChatGPT-Responses-in-ReactJS/server/app.py at main · anshumankmr/Streaming-ChatGPT-Responses-in-ReactJS
- Stream Responses from OpenAI API with Python: A Step-by-Step Guide — CodingTheSmartWay
- ChatGPTをぬるぬるにする🐌Server-Sent Eventsの基礎知識
- pythonでSSE (server side events) の例を作ってみて遊んでみた - podhmo's diary
- Unable to send OpenAI stream response via Flask API - Stack Overflow
- Streamlitを使ってChatGPTのようなチャットアプリを簡単に作る
- 【python】【リアルタイム通信】FlaskでSSE(Server-Sent Events)を実装する
- FastAPIを使ってストリーム対応のLLM API RESTサーバを作る on Databricks - Qiita
- StreamlitでChatGPTのStreamを表示する - Qiita