1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPIで、ollamaとUIを中継するstream機能を作ってみた

Last updated at Posted at 2025-05-09

記載日2025/05/09

LLMを使っていると、ユーザからのリクエストを加工し、その加工済みリクエストをLLMに投げてレスポンスをもらいたいことがあります。この時、LLMのレスポンスをstream形式でユーザへ返すことができるとユーザ体験が良くなります。
そこで、FastAPIのstream機能を作ってみました。LLMにはOllama(Llama3.2:3b)を使います。

やりたいこと

「OllamaのstreamレスポンスをFastAPIで受け取り、streamのままFastAPIから返す」ということを目標とします。streamでない時と比べて、ユーザ体験が良くなります。

環境

OS:Ubuntu 22.04 (正確にはWindows11のWSL2上)
Python:3.11.11 (pyenv使用しています)

コード

Ollamaサーバはすでに起動している前提です。
以下のようなディレクトリを想定しています。

--- プロジェクトディレクトリ
 |
 |--- fastapi.py
 |
 |--- models
 |      |
 |      |--- models.py
 |
 |--- services
        |
        |--- llama.py
# models/models.py

from pydantic import BaseModel

class Message(BaseModel):
    data: str

class ChatResponse(BaseModel):
    model: str
    created_at: str
    message: str
    done_reason: str
    done: bool
    total_duration: int
    load_duration: int
    prompt_eval_count: int
    prompt_eval_duration: int
    eval_count: int
    eval_duration: int
# services.llama.py
# ollamaとのやり取り。

import urllib.request
import json
from typing import List, AsyncIterable
from models.models import ChatResponse

class Llama():
    def prepare_header(self) -> dict[str, str]:
        headers = {
            'Content-Type': 'application/json',
        }
        return headers

    def prepare_body(self, user_prompt: str):
        data = {
            "model": "llama3.2",
            "messages": [{
                "role": "user",
                "content": user_prompt
            }],
            "stream": True
        }
        return data
        
    def chat_stream(self, message: str) -> AsyncIterable[ChatResponse]:
        url_ollama = "http://localhost:11434" + "/api/chat"

        
        req = urllib.request.Request(url_ollama, json.dumps(self.prepare_body(message)).encode(), self.prepare_header(), method='POST')
        with urllib.request.urlopen(req) as res:
            is_continue = True
            while is_continue:
                body = res.readline()
                if body == None or json.loads(body.decode())["done"] == True:
                    is_continue = False
                yield body
                
# fastapi.py

from fastapi import FastAPI
from fastapi import HTTPException
from fastapi.responses import StreamingResponse
from services.llama import Llama
from models.models import Message


app = FastAPI()
LlamaC = Llama()

@app.get("/")
async def root():
    now = datetime.datetime.now().strftime('%H:%M:%S')
    return {"health_check":"OK", "status":200, "time": now}
    
@app.post("/stream/")
async def stream(data: Message) -> StreamingResponse:
    try:
        return StreamingResponse(content = LlamaC.chat_stream(data.data), media_type="text/event-stream")
    except TimeoutError as e:
        print("Error:", e)
        raise HTTPException(status_code=408, detail="Timeout Error")
    except Exception as e:
        print("Error:", e)
        raise HTTPException(status_code=500, detail="Internal Service Error")

streamの動きを確認してみよう(StreamlitでUI作って、FastAPIのstreamを試す)

StreamlitでUIを作り、FastAPIからのレスポンスがstreamになっていることを確認します。
以下コードで下画像のようにstreamが確認できます。

・ストリームの途中

・レスポンスをすべて受け取った状態

# ui.py

import json
import streamlit as st
import streamlit.components.v1 as stc
import urllib.request


fastapi_path: str = "http://localhost:8000/"

def main():
    st.title("ストリーム確認")
    
    # 入力ボックス
    text = st.text_input("入力してください。")

    # ボタンを設置し、ボタンが押されたら実行
    if st.button("Stream data"):
        with st.spinner("waiting"):
            st.write_stream(stream(text))

def prepare_header() -> dict[str, str]:
    headers = {
        'Content-Type': 'application/json',
    }
    return headers

def prepare_body(message: str):
    data = {
        "data": message
    }
    return data
    
def stream(message: str):
    path = fastapi_path + "stream/"
    req = urllib.request.Request(path, json.dumps(prepare_body(message)).encode(), prepare_header(), method='POST')
    with urllib.request.urlopen(req) as res:
        is_x = True
        while is_x:
            body = res.readline()
            if body == None or json.loads(body.decode())["done"] == True:
                    is_x = False
            json_body = json.loads(body.decode())
            yield json_body["message"]["content"]
        
if __name__ == "__main__":
    main()

参考にさせて頂いたサイト

・ollamaの返り値のバイト列と文字列の扱いについて
https://qiita.com/masakielastic/items/2a04aee632c62536f82c
・StreamingResponseの使い方について
https://book.st-hakky.com/hakky/fastapi-streaming/
https://fastapi.tiangolo.com/ja/advanced/custom-response/#streamingresponse_1
https://zenn.dev/nano_sudo/scraps/a5daa87ca8f8d4
・Streamlitの基本
https://zenn.dev/alivelimb/books/python-web-frontend/viewer/about-streamlit
https://gihyo.jp/article/2024/10/monthly-python-2410
https://qiita.com/sypn/items/80962d84126be4092d3c
・StreamlitでのAPI呼び出し
https://js2iiu.com/2024/09/07/streamlit-11-api/

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?