導入
ChatGPT APIはStream出力をサポートしており、これを使うことで本家ChatGPTのように結果を順次表示することが可能です。
pythonを使ってStreaming出力を利用する場合は以下のように実装できます
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo-0613",
messages=[
{'role': 'user', 'content': 'Hello?'}
],
stream=True
)
for chunk in response:
chunk_message = chunk['choices'][0]['delta'].get('content', '')
print(chunk_message)
langchainを使うとこう
llm = OpenAI(
model="gpt-3.5-turbo-instruct",
)
for chunk in llm.stream("Hello?"):
print(chunk)
このように非常に簡単に実装可能です。
しかし、これをサーバーサイドとして扱う場合には通常のAPIではなく、リアルタイムな通信処理を実装する必要があります。
この時に有効なのがServer-Sent Eventsです。
Server-Sent Eventsとは
Server-Sent Events(以下SSE)とはリアルタイムな通信をHTTP経由で提供する技術です。
似た技術としてはWebSocketがありますが、WebSocketが双方向であるのに対し、SSEはサーバーからクライアントへの単方向通信となっています。
その分扱いが簡単なので、クライアント側からリアルタイムに情報を送る必要がない場合はこちらが適しています。
FastAPIによる実装
さて、今回はサーバーサイドにFastAPIを利用します。
FastAPIは非常に簡易なpython用サーバーサイドフレームワークであり、SwaggerUIを自動生成してくれるため、プロトタイプ制作に適しています。
FastAPIにおけるSSEは次のようなコードで実装できます。
## ファイル名はmain.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import uvicorn
from pydantic import BaseModel
import asyncio
app = FastAPI()
class StreamRequest(BaseModel):
query: str
async def send_token(query: str):
for token in query:
yield token
await asyncio.sleep(0.1)
@app.post("/streaming")
async def ask_stream(request: StreamRequest) -> StreamingResponse:
return StreamingResponse(
send_token(request.query),
media_type="text/event-stream",
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
FastAPIのStreamingResponseはGeneratorを受け取ってevent-streamを作成します。
今回はsend_token関数定義内でyieldを使用することでGenerator関数として扱っています。
適当なshellでpython main.py
を実行してサーバーを起動したのち、
次のコマンドを実行して動作を確認します。
curl -N -X POST \
-H "Content-Type: application/json" \
-H "accept: text/event-stream" \
-d '{"query":"Streaming responseのテスト"}' \
http://127.0.0.1:8000/streaming
入力したテキストが良い感じに1文字ずつ出力されたら成功です。
ChatGPT APIに置き換えるとこんな感じになります。
async def send_token(query: str):
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo-0613",
messages=[
{'role': 'user', 'content': query}
],
stream=True
)
for chunk in response:
chunk_message = chunk['choices'][0]['delta'].get('content', '')
yield chunk_message
text/event-stream形式はテキストしか扱えませんが、json.dumpsメソッドなどを利用することで構造データを送ることも可能なため、使い方はいろいろあると思います。
複雑な処理が必要な場合の拡張
さて、基本的には上記の方法で良いのですが、時折コールバックのようなyieldを利用できない処理を使わなくてはならないことがあると思います。(というか、以前のlangchainがそうでした)
こういった場合、Iteratorを自作してやることで柔軟な実装が可能となります。
まずStreamingResponseに渡すIteratorを用意します。
Iteratorはイテレーターオブジェクト自体を返す__iter__メソッドと、データを順次出力し、最後にStopIterationを出力する__next__メソッドを含むクラスとして以下のように実装できます。
import queue, asyncio
class StreamIterator:
def __init__(self):
self.queue = queue.Queue()
def __iter__(self):
return self
def __next__(self):
item = self.queue.get()
if item is StopIteration:
raise item
return item
def send(self, data):
self.queue.put(data)
def close(self):
self.queue.put(StopIteration)
この実装ではiteratorのsendメソッドを呼び出すことでテキストを送信できます。
また、処理中にエラーが起きた場合、iteratorが残っていると終了できなくなるためfinallyでcloseメソッドを実行します。
async def main_process(query: str, iterator: StreamIterator):
try:
await send_token(query, iterator)
except Exception as err:
print(err)
finally:
iterator.close()
async def send_token(query: str, iterator: StreamIterator):
for token in query:
iterator.send(token)
await asyncio.sleep(0.1)
これらのプロセスをasyncioモジュールのイベントループ上で実行し、
StreamingResponseに渡したiteratorにmain_process内でデータを格納することでSSEを実装できます。
@app.post("/streaming")
async def ask_stream(request: StreamRequest) -> StreamingResponse:
iterator = StreamIterator()
event_loop = asyncio.get_event_loop()
event_loop.create_task(main_process(request.query, iterator))
return StreamingResponse(
iterator,
media_type="text/event-stream",
)
再び適当なshellでpython main.py
を実行してサーバーを起動したのち、
次のコマンドを実行して動作を確認します。
curl -N -X POST \
-H "Content-Type: application/json" \
-H "accept: text/event-stream" \
-d '{"query":"Streaming responseのテスト"}' \
http://127.0.0.1:8000/streaming
入力したテキストが良い感じに1文字ずつ出力されたら成功です。
この方式ではiteratorを渡すことでコールバックなどyieldの扱いが難しい処理でもSSEでデータを送ることが可能となります。
結び
今回はFastAPIでServer-Sents Eventを実装してみました。
Iteratorの自作は多分しばらく使う機会はないと思いますが、仕組みの理解の上では役立つので暇があれば触ってみても良いかもしれません