1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GraphAI × MLX × FastAPI でローカルLLMパイプライン構築 Gemma 4Bで複数サイトのナレッジ抽出を高速・低コストで実現

Posted at

🟢 はじめに

ChatGPT(GPT-4o)やGeminiなどの大規模LLMを使えば高精度な処理は可能ですが、
実行コストやレートリミットなどの問題から、大量処理には向きません

今回は、私が実際に構築したローカルLLMパイプラインについて紹介します。
Web上の記事からナレッジを抽出するパイプラインの構築を目指しました。

小さなGemma 4Bモデルを使って、FastAPI + MLX + 並列処理高速かつ低コストな実行が可能になりました。


🎯 解決したかった課題

あるタスクで、「知りたいこと」に関する記事を複数収集し、ナレッジを抽出・統合したいという要件がありました。
しかし以下のような現実的な制約に直面しました:

  • Gpt-4o / Gemini は コンテキスト長が長くなるほど課金額が大きくなる
  • 2時間程度使い続けるだけで千円/日近い課金が必要になるケースもある(日次で数時間実行するような処理は組み込めない)
  • 処理自体は「複雑な推論」を必要とせず、不明点に対する回答や事実を抽出するだけ

つまり、「reasoning不要・数をこなす必要がある処理」にLLMの“大砲”はオーバースペック です


🔧 ナレッジ抽出処理フロー概要(7ステップ)

この処理フローをローカル環境で回すための構成を以下のように設計しました。

① ユーザーが知りたい内容を文章で入力
② LLMで検索用キーワードを3つ抽出
③ 各キーワードごとに SerpAPI で Google検索
④ 各検索結果から上位3記事のURLを取得(計9記事)
⑤ 各URLのHTMLを取得し、Markdown形式に整形(前処理済)
⑥ 各記事を2000文字でチャンク化しLLMでナレッジを抽出(最低9回)
⑦ 抽出したナレッジをLLMで統合(1回)

GraphAIを活用することで③〜⑥の9サイト分の処理を並列実行してスループットを確保しています。
また、MLX+FastAPIを活用することで⑥のLLMによるナレッジ抽出をバッチ化・並列化を行い高いTPS(token/second)を確保しています。

本記事では MLX+FastAPIのさわりの部分を紹介します。


⚙️ 実行環境と構成

項目 内容
実行環境 Mac Studio M3 Ultra / メモリ256GB
モデル gemma-3-4b-it-4bit-DWQ / MLX
LLM推論 FastAPI 上で常駐、起動時にモデルロード
並列処理 uvicorn --workers=4 によるプロセス並列
出力形式 Markdown
API管理 REST構成で各ステップを統一呼び出し

🤖 なぜ Gemma 4B(4bit)を選んだか?

理由 解説
Reasoning不要 複雑な思考は不要。表面的な情報の抽出で十分
コンテキスト長重視 複数記事に対応するため 4B でも十分
軽量 MLX 上で高速に動作、メモリ消費も抑えられる
起動時読み込みで待機 毎回のモデル初期化が不要で高速レスポンス可

🚀 MLX × FastAPIの実験結果

