1
3

Flowise の API を Python でストリーミング

Last updated at Posted at 2024-04-02

Flowise API ストリーミングの Python コードを実装したので紹介します。思いのほか苦労しました。

動作確認環境

  • Ubuntu
  • Flowise 1.6.2
  • Python 3.12
  • python-socketio 5.11.2
  • httpx 0.27.0

Flowiseとは

Flowise は、ユーザーが LLM アプリを簡単に視覚化して構築できるようにすることを目的としたローコード/ノーコードのドラッグ&ドロップツールです。

LLM の APIキーが手元にあれば、チャットフローのデザイナ画面を開き、箱を3つほど配置しポチポチ設定するだけであっという間に簡単なチャットボットを作れます。

チャットフローのデザイナ画面
327-02.png

このツールの優れた点の1つは、作成したチャットフローをAPIとして外部から呼び出せることです。画面のアイコンをクリックすると Python や JavaScript 等のサンプルコードを確認できます。

PythonからAPIを呼び出すサンプルコード
327-04.png

Flowise API の Streaming について

Flowise の API は、ストリーミングにも対応しています。次のドキュメントにサンプルコードがあります。

でも Python のサンプルコードは見当たりません。当初は、OpenAI API の stream や LangChain の Runnableインタフェースstreamastream のようなものだろうと高をくくっていましたが、サンプルコードを見るとそんな感じではなくどちらかというとLangChain の Callbacks のようなイベント駆動です。

今回は、それを Python のジェネレーターとして使えるように実装します。

Python パッケージ

サンプルコードが用いている socketIOC の Python 版 python-socketio を使います。

Python パッケージをインストール

pip install python-socketio[client]
pip install python-socketio[asyncio_client]
pip install httpx

非同期ストリーミング

import os
import sys
import queue
import base64
import asyncio
import httpx
import socketio
import threading

async def astream_prediction(payload, endpoint, flow_id, timeout=180):
    """チャットフロー非同期ストリーミング"""
    # クエリ送信
    async def on_connect(bucket, sid):
        try:
            url = os.path.join(endpoint, "api/v1/prediction", flow_id)
            payload["socketIOClientId"] = sid
            async with httpx.AsyncClient() as client:
                r = await client.post(url, json=payload, timeout=timeout)
            r.raise_for_status()
            return r.json()
        except Exception as e:
            await bucket.put(e)

    # イベントハンドラ
    async def on_event(bucket, sock):
        try:
            while True:
                name, *vals = await sock.receive(timeout=timeout)
                if name == "token":
                    await bucket.put(vals[0])
                elif name == "end":
                    await bucket.put(None)
                    return
        except Exception as e:
            await bucket.put(e)

    # ジェネレータ
    async with socketio.AsyncSimpleClient() as sio:
        await sio.connect(endpoint)
        bucket = asyncio.Queue()
        task1 = asyncio.create_task(on_connect(bucket, sio.sid))
        task2 = asyncio.create_task(on_event(bucket, sio))
        while True:
            data = await bucket.get()
            if data is None:
                break
            elif isinstance(data, Exception):
                task1.cancel()
                task2.cancel()
                raise data
            else:
                yield data

    await asyncio.gather(task1, task2)
    return

クエリ送信で Flowise へプロンプトを送信しますがそのレスポンスは AI のテキスト生成が完了するまでブロックされます。その間、Flowise から通知されたイベントをイベントハンドラで処理します。tokenイベントで AI が生成したテキストの断片が届きます。

ジェネレータは、イベントハンドラがキューに書き込んだテキストを読み出し yield します。クエリ送信がエラーとなった場合は、Exception がキューに書き込まれるのでそれを検知し処理を中断します。

実行コード例

FLOWISE_ENDPOINT = "http://localhost:3000"
FLOWISE_CHATFLOW_ID = "72dfcc42-9993-4a60-a93f-1607b9dd0168"
FLOWISE_USERNAME = "demo"
FLOWISE_PASSWORD = "demo"
PAYLOAD = {
    "question": "その山について教えてください",
    "history": [
        {
            "type": "userMessage",
            "message": "日本で一番高い山は?"
        },
        {
            "type": "apiMessage",
            "message": "富士山です"
        }
    ]
}

# 実行
asyncio.run(astream(PAYLOAD, FLOWISE_ENDPOINT, FLOWISE_CHATFLOW_ID))

同期ストリーミング

同期ストリーミングは、非同期ストリーミングのジェネレータを別スレッドで呼び出します。

def stream_prediction(payload, endpoint, flow_id, timeout=180):
    """チャットフロー同期ストリーミング"""
    # 非同期ストリーミングのジェネレータ呼び出し
    async def writer(bucket):
        try:
            async for chunk in astream_prediction(payload, endpoint, flow_id, timeout):
                bucket.put(chunk)
            bucket.put(None)
        except Exception as e:
            bucket.put(e)

    bucket = queue.Queue()
    coro = writer(bucket)
    task = threading.Thread(target=asyncio.run, args=(coro,))
    task.start()
    try:
        while True:
            data = bucket.get()
            if data is None:
                break
            elif isinstance(data, Exception):
                raise data
            else:
                yield data
    finally:
        task.join()

実行コード例

def stream(payload, endpoint, flow_id):
    # ジェネレータ呼び出し
    for chunk in stream_prediction(payload, endpoint, flow_id):
        print(chunk, end="", flush=True)

# 実行
stream(PAYLOAD, FLOWISE_ENDPOINT, FLOWISE_CHATFLOW_ID)

ストリーミング可否

Flowise のチャットフローがストリーミングに対応しているかどうかを調べる API があります。

def chatflows_streaming(endpoint, flow_id):
    # チャットフローがストリーミングをサポートしているかを確認
    url = os.path.join(endpoint, "api/v1/chatflows-streaming", flow_id)
    r = httpx.get(url)
    r.raise_for_status()
    print(r.json())

# 実行
chatflows_streaming(FLOWISE_ENDPOINT, FLOWISE_CHATFLOW_ID)

実行結果

{'isStreaming': True}

おまけ

Flowise の UI 用の API を拝借することでチャットフローの一覧を取得できます。Flowise画面のログインユーザ/パスワードで Basic認証されています。

def chatflows(endpoint, username, password):
    # チャットフロー一覧取得
    token = base64.urlsafe_b64encode(f"{username}:{password}".encode()).decode()
    headers = {"Authorization": f"Basic {token}"}
    url = os.path.join(endpoint, "api/v1/chatflows")
    r = httpx.get(url, headers=headers)
    r.raise_for_status()
    print(r.json())

# 実行
chatflows(FLOWISE_ENDPOINT, FLOWISE_USERNAME, FLOWISE_PASSWORD)

実行結果

[
    {
        "id": "72dfcc42-9993-4a60-a93f-1607b9dd0168",
        "name": "demo",
        "flowData": {...},
        "deployed": False,
        "isPublic": False,
        "apikeyid": None,
        "chatbotConfig": None,
        "apiConfig": None,
        "analytic": None,
        "speechToText": None,
        "createdDate": "2024-03-27T07: 04: 53.000Z",
        "updatedDate": "2024-03-27T08: 10: 39.000Z",
        "category": None
    }
]

最後まで目を通していただきありがとうございます。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3