はじめに
LLMのSFT(教師あり微調整)やReasoningモデル(思考型LLM)の学習において、「合成データ(Synthetic Data)」の作成は非常に重要です。
質の高いデータを作るためには、単発のプロンプトを投げるだけでなく、以下のようなマルチステップの生成・評価パイプラインを組んでます
- Question生成: シードとなる元テキストから問題を作り出す
- Answer生成: 問題とシードから正確な回答を作る
- Thinking生成: 問題、シード、回答を用いて、正確な思考プロセスを構築する
- Refine: 回答をブラッシュアップする
- Eval: 生成されたペアが妥当かをLLMにジャッジさせる
しかし、これを「全データでステップ1」→「全データでステップ2」という同期バッチ処理で回すと、ある絶望に直面します。
こんな感じです。
「GPUの稼働率(MFU)が0.3前後から微動だにしない……遅すぎてデータ生成が終わらん……」
これはイメージ図ではありますが、生成時間がバラバラだとGPUがかなり遊んでしまいます。
そこで考えたのがコレ。
Qは問題生成、Aは回答生成、Tは思考過程生成をイメージしています。
この二つを比較するとこんなイメージ

この適当な図だと、1.5倍くらい早くなりそうな予感がしません???
本記事では、この5段階パイプラインを asyncio.Queue と Worker(職人)タスクのプールを活用した「非同期キューイング・パイプライン」へ大改造し、GPUスループットを10倍(MFU 0.3 → 3〜5)に爆上げした設計と実装のポイントを紹介します。
1. 概念の理解
そもそも非同期処理って難しくない?
ずばり苦手です。概念はわかってもPythonですごく書きにくい。いや他の言語はもっと知らんけど。
なので、ChatGPTに教えを請いましたよ。素晴らしい。
あっという間に理解できた気がします。
大事なところだけ、まずは基本概念を理解します。
以下は疑似コードです。実行可能な実装はGitHubの
create_qa_model_httpx_pipeline_pool.pyを参照してください。
import asyncio
from dataclasses import dataclass, field
NUM_STEPS = 5
worker_count = 5
@dataclass
class PipelineJob:
item_id: int
step: int
payload: Dict[str, Any]
previous_outputs: Dict[str, Any] = field(default_factory=dict)
async def run_one_step(job, worker_id):
"""
job.stepによって実行する処理を変える関数
長くなるので概念だけ
"""
output = dict(job.previous_outputs)
if job.step == 1:
# 例:なんか表示したほうがわかりやすいかなと思ったので適当に書く。
print(f"Worker {worker_id} processing {job.item_id} step 1")
# 問題生成
outputs["text_1"] = f"item{job.item_id}-step1"
elif job.step == 2:
# 回答生成
elif job.step == 3:
# 思考過程生成
elif job.step == 4:
# 回答リファイン
elif job.step == 5:
# データセット評価
else:
# 例外処理
return PipelineJob(
item_id=job.item_id,
step=job.step + 1, # ここで、ステップが更新される
payload=job.payload,
previous_outputs=outputs,
)
async def create_qa_item_pool(texts: List[str]) -> List[Dict[str, Any]]:
queue = asyncio.Queue(maxsize=worker_count * 2) # 読み込むキュー(シードデータ)の数を指定(メモリを抑えるため)
async def item_processer(worker_id):
while True:
job = await queue.get() # queueに保存したジョブを取り出して実行
# ジョブが無くなったら終了処理
if job is None:
queue.task_done()
return
try:
curr = job
# 同じワーカーがそのアイテムを最後まで処理する
while curr.step <= NUM_STEPS:
curr = await run_one_step(curr, worker_id)
# curr.step == NUM_STEPS + 1、outputs は最終結果を含む
results[curr.item_id] = curr.previous_outputs
except Exception as exc:
print(f"Worker {worker_id} failed item {job.item_id}: {exc}")
finally:
queue.task_done()
# item_processerを実行可能なワーカーを作る
workers = [asyncio.create_task(item_processer(i)) for i in range(worker_count)]
def build_initial(item_id: int) -> PipelineJob:
return PipelineJob(item_id=item_id, step=1, payload={"text": texts[item_id]})
# キューにシードデータを渡す。maxsizeを超えた分は読み込まれない
total = len(texts)
for i in range(total):
await queue.put(build_initial(i))
# 実行完了まで待つ
await queue.join()
# すべて完了したら、queueにNoneを加え、すべてのworkerを終了させる
for _ in workers:
await queue.put(None)
await asyncio.gather(*workers)
# return results in input order
return [results.get(i, {}) for i in range(total)]
if __name__ == "__main__":
# シードデータのイメージ
texts = [
"テキスト1",
"テキスト2",
"テキスト3",
"テキスト4",
"テキスト5",
"テキスト6",
"テキスト7",
"テキスト8",
"テキスト9",
"テキスト10",
"テキスト11",
"テキスト12",
"テキスト13",
"テキスト14",
"テキスト15"
]
# 実行
results = asyncio.run(create_qa_item_pool(texts))
print(json.dumps(results, ensure_ascii=False, indent=2))
つまり、
queue = asyncio.Queue(maxsize=worker_count * 2)
で、queueに読み込むジョブ(シードデータ)の数の上限を決め、
workers = [asyncio.create_task(item_processer(i)) for i in range(worker_count)]で、item_processerを実行可能なワーカーを用意
item_processerはワーカー内でqueueからジョブを取得すると同時にqueueに新しいジョブが読み込まれる機能を定義しておく
そこで、queue.put()によってデータを流し込むと
queueに指定した数だけジョブが読み込まれ、item_processer関数内でジョブを取り出すと(queue.get())、ジョブが減った分だけジョブが新たに読み込まれ、
おなじワーカーがすべてのステップを一気にrun_one_stepを使って実行する。保持しているjob.stepを+1することで次のステップに進む
ムズイ。はい。理解に2日間を有しました。ちーん
2. さて、作ってみよう
あ、GitHub見てもらった方が早いわ。w
Codexさんと一緒に作りました。ほんとCodexって便利っすね。
main_create_imabari_qa_httpx_pipeline_pool.pyがメインとなる関数で、
このパイプラインを入れたのはpipelines/create_qa_model_httpx_pipeline_pool.pyです。
この構造なら、推論エンジン(vLLMなど)に常に一定の負荷をかけ続けつつ、クライアント側のメモリも極めて省エネに保つことができます。
※最終的にはSemaphoreも併用していますが、基本概念はココに書いた通りです。ただ、どっちが良いんでしょうね?
3. vLLMの設定も触っていこう
まず、使っている機材の特徴をつかんでおこう
GB10、いわゆるThinkStation PGX(まぁいわゆるDGX Spark。2重のいわゆる・・・💦)
GPUメモリは128GBで巨大だけど、メモリ帯域は小さい(273 GB/s)。
まず、対策前にMFU(Model Flops Utilization)を見られるようにしましょう。
ま、どのくらいに計算りそーせすを使っているかって指標だと思います。
vllm serve <model/path/> \
--served-model-name Qwen3.5-35B-A3B-GPTQ-Int4 \
--host 0.0.0.0 --port 8000 \
--gpu-memory-utilization 0.85 \
--max-num-seqs 128 \
--max-num-batched-tokens 8192 \
--enable-chunked-prefill \
--reasoning-parser qwen3 \
--enable-prefix-caching \
--async-scheduling \
--enable-mfu-metrics
こんな感じで--enable-mfu-metricsを付け加えるとサーバーに表示されます。
4. 圧倒的成果:GPU稼働率が10倍、MFUが ボーン と跳ねた!
この仕組みに切り替えた結果、データ生成の効率は劇的に変わりました。
| 指標 | 従来のバッチ同期 | Queue + Workerプール |
|---|---|---|
| GPUの遊び時間 | 非常に多い(最遅データの待ち発生) | ほぼゼロ(常にリクエストが補給される) |
| GPU稼働効率 (MFU) | 0.3 程度 | 3 ~ 5 (10倍以上のスループット) |
ちなみにプレフィルの時は35とか出てました。
6/10追記:96ワーカーにしたらMFUが5~9となり、5を切ることがなくなってきました!まだまだ工夫の仕様がありますね。
これまでは
一日に80くらいのシードデータが処理されていましたが、3時間くらいで時間で100以上のシードデータが処理されるようになりました。体感的にはやはり10倍くらいの処理速度に思えます。きっと、こんな感じだったのでしょう。(例として問題生成のみで表現してます)
CPTデータを作るのに約一か月かかりましたが、もうちょっと良く考えればよかったですね。💦
といっても、このCPTデータ作成がきっかけとなり効率的なデータ生成を真面目に考えたわけですから、自分にとって必要な過程だったのでしょう。新パイプラインでは開始直後からJSONLファイルに成果データが滝のように書き込まれていくようになりました。
まとめ
LLMを複数回呼び出す複雑なデータ作成パイプラインを組む場合、「Queueで流量を制御し、固定数のWorkerを回す」のが最適解だと思います。あ、異論は認めます。ぜひ、他のアイデアはコメントくれると嬉しいです。
- クライアント側のメモリを保護する(
maxsize) - サーバー側(LLMエンジン)に常に一定の実行数のリクエストを送り続ける(
num_workers) - 特定のデータ遅延に全体を巻き込ませない
「合成データを作りたいけれど処理が遅すぎる」と悩んでいる方は、ぜひ asyncio.Queue を使ったWorkerプール構築を試してみてください!
でわっ!