1. MLX × FastAPIのセットアップ

  • requirements.txt
    mlx==0.25.2
    mlx-lm==0.24.1
    fastapi
    uvicorn
    uvloop
    python-dotenv
    
  • .env
    MODEL_DIR=mlx-community/gemma-3-4b-it-4bit-DWQ
    GPU_SLOTS=5
    BATCH_MAX_SIZE=5
    BATCH_TIMEOUT_SECONDS=1.0
    VERBOSE=False
    
  • server.py
    import os
    from dotenv import load_dotenv
    import asyncio
    import time
    import uuid
    from fastapi import FastAPI, HTTPException
    from typing import List, Dict, Tuple, Any, Optional # Optional を追加
    from pydantic import BaseModel, Field
    from mlx_lm import load as load_model, generate as generate_once
    from mlx_lm.sample_utils import make_sampler
    
    
    load_dotenv()
    
    
    # --- 設定値 ---
    MODEL_DIR = os.environ.get("MODEL_DIR", "mlx-community/gemma-3-4b-it-qat-4bit")
    print(f"Loading model from {MODEL_DIR}...")
    GPU_SLOTS = int(os.environ.get("GPU_SLOTS", 5))
    BATCH_MAX_SIZE = int(os.environ.get("BATCH_MAX_SIZE", 5))
    BATCH_TIMEOUT_SECONDS = float(os.environ.get("BATCH_TIMEOUT_SECONDS", 1.5))
    VERBOSE = os.environ.get("VERBOSE", "False").lower() in ("1", "true", "yes")
    
    
    # --- グローバル変数 ---
    model, tokenizer = load_model(MODEL_DIR)
    model.eval()
    
    app = FastAPI(title="MLX-LM API (async semaphore with batching)")
    
    sem = asyncio.Semaphore(GPU_SLOTS)
    request_queue: asyncio.Queue[Tuple[Any, asyncio.Future, str]] = asyncio.Queue()
    
    
    # --- Pydanticモデル ---
    class ChatReq(BaseModel):
        prompt: str = Field(..., example="こんにちは、自己紹介してください。")
        max_tokens: int = 1024
        temperature: float = 0.1
    
    
    class ChatResp(BaseModel): # 変更点: トークン数を追加
        text: str
        prompt_tokens: Optional[int] = None
        completion_tokens: Optional[int] = None
        total_tokens: Optional[int] = None
    
    
    class OpenAIChatMessage(BaseModel):
        role: str
        content: str
    
    
    class OpenAIChatRequest(BaseModel):
        messages: List[OpenAIChatMessage]
        model: str = MODEL_DIR
        max_tokens: int = 1024
        temperature: float = 0.1
    
    
    # --- バッチ処理ワーカ ---
    async def process_batches():
        while True:
            gathered_items: List[Tuple[Any, asyncio.Future, str]] = []
            batch_start_time = time.monotonic()
    
            while len(gathered_items) < BATCH_MAX_SIZE:
                try:
                    remaining_timeout = BATCH_TIMEOUT_SECONDS - (time.monotonic() - batch_start_time)
                    if remaining_timeout <= 0:
                        if len(gathered_items) > 0:
                            break
                        else:
                            await asyncio.sleep(0.01)
                            batch_start_time = time.monotonic()
                            continue
    
                    req_data, future, req_type = await asyncio.wait_for(
                        request_queue.get(),
                        timeout=max(0.001, remaining_timeout)
                    )
                    gathered_items.append((req_data, future, req_type))
                    request_queue.task_done()
                except asyncio.TimeoutError:
                    if len(gathered_items) > 0:
                        break
                except Exception as e:
                    print(f"Error in batch worker queue retrieval: {e}")
                    break
    
            if not gathered_items:
                continue
            batch_size = len(gathered_items)
            print(f"バッチ実績: {batch_size} 件のリクエストを処理します")
    
            batch_process_start = time.monotonic()
            llm_times = []
            for req_data, future, req_type in gathered_items:
                if future.done():
                    continue
                try:
                    async with sem:
                        loop = asyncio.get_running_loop()
    
                        current_prompt: str
                        current_max_tokens: int
                        current_temperature: float
    
                        if req_type == "completions":
                            assert isinstance(req_data, ChatReq)
                            current_prompt = req_data.prompt
                            current_max_tokens = req_data.max_tokens
                            current_temperature = req_data.temperature
                        elif req_type == "chat_completions":
                            assert isinstance(req_data, OpenAIChatRequest)
                            # OpenAI形式では、通常、複数のメッセージを結合してプロンプトを形成しますが、
                            # mlx_lm.generateは単一のプロンプト文字列を取るため、ここでは最後のメッセージを使用します。
                            # より高度なシナリオでは、tokenizer.apply_chat_templateなどを使用して
                            # 複数のメッセージから単一のプロンプト文字列を構築する必要があります。
                            # ここでは簡易的に最後のユーザーメッセージのcontentを使用します。
                            if req_data.messages:
                                # 最後のメッセージを使用するか、あるいは全メッセージを結合するかは要件によります。
                                # tokenizer.apply_chat_template が利用可能であれば、それを使うのがベストです。
                                # ここでは簡易的に最後のメッセージの content を使います。
                                current_prompt = tokenizer.apply_chat_template(
                                    [{"role": msg.role, "content": msg.content} for msg in req_data.messages],
                                    tokenize=False, # トークンIDではなく文字列として取得
                                    add_generation_prompt=True # モデルが応答を生成するためのプロンプト部分を追加
                                )
                                # もし apply_chat_template が上記のような引数を取らない、または利用できない場合、
                                # 手動でフォーマットするか、最後のメッセージの content を使うなどの代替策が必要です。
                                # 例: current_prompt = req_data.messages[-1].content
                            else:
                                current_prompt = "" # messages が空の場合のフォールバック
    
                            current_max_tokens = req_data.max_tokens
                            current_temperature = req_data.temperature
                        else:
                            raise ValueError(f"Unknown request type: {req_type}")
    
                        sampler = make_sampler(temp=current_temperature)
    
                        # tokenizer.encode() はリストのトークンIDを返す
                        prompt_token_ids = tokenizer.encode(current_prompt)
                        num_prompt_tokens = len(prompt_token_ids)
    
                        llm_start = time.monotonic()
                        text_result = await loop.run_in_executor(
                            None,
                            lambda: generate_once(
                                model,
                                tokenizer,
                                prompt=current_prompt, # ここで渡すプロンプトは文字列
                                max_tokens=current_max_tokens,
                                sampler=sampler,
                                verbose=VERBOSE
                            ),
                        )
                        llm_elapsed = time.monotonic() - llm_start
                        llm_times.append(llm_elapsed)
    
                        completion_token_ids = tokenizer.encode(text_result)
                        num_completion_tokens = len(completion_token_ids)
                        # 注意: `tokenizer.encode` は通常、文字列の先頭にBOSトークンなどを追加する場合があります。
                        #       正確な「生成された」トークンだけを数えたい場合は、`generate_once`が
                        #       生成したトークンIDのリストを直接返すようにするか、
                        #       `tokenizer.decode`する前のトークンIDリストの長さからプロンプトトークンIDの長さを引くなど、
                        #       より精密な制御が必要です。
                        #       ここでは、生成されたテキストを再度エンコードした長さを completion_tokens としています。
    
                        num_total_tokens = num_prompt_tokens + num_completion_tokens
    
                        if llm_elapsed >= 60:
                            print(f"LLM実行が60秒以上かかりました。current_prompt: {current_prompt[:100]}...")
                        if "javascript" in text_result or "JavaScript" in text_result:
                            print(f"JavaScriptから開始しています。current_prompt: {current_prompt[:100]}...")
    
                    if req_type == "completions":
                        future.set_result(ChatResp(
                            text=text_result,
                            prompt_tokens=num_prompt_tokens,
                            completion_tokens=num_completion_tokens,
                            total_tokens=num_total_tokens
                        ))
                    elif req_type == "chat_completions":
                        assert isinstance(req_data, OpenAIChatRequest)
                        response_payload = {
                            "id": f"chatcmpl-{uuid.uuid4().hex}",
                            "object": "chat.completion",
                            "created": int(time.time()),
                            "model": req_data.model,
                            "choices": [
                                {
                                    "index": 0,
                                    "message": {"role": "assistant", "content": text_result},
                                    "finish_reason": "stop", # mlx_lm.generate は stop 以外の理由を返さないため固定
                                }
                            ],
                            "usage": {
                                "prompt_tokens": num_prompt_tokens,
                                "completion_tokens": num_completion_tokens,
                                "total_tokens": num_total_tokens,
                            },
                        }
                        future.set_result(response_payload)
    
                except Exception as e:
                    print(f"Error during inference for a request: {e}")
                    if not future.done():
                        future.set_exception(e)
    
            batch_process_elapsed = time.monotonic() - batch_process_start
            print(f"バッチサイズ: {batch_size}件, バッチ全体の処理時間: {batch_process_elapsed:.3f}秒, 各LLM実行時間: {[f'{t:.3f}' for t in llm_times]}")
    
    
    @app.on_event("startup")
    async def startup_event():
        asyncio.create_task(process_batches())
        print(f"Batch processing worker started. Batch_size={BATCH_MAX_SIZE}, Batch_timeout={BATCH_TIMEOUT_SECONDS}s")
    
    
    async def _queue_request_and_wait(request_data: Any, request_type: str):
        start_time = time.monotonic()
        future = asyncio.get_running_loop().create_future()
        try:
            await request_queue.put((request_data, future, request_type))
            response_data = await future
            return response_data
        except asyncio.CancelledError:
            print("Request was cancelled by client.")
            raise HTTPException(status_code=499, detail="Client Closed Request")
        except Exception as e:
            print(f"Error in queuing request or awaiting future: {e}")
            raise HTTPException(status_code=500, detail=str(e))
        finally:
            elapsed = time.monotonic() - start_time
            print(f"_queue_request_and_wait elapsed: {elapsed:.3f}")
    
    
    @app.post("/v1/completions", response_model=ChatResp) # response_model は更新された ChatResp を指す
    async def completions_endpoint(req: ChatReq):
        return await _queue_request_and_wait(req, "completions")
    
    
    @app.post("/v1/chat/completions", response_model=Dict[str, Any])
    async def chat_completions_endpoint(req: OpenAIChatRequest):
        print("-- start --")
        if not req.messages:
            raise HTTPException(status_code=400, detail="messages is required")
        # OpenAIのチャット形式の場合、プロンプトの構築方法が重要です。
        # ここでは process_batches 内で apply_chat_template を使用する想定です。
        return await _queue_request_and_wait(req, "chat_completions")
    
  • 環境構築と起動
    # 環境構築
    python3 -m venv venv
    source venv/bin/activate  # Windowsの場合: venv\Scripts\activate
    pip install --upgrade pip
    pip install -r requirements.txt
    
    # 起動
    uvicorn server:app --host 0.0.0.0 --port 8080 --loop uvloop --workers 5
    

