Flowise API ストリーミングの Python コードを実装したので紹介します。思いのほか苦労しました。
動作確認環境
- Ubuntu
- Flowise 1.6.2
- Python 3.12
- python-socketio 5.11.2
- httpx 0.27.0
Flowiseとは
Flowise は、ユーザーが LLM アプリを簡単に視覚化して構築できるようにすることを目的としたローコード/ノーコードのドラッグ&ドロップツールです。
LLM の APIキーが手元にあれば、チャットフローのデザイナ画面を開き、箱を3つほど配置しポチポチ設定するだけであっという間に簡単なチャットボットを作れます。
このツールの優れた点の1つは、作成したチャットフローをAPIとして外部から呼び出せることです。画面のアイコンをクリックすると Python や JavaScript 等のサンプルコードを確認できます。
Flowise API の Streaming について
Flowise の API は、ストリーミングにも対応しています。次のドキュメントにサンプルコードがあります。
でも Python のサンプルコードは見当たりません。当初は、OpenAI API の stream や LangChain の Runnableインタフェース の stream
や astream
のようなものだろうと高をくくっていましたが、サンプルコードを見るとそんな感じではなくどちらかというとLangChain の Callbacks のようなイベント駆動です。
今回は、それを Python のジェネレーターとして使えるように実装します。
Python パッケージ
サンプルコードが用いている socketIOC の Python 版 python-socketio を使います。
Python パッケージをインストール
pip install python-socketio[client]
pip install python-socketio[asyncio_client]
pip install httpx
非同期ストリーミング
import os
import sys
import queue
import base64
import asyncio
import httpx
import socketio
import threading
async def astream_prediction(payload, endpoint, flow_id, timeout=180):
"""チャットフロー非同期ストリーミング"""
# クエリ送信
async def on_connect(bucket, sid):
try:
url = os.path.join(endpoint, "api/v1/prediction", flow_id)
payload["socketIOClientId"] = sid
async with httpx.AsyncClient() as client:
r = await client.post(url, json=payload, timeout=timeout)
r.raise_for_status()
return r.json()
except Exception as e:
await bucket.put(e)
# イベントハンドラ
async def on_event(bucket, sock):
try:
while True:
name, *vals = await sock.receive(timeout=timeout)
if name == "token":
await bucket.put(vals[0])
elif name == "end":
await bucket.put(None)
return
except Exception as e:
await bucket.put(e)
# ジェネレータ
async with socketio.AsyncSimpleClient() as sio:
await sio.connect(endpoint)
bucket = asyncio.Queue()
task1 = asyncio.create_task(on_connect(bucket, sio.sid))
task2 = asyncio.create_task(on_event(bucket, sio))
while True:
data = await bucket.get()
if data is None:
break
elif isinstance(data, Exception):
task1.cancel()
task2.cancel()
raise data
else:
yield data
await asyncio.gather(task1, task2)
return
クエリ送信で Flowise へプロンプトを送信しますがそのレスポンスは AI のテキスト生成が完了するまでブロックされます。その間、Flowise から通知されたイベントをイベントハンドラで処理します。token
イベントで AI が生成したテキストの断片が届きます。
ジェネレータは、イベントハンドラがキューに書き込んだテキストを読み出し yield
します。クエリ送信がエラーとなった場合は、Exception
がキューに書き込まれるのでそれを検知し処理を中断します。
実行コード例
FLOWISE_ENDPOINT = "http://localhost:3000"
FLOWISE_CHATFLOW_ID = "72dfcc42-9993-4a60-a93f-1607b9dd0168"
FLOWISE_USERNAME = "demo"
FLOWISE_PASSWORD = "demo"
PAYLOAD = {
"question": "その山について教えてください",
"history": [
{
"type": "userMessage",
"message": "日本で一番高い山は?"
},
{
"type": "apiMessage",
"message": "富士山です"
}
]
}
# 実行
asyncio.run(astream(PAYLOAD, FLOWISE_ENDPOINT, FLOWISE_CHATFLOW_ID))
同期ストリーミング
同期ストリーミングは、非同期ストリーミングのジェネレータを別スレッドで呼び出します。
def stream_prediction(payload, endpoint, flow_id, timeout=180):
"""チャットフロー同期ストリーミング"""
# 非同期ストリーミングのジェネレータ呼び出し
async def writer(bucket):
try:
async for chunk in astream_prediction(payload, endpoint, flow_id, timeout):
bucket.put(chunk)
bucket.put(None)
except Exception as e:
bucket.put(e)
bucket = queue.Queue()
coro = writer(bucket)
task = threading.Thread(target=asyncio.run, args=(coro,))
task.start()
try:
while True:
data = bucket.get()
if data is None:
break
elif isinstance(data, Exception):
raise data
else:
yield data
finally:
task.join()
実行コード例
def stream(payload, endpoint, flow_id):
# ジェネレータ呼び出し
for chunk in stream_prediction(payload, endpoint, flow_id):
print(chunk, end="", flush=True)
# 実行
stream(PAYLOAD, FLOWISE_ENDPOINT, FLOWISE_CHATFLOW_ID)
ストリーミング可否
Flowise のチャットフローがストリーミングに対応しているかどうかを調べる API があります。
def chatflows_streaming(endpoint, flow_id):
# チャットフローがストリーミングをサポートしているかを確認
url = os.path.join(endpoint, "api/v1/chatflows-streaming", flow_id)
r = httpx.get(url)
r.raise_for_status()
print(r.json())
# 実行
chatflows_streaming(FLOWISE_ENDPOINT, FLOWISE_CHATFLOW_ID)
実行結果
{'isStreaming': True}
おまけ
Flowise の UI 用の API を拝借することでチャットフローの一覧を取得できます。Flowise画面のログインユーザ/パスワードで Basic認証されています。
def chatflows(endpoint, username, password):
# チャットフロー一覧取得
token = base64.urlsafe_b64encode(f"{username}:{password}".encode()).decode()
headers = {"Authorization": f"Basic {token}"}
url = os.path.join(endpoint, "api/v1/chatflows")
r = httpx.get(url, headers=headers)
r.raise_for_status()
print(r.json())
# 実行
chatflows(FLOWISE_ENDPOINT, FLOWISE_USERNAME, FLOWISE_PASSWORD)
実行結果
[
{
"id": "72dfcc42-9993-4a60-a93f-1607b9dd0168",
"name": "demo",
"flowData": {...},
"deployed": False,
"isPublic": False,
"apikeyid": None,
"chatbotConfig": None,
"apiConfig": None,
"analytic": None,
"speechToText": None,
"createdDate": "2024-03-27T07: 04: 53.000Z",
"updatedDate": "2024-03-27T08: 10: 39.000Z",
"category": None
}
]
最後まで目を通していただきありがとうございます。