3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LLM合成データ生成を10倍高速化:asyncio.QueueとWorkerプールで作る5段階ストリーミングパイプライン

3
Last updated at Posted at 2026-06-09

はじめに

LLMのSFT(教師あり微調整)やReasoningモデル(思考型LLM)の学習において、「合成データ(Synthetic Data)」の作成は非常に重要です。

質の高いデータを作るためには、単発のプロンプトを投げるだけでなく、以下のようなマルチステップの生成・評価パイプラインを組んでます

  1. Question生成: シードとなる元テキストから問題を作り出す
  2. Answer生成: 問題とシードから正確な回答を作る
  3. Thinking生成: 問題、シード、回答を用いて、正確な思考プロセスを構築する
  4. Refine: 回答をブラッシュアップする
  5. Eval: 生成されたペアが妥当かをLLMにジャッジさせる

しかし、これを「全データでステップ1」→「全データでステップ2」という同期バッチ処理で回すと、ある絶望に直面します。
こんな感じです。

バッチ処理.png

「GPUの稼働率(MFU)が0.3前後から微動だにしない……遅すぎてデータ生成が終わらん……」

これはイメージ図ではありますが、生成時間がバラバラだとGPUがかなり遊んでしまいます。
そこで考えたのがコレ。

逐次処理.png

Qは問題生成、Aは回答生成、Tは思考過程生成をイメージしています。

この二つを比較するとこんなイメージ
比較.png
この適当な図だと、1.5倍くらい早くなりそうな予感がしません???

本記事では、この5段階パイプラインを asyncio.Queue と Worker(職人)タスクのプールを活用した「非同期キューイング・パイプライン」へ大改造し、GPUスループットを10倍(MFU 0.3 → 3〜5)に爆上げした設計と実装のポイントを紹介します。


1. 概念の理解

そもそも非同期処理って難しくない?
ずばり苦手です。概念はわかってもPythonですごく書きにくい。いや他の言語はもっと知らんけど。

なので、ChatGPTに教えを請いましたよ。素晴らしい。
あっという間に理解できた気がします。

大事なところだけ、まずは基本概念を理解します。

以下は疑似コードです。実行可能な実装はGitHubの
create_qa_model_httpx_pipeline_pool.py を参照してください。

import asyncio
from dataclasses import dataclass, field


NUM_STEPS = 5
worker_count = 5

@dataclass
class PipelineJob:
    item_id: int
    step: int
    payload: Dict[str, Any]
    previous_outputs: Dict[str, Any] = field(default_factory=dict)


async def run_one_step(job, worker_id):
    """
    job.stepによって実行する処理を変える関数
    長くなるので概念だけ
    """
    output = dict(job.previous_outputs)
    
    if job.step == 1:
        # 例:なんか表示したほうがわかりやすいかなと思ったので適当に書く。
        print(f"Worker {worker_id} processing {job.item_id} step 1") 
        # 問題生成
        outputs["text_1"] = f"item{job.item_id}-step1"
    elif job.step == 2:
        # 回答生成
    elif job.step == 3:
        # 思考過程生成
    elif job.step == 4:
        # 回答リファイン
    elif job.step == 5:
        # データセット評価
    else:
        # 例外処理
    return PipelineJob(
        item_id=job.item_id,
        step=job.step + 1, # ここで、ステップが更新される
        payload=job.payload,
        previous_outputs=outputs,
    )

async def create_qa_item_pool(texts: List[str]) -> List[Dict[str, Any]]:
    queue = asyncio.Queue(maxsize=worker_count * 2) # 読み込むキュー(シードデータ)の数を指定(メモリを抑えるため)

    async def item_processer(worker_id):
        while True:
            job = await queue.get() # queueに保存したジョブを取り出して実行
    
            # ジョブが無くなったら終了処理
            if job is None:
                queue.task_done()
                return
            try:
                curr = job
                # 同じワーカーがそのアイテムを最後まで処理する
                while curr.step <= NUM_STEPS:
                    curr = await run_one_step(curr, worker_id)
                # curr.step == NUM_STEPS + 1、outputs は最終結果を含む
                results[curr.item_id] = curr.previous_outputs
            except Exception as exc:
                print(f"Worker {worker_id} failed item {job.item_id}: {exc}")
            finally:
                queue.task_done()
            
    # item_processerを実行可能なワーカーを作る
    workers = [asyncio.create_task(item_processer(i)) for i in range(worker_count)]

    def build_initial(item_id: int) -> PipelineJob:
        return PipelineJob(item_id=item_id, step=1, payload={"text": texts[item_id]})

    # キューにシードデータを渡す。maxsizeを超えた分は読み込まれない
    total = len(texts)
    for i in range(total):
        await queue.put(build_initial(i))

    # 実行完了まで待つ
    await queue.join()

    # すべて完了したら、queueにNoneを加え、すべてのworkerを終了させる
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)

    # return results in input order
    return [results.get(i, {}) for i in range(total)]