2. MLX × FastAPIの実行

テスト用のスクリプトです。
2025年5月4に開催された井上尚弥対カルデナスについての記事からテキストを抜き出したものに対しLLMを使って情報を抽出しています。

import asyncio
import time
import statistics
import json
import httpx
from datetime import datetime

_text = """
このページではJavaScriptを使用しています。 プロボクシングの井上尚弥選手が4日、アメリカ・ラスベガスで行われたスーパーバンタム級の4団体統一王座の防衛戦で、アメリカのラモン・カルデナス選手に8ラウンド、テクニカルノックアウト勝ちし、4団体統一チャンピオンとして4回目の防衛に成功しました。32歳の井上選手は、これまでプロ29戦全勝で、スーパーバンタム級4団体統一王座の4回目の防衛戦として4日、アメリカ・ラスベガスで、WBA=世界ボクシング協会のスーパーバンタム級1位、アメリカのカルデナス選手と対戦しました。    井上選手は第1ラウンド、左のジャブを軸に相手に圧力をかけましたが、第2ラウンドに接近戦からカルデナス選手に左のフックで顔面を捉えられてダウンを喫し、会場は騒然となりました。    それでも徐にペースを取り戻し、第7ラウンドに右のストレートが相手の顔を捉えてダウンを奪い返しました。    そして、直後の第8ラウンドは相手をロープ際に追い詰め、連打を浴びせたところでレフェリーが試合を止めました。    井上選手は8ラウンド45秒でテクニカルノックアウトで勝って4団体統一チャンピオンとして4回目の防衛に成功し、プロ30戦全勝としました。## 井上「映像で見ていたより強い選手だった」井上選手は世界タイトルマッチ23試合目のノックアウト勝ちとなり、77年ぶりに最多記録を更新しましたが、第2ラウンドにダウンを喫するなど序盤は苦しい展開となりました。    試合後の会見で「カルデナス選手はタフさがあるので、そう簡単にはいかないなと思っていた。映像で見ていたよりもっともっと強い選手だった。全体的なボクシングもそうだが、すごく対策をしてきていると感じた」と振り返りました。    そして、ダウンのあとは「落ち着いてポイントをピックアップしていくことを考えた」ということで、「ラスベガスでカルデナス選手と戦えたことはキャリアの中ですごくいい経験になった」と話していました。    井上選手の次の防衛戦はことし9月に行われ、WBA=世界ボクシング協会の暫定チャンピオンで、ウズベキスタンのムロジョン・アフマダリエフ選手と戦うことが発表されました。## カルデナス「最高のショーを見せられたことがうれしい」井上選手からダウンを奪う健闘を見せたカルデナス選手は試合後「重要なことは最高の相手と戦うことで、負けることは気にしていなかった。パンチをもらっても思わず笑ってしまうくらい、この瞬間を心から楽しんでいた。悔しい気持ちはあるが、これがボクシングだ」とところどころ赤く腫れた顔で笑顔を見せていました。    試合については「井上選手が入ってくるタイミングをねらっていた。彼の手がパンチを打つ時に少し下がることがわかっていたので、そこをねらうことがプランだった」と明かしました。    そのうえで「彼は間違いなく史上最高の選手の1人だ。パワーそのものより、6発、7発、8発と打ってくる連打が圧倒的だった。レフェリーに止められたときは『まだ行ける』と伝えたし、残念だったが、皆さんに最高のショーを見せられたことがなによりうれしい」と充実感をにじませていました。## ボクシングの聖地でも高まる期待感ラスベガスでは、これまでに多くのビッグマッチが行われていて、アメリカのボクシングの聖地と呼ばれています。    なかでも5月5日は「シンコ・デ・マヨ」というボクシングが盛んなメキシコ最大の祝日で、毎年、この日に近い週末などに注目の試合が組まれています。    2015年に「世紀の一戦」と呼ばれたフロイド・メイウェザーさんとマニー・パッキャオさんの試合も、この「シンコ・デ・マヨ」にあわせてラスベガスで行われました。    ヘビー級をはじめとした重量級の人気が高いアメリカで、スーパーバンタム級の試合がメインイベントとして組まれることは異例だということで、井上選手への高い期待感がうかがえます。    会場はラスベガスの中心部にある2万人が収容できる「Tーモバイルアリーナ」で、対戦相手がメキシコ系アメリカ人のカルデナス選手ということもあり、会場の周辺には数日前から井上選手の巨大広告が至る所に掲げられ、街全体が盛り上がりを見せていました。    また、井上選手が所属する大橋ジムの大橋秀行会長は、井上選手にとって過去最高額のファイトマネーになることを明かしています。    メキシコにルーツがあるという男性ファンは「井上選手はパワーもフットワークもあり、ザ・モンスターと呼ばれるだけある」としたうえで、「私たちにとってもこの日は特別ですが、世界中で楽しまれているスポーツで日本人のボクサーが注目されることはいいことだと思う。彼は未来に名前を残すだろう」と話していました。    また、観戦に訪れた日本人の男性は「ここで日本人がメインを張るということにびっくりしている。早く終わらないノックアウト勝ちを期待している」と話していました。20250505/10014797341000
"""
_question = """
1. 井上尚弥はどのような試合で勝利しましたか?
2. 井上選手の次の防衛戦はいつ、どこで行われますか?
3. カルデナス選手は試合後にどのような感想を述べましたか?
"""
_query = f"""
    # メタ情報:
    - 現在の時刻は「{datetime.now()}」です。
    
    # 命令指示書
    あなたはプロのジャーナリストです。
    コンテンツを作成するために収集した情報を整理しています。
    制約条件に従い入力情報を元に下記手順で最高の成果物を生成してください。
    
    1. 入力情報から質問に対する回答を抽出し、成果物フォーマットの「QA」に列挙すること。回答不可の場合は「回答不可」とすること。
    2. 入力情報から最大3つの事実を抽出し、成果物フォーマットの「事実」に列挙すること。可能な限り5W2Hを明らかにし定量的に表現すること。

    # 制約条件
    - コードや論理式ではなく日本語のレポートとして出力すること
    - 最終成果物のみ出力すること

    # 質問
    ```
    {_question}
    ```

    # 入力情報
    ```
    {_text}
    ```

    # 成果物フォーマット
    ```
    ## QA:
        - "質問1": "回答1"
        - "質問2": "回答2"
        - "質問3": "回答3"
        ・・・
    ## 事実:
        - "事実1"
        - "事実2"
        ・・・
    ```
    """

URL = "http://localhost:8080/v1/chat/completions"
HEADERS = {"Content-Type": "application/json"}
PAYLOAD = {
    "messages": [{"role": "user", "content": _query}],
    # "max_tokens": 1024,
    # "model": "your-model-name"
}

# CONCURRENCY_LEVELS を 1 と 5 を含むように調整 (またはテストしたい値に)
CONCURRENCY_LEVELS = [1, 5, 10, 15, 20]

async def one_request(client, latencies, prompt_tokens_list, completion_tokens_list, concurrency_level: int):
    t0 = time.perf_counter()
    try:
        r = await client.post(URL, headers=HEADERS, content=json.dumps(PAYLOAD))
        latency = time.perf_counter() - t0
        r.raise_for_status()
        
        latencies.append(latency)
        response_json = r.json()

        if "usage" in response_json and isinstance(response_json["usage"], dict):
            prompt_tokens_list.append(response_json["usage"].get("prompt_tokens", 0))
            completion_tokens_list.append(response_json["usage"].get("completion_tokens", 0))
        else:
            prompt_tokens_list.append(0)
            completion_tokens_list.append(0)

        if concurrency_level in [1, 5]:
            if "choices" in response_json and \
               isinstance(response_json["choices"], list) and \
               len(response_json["choices"]) > 0 and \
               "message" in response_json["choices"][0] and \
               isinstance(response_json["choices"][0]["message"], dict) and \
               "content" in response_json["choices"][0]["message"]:
                print(f"\n--- LLM Output (Concurrency: {concurrency_level}, Request Latency: {latency:.3f}s) ---")
                print(response_json["choices"][0]["message"]["content"])
                print("---------------------------------------------------\n")
            else:
                print(f"\n[Warning] Could not find LLM message content in response for concurrency {concurrency_level}.")
                # print(f"Full response for debugging: {response_json}") # デバッグ用に全レスポンス表示も可能

    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code} - {e.response.text} - URL: {e.request.url}")
    except httpx.RequestError as e:
        print(f"Request error: {e} - URL: {e.request.url}")
    except json.JSONDecodeError:
        status_code = r.status_code if 'r' in locals() and hasattr(r, 'status_code') else 'N/A'
        response_text = r.text if 'r' in locals() and hasattr(r, 'text') else 'N/A'
        print(f"JSON decode error. Status: {status_code}, Response: {response_text}")
    except Exception as e:
        print(f"Unexpected error in one_request: {type(e).__name__} - {e}")