if __name__ == "__main__":
    # シードデータのイメージ
    texts = [
        "テキスト1",
        "テキスト2",
        "テキスト3",
        "テキスト4",
        "テキスト5",
        "テキスト6",
        "テキスト7",
        "テキスト8",
        "テキスト9",
        "テキスト10",
        "テキスト11",
        "テキスト12",
        "テキスト13",
        "テキスト14",
        "テキスト15"
    ]

    # 実行
    results = asyncio.run(create_qa_item_pool(texts))
    print(json.dumps(results, ensure_ascii=False, indent=2))

つまり、
queue = asyncio.Queue(maxsize=worker_count * 2)
で、queueに読み込むジョブ(シードデータ)の数の上限を決め、

workers = [asyncio.create_task(item_processer(i)) for i in range(worker_count)]で、item_processerを実行可能なワーカーを用意

item_processerはワーカー内でqueueからジョブを取得すると同時にqueueに新しいジョブが読み込まれる機能を定義しておく

そこで、queue.put()によってデータを流し込むと
queueに指定した数だけジョブが読み込まれ、item_processer関数内でジョブを取り出すと(queue.get())、ジョブが減った分だけジョブが新たに読み込まれ、

おなじワーカーがすべてのステップを一気にrun_one_stepを使って実行する。保持しているjob.stepを+1することで次のステップに進む

ムズイ。はい。理解に2日間を有しました。ちーん


2. さて、作ってみよう

あ、GitHub見てもらった方が早いわ。w
Codexさんと一緒に作りました。ほんとCodexって便利っすね。

main_create_imabari_qa_httpx_pipeline_pool.pyがメインとなる関数で、
このパイプラインを入れたのはpipelines/create_qa_model_httpx_pipeline_pool.pyです。

この構造なら、推論エンジン(vLLMなど)に常に一定の負荷をかけ続けつつ、クライアント側のメモリも極めて省エネに保つことができます。

※最終的にはSemaphoreも併用していますが、基本概念はココに書いた通りです。ただ、どっちが良いんでしょうね?


3. vLLMの設定も触っていこう

まず、使っている機材の特徴をつかんでおこう
GB10、いわゆるThinkStation PGX(まぁいわゆるDGX Spark。2重のいわゆる・・・💦)
GPUメモリは128GBで巨大だけど、メモリ帯域は小さい(273 GB/s)。

まず、対策前にMFU(Model Flops Utilization)を見られるようにしましょう。
ま、どのくらいに計算りそーせすを使っているかって指標だと思います。


vllm serve <model/path/> \
  --served-model-name Qwen3.5-35B-A3B-GPTQ-Int4 \
  --host 0.0.0.0 --port 8000 \
  --gpu-memory-utilization 0.85 \
  --max-num-seqs 128 \
  --max-num-batched-tokens 8192 \
  --enable-chunked-prefill \
  --reasoning-parser qwen3 \
  --enable-prefix-caching \
  --async-scheduling \
  --enable-mfu-metrics

こんな感じで--enable-mfu-metricsを付け加えるとサーバーに表示されます。


4. 圧倒的成果:GPU稼働率が10倍、MFUが ボーン と跳ねた!

この仕組みに切り替えた結果、データ生成の効率は劇的に変わりました。

指標 従来のバッチ同期 Queue + Workerプール
GPUの遊び時間 非常に多い(最遅データの待ち発生) ほぼゼロ(常にリクエストが補給される)
GPU稼働効率 (MFU) 0.3 程度 3 ~ 5 (10倍以上のスループット)

ちなみにプレフィルの時は35とか出てました。
6/10追記:96ワーカーにしたらMFUが5~9となり、5を切ることがなくなってきました!まだまだ工夫の仕様がありますね。

これまでは
一日に80くらいのシードデータが処理されていましたが、3時間くらいで時間で100以上のシードデータが処理されるようになりました。体感的にはやはり10倍くらいの処理速度に思えます。きっと、こんな感じだったのでしょう。(例として問題生成のみで表現してます)

ほんとは?.png

CPTデータを作るのに約一か月かかりましたが、もうちょっと良く考えればよかったですね。💦
といっても、このCPTデータ作成がきっかけとなり効率的なデータ生成を真面目に考えたわけですから、自分にとって必要な過程だったのでしょう。新パイプラインでは開始直後からJSONLファイルに成果データが滝のように書き込まれていくようになりました。


まとめ

LLMを複数回呼び出す複雑なデータ作成パイプラインを組む場合、「Queueで流量を制御し、固定数のWorkerを回す」のが最適解だと思います。あ、異論は認めます。ぜひ、他のアイデアはコメントくれると嬉しいです。

  • クライアント側のメモリを保護する(maxsize
  • サーバー側(LLMエンジン)に常に一定の実行数のリクエストを送り続ける(num_workers
  • 特定のデータ遅延に全体を巻き込ませない

「合成データを作りたいけれど処理が遅すぎる」と悩んでいる方は、ぜひ asyncio.Queue を使ったWorkerプール構築を試してみてください!

でわっ!

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?