async def run_test_for_concurrency(concurrency_level: int):
    latencies = []
    prompt_tokens_in_this_run = []
    completion_tokens_in_this_run = []

    print(f"\n--- Running test for CONCURRENCY = {concurrency_level} ---")
    timeout_config = httpx.Timeout(10.0, read=300.0, write=300.0, pool=None)
    async with httpx.AsyncClient(timeout=timeout_config) as client:
        tasks = [one_request(client, latencies, prompt_tokens_in_this_run, completion_tokens_in_this_run, concurrency_level) for _ in range(concurrency_level)]
        
        test_start_time = time.perf_counter()
        await asyncio.gather(*tasks, return_exceptions=True)
        total_wall_time = time.perf_counter() - test_start_time

    successful_requests = len(latencies)
    avg_latency = 0
    p95_latency = 0
    
    total_prompt_tokens = sum(prompt_tokens_in_this_run)
    total_completion_tokens = sum(completion_tokens_in_this_run)
    
    prompt_tps = 0.0
    completion_tps = 0.0

    if successful_requests > 0:
        latencies.sort()
        avg_latency = statistics.mean(latencies)
        # P95の計算: 要素数が0や1の場合でもエラーにならないように調整
        if len(latencies) == 1:
            p95_latency = latencies[0]
        elif len(latencies) > 1:
            p95_index = min(int(len(latencies) * 0.95), len(latencies) - 1)
            if p95_index < 0: p95_index = 0 # 念のため
            p95_latency = latencies[p95_index]
        # else (len(latencies) == 0) の場合は p95_latency は 0 のまま
    else:
        print("No successful requests completed for this concurrency level.")

    if total_wall_time > 0:
        prompt_tps = total_prompt_tokens / total_wall_time
        completion_tps = total_completion_tokens / total_wall_time
    else:
        prompt_tps = 0.0 if total_prompt_tokens == 0 else float('inf')
        completion_tps = 0.0 if total_completion_tokens == 0 else float('inf')


    print(f"Results for CONCURRENCY = {concurrency_level}:") # このprintはLLM出力より後になる場合がある
    print(f"  Total wall time    : {total_wall_time:.3f}s")
    print(f"  Successful reqs    : {successful_requests}/{concurrency_level}")
    if successful_requests > 0:
        print(f"  Avg latency        : {avg_latency:.3f}s")
        print(f"  P95 latency        : {p95_latency:.3f}s")
    print(f"  Total prompt tokens: {total_prompt_tokens}")
    print(f"  Total compl tokens : {total_completion_tokens}")
    print(f"  Prompt TPS         : {prompt_tps:.2f}")
    print(f"  Completion TPS     : {completion_tps:.2f}")
    
    return {
        "concurrency": concurrency_level,
        "total_wall_time": total_wall_time,
        "avg_latency": avg_latency if successful_requests > 0 else None,
        "p95_latency": p95_latency if successful_requests > 0 else None,
        "total_prompt_tokens": total_prompt_tokens,
        "total_completion_tokens": total_completion_tokens,
        "prompt_tps": prompt_tps,
        "completion_tps": completion_tps,
        "successful_requests": successful_requests,
        "attempted_requests": concurrency_level,
    }

async def main():
    all_results = []
    start_all_tests_time = time.perf_counter()

    for concurrency in CONCURRENCY_LEVELS:
        result = await run_test_for_concurrency(concurrency)
        all_results.append(result)
        if concurrency < CONCURRENCY_LEVELS[-1]:
            await asyncio.sleep(2) # 次のテストの前に少し待つ

    end_all_tests_time = time.perf_counter()
    print(f"\nTotal time for all benchmark runs: {end_all_tests_time - start_all_tests_time:.2f}s")

    print("\n\n--- Summary of All Runs ---")
    header = "| Concurrency | Wall Time (s) | Avg Latency (s) | P95 Latency (s) | Prompt Tokens | Compl Tokens | Prompt TPS | Compl TPS | Reqs (Succ/Att) |"
    print(header)
    print("|" + "-" * (len(header) - 2) + "|")

    for res in all_results:
        avg_lat_str = f"{res['avg_latency']:.3f}" if res['avg_latency'] is not None else "N/A"
        p95_lat_str = f"{res['p95_latency']:.3f}" if res['p95_latency'] is not None else "N/A"
        prompt_tokens_str = str(res.get('total_prompt_tokens', 0))
        compl_tokens_str = str(res.get('total_completion_tokens', 0))
        prompt_tps_str = f"{res.get('prompt_tps', 0.0):.2f}"
        compl_tps_str = f"{res.get('completion_tps', 0.0):.2f}"
        req_status_str = f"{res['successful_requests']}/{res['attempted_requests']}"
        
        print(f"| {res['concurrency']:<11} | {res['total_wall_time']:<13.3f} | {avg_lat_str:<15} | {p95_lat_str:<15} | {prompt_tokens_str:<13} | {compl_tokens_str:<12} | {prompt_tps_str:<10} | {compl_tps_str:<10} | {req_status_str:<15} |")

if __name__ == "__main__":
    asyncio.run(main())

実行結果

  • LLMからの出力例
    4b-4bitと約5GBの小さなモデルですが悪くないです。
    ## QA:
        - "質問1": 井上尚弥選手は、アメリカのラモン・カルデナス選手に8ラウンド、テクニカルノックアウトで勝利し、4団体統一チャンピオンとして4回目の防衛に成功しました。
        - "質問2": 井上選手の次の防衛戦は、2025年9月にWBA=世界ボクシング協会の暫定チャンピオンで、ウズベキスタンのムロジョン・アフマダリエフ選手との対戦が予定されています。
        - "質問3": 井上選手は試合後、カルデナス選手が非常にタフで、映像で見たよりもさらに強い選手であったとコメントしました。また、ダウンの後は落ち着いてポイントをピックアップすることを考え、この試合がキャリアにおける貴重な経験になったと述べています。
    
    ## 事実:
        - "事実1": 井上尚弥選手は、32歳のプロボクシング選手であり、プロ29戦全勝という記録を持っています。
        - "事実2": 井上選手は、2025年5月5日にアメリカ・ラスベガスのTーモバイルアリーナで、ラモン・カルデナス選手との試合を行い、テクニカルノックアウトで勝利しました。
        - "事実3": ラスベガスは、多くのビッグマッチが行われてきたボクシングの聖地であり、2015年にはフロイド・メイウェザー氏とマニー・パッキャオ氏の試合が開催されました。
    
  • 処理速度
    Concurrencyはテストツールの多重度です。(実は処理毎にバラツキが結構ありますが)1500 token 程度の記事から情報を抽出する処理を20回実行するのに50秒程度(600 TPS程度)と悪くないと思います。
    --- Summary of All Runs ---
    | Concurrency | Wall Time (s) | Avg Latency (s) | P95 Latency (s) | Prompt Tokens | Compl Tokens | Prompt TPS | Compl TPS | Reqs (Succ/Att) |
    |-------------------------------------------------------------------------------------------------------------------------------------------|
    | 1           | 4.007         | 4.007           | 4.007           | 1546          | 332          | 385.84     | 82.86      | 1/1             |
    | 5           | 15.777        | 9.481           | 15.776          | 7730          | 1663         | 489.96     | 105.41     | 5/5             |
    | 10          | 26.522        | 14.906          | 26.520          | 15460         | 3250         | 582.92     | 122.54     | 10/10           |
    | 15          | 44.367        | 22.487          | 44.367          | 23190         | 4935         | 522.69     | 111.23     | 15/15           |
    | 20          | 49.400        | 25.601          | 49.399          | 30920         | 6421         | 625.91     | 129.98     | 20/20           |
    

🔚 まとめ

ポイント 解説
必ずしも大規模LLMは必要でない 単語抽出などの“軽処理”は小モデルで十分
コストゼロでもナレッジ抽出可能 (LLMの)API利用料を気にせず回せる
FastAPI + MLXは高速 常駐&並列処理でスループット改善(余裕があればこのもう少し深掘ります)
パイプラインで自動化 キーワード抽出〜ナレッジ統合まで自動処理(ここも余裕があればもう少し深掘ります)

大事なのは、「適切なサイズのLLMを選んで、目的に合ったプロンプト設計と構成を組むこと」。
大モデル信仰を一度疑ってみると、個人開発にも十分な道が見えてきます。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?