C++スレッドプール自作シリーズ
| Part1 std::thread | Part2 ワークスティーリング | Part3 Future/Promise | Part4 ベンチマーク |
|---|---|---|---|
| 👈 Now | - | - | - |
はじめに
「for文で100回ループするより、100スレッドで並列に回したほうが速いでしょ?」
...そう思ってた時期が私にもありました。
実際にやってみると、スレッド生成のオーバーヘッドでむしろ遅くなる。
そこで登場するのがスレッドプール。あらかじめスレッドを作っておいて使い回す仕組み。
このシリーズでは、C++でスレッドプールをゼロから自作しながら、並列処理の深淵を覗いていくよ。
なぜスレッドプールが必要なの?
スレッド生成のコスト
#include <thread>
#include <chrono>
#include <iostream>
int main() {
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < 1000; ++i) {
std::thread t([]{ /* 何もしない */ });
t.join();
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "1000スレッド生成・破棄: " << duration.count() << "ms\n";
return 0;
}
1000スレッド生成・破棄: 847ms
何もしないスレッドを1000個作って捨てるだけで847ms。
OSがスレッドを作るとき、こんなことが起きてる:
- カーネルにシステムコール発行
- スタック領域の確保(デフォルト1-8MB)
- Thread Control Block (TCB) の初期化
- スケジューラへの登録
これを毎回やるのは無駄すぎる。
スレッドプールの発想
【スレッド都度生成】
Task1 → Thread生成 → 実行 → Thread破棄
Task2 → Thread生成 → 実行 → Thread破棄
Task3 → Thread生成 → 実行 → Thread破棄
...
【スレッドプール】
┌─────────────────────────────────────────────┐
│ Thread1 ──┬── Task1 → Task4 → Task7 → ... │
│ Thread2 ──┼── Task2 → Task5 → Task8 → ... │
│ Thread3 ──┼── Task3 → Task6 → Task9 → ... │
│ Thread4 ──┘ │
└─────────────────────────────────────────────┘
↑
タスクキューから取得して処理
スレッドを使い回すことで、生成・破棄のオーバーヘッドを削減できる。
std::threadの基本
まずはC++標準のスレッドAPIを理解しよう。
基本的な使い方
#include <thread>
#include <iostream>
void hello() {
std::cout << "Hello from thread!\n";
}
int main() {
std::thread t(hello); // スレッド生成&開始
t.join(); // スレッドの終了を待つ
return 0;
}
ラムダ式で書く
#include <thread>
#include <iostream>
int main() {
int value = 42;
std::thread t([&value] {
std::cout << "Value: " << value << "\n";
value = 100;
});
t.join();
std::cout << "After thread: " << value << "\n";
return 0;
}
Value: 42
After thread: 100
joinとdetach
| メソッド | 動作 | 使いどころ |
|---|---|---|
join() |
スレッド終了まで待機 | 結果を受け取りたい時 |
detach() |
スレッドを切り離す | Fire-and-forget |
注意: join()もdetach()もしないままスレッドオブジェクトが破棄されるとプログラムが異常終了する。
// ❌ NG: terminateが呼ばれる
{
std::thread t([]{ /* ... */ });
// join()もdetach()もせずにスコープを抜ける → 死
}
// ⭕ OK: ちゃんとjoin
{
std::thread t([]{ /* ... */ });
t.join();
}
スレッドIDの取得
#include <thread>
#include <iostream>
int main() {
std::cout << "Main thread ID: " << std::this_thread::get_id() << "\n";
std::thread t([] {
std::cout << "Worker thread ID: " << std::this_thread::get_id() << "\n";
});
t.join();
return 0;
}
Main thread ID: 140234567890432
Worker thread ID: 140234567886080
ハードウェア並列度
#include <thread>
#include <iostream>
int main() {
unsigned int n = std::thread::hardware_concurrency();
std::cout << "論理CPUコア数: " << n << "\n";
return 0;
}
論理CPUコア数: 16
この値をスレッドプールのデフォルトスレッド数として使うことが多い。
データ競合との戦い
マルチスレッドプログラミングで一番怖いのがデータ競合(Race Condition)。
データ競合の例
#include <thread>
#include <iostream>
int counter = 0;
void increment() {
for (int i = 0; i < 100000; ++i) {
++counter; // ここが危険
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter: " << counter << "\n";
// 期待: 200000
return 0;
}
Counter: 156847 ← 毎回違う値になる
++counterは内部的に:
- メモリからcounterを読む
- 1を足す
- メモリに書き戻す
この3ステップの途中で別スレッドが割り込むと、更新が上書きされて消える。
mutexで保護する
#include <thread>
#include <mutex>
#include <iostream>
int counter = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lock(mtx); // RAIIでロック
++counter;
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter: " << counter << "\n";
return 0;
}
Counter: 200000 ← 正しい値
atomic変数を使う
単純なインクリメントならstd::atomicのほうが速い。
#include <thread>
#include <atomic>
#include <iostream>
std::atomic<int> counter{0};
void increment() {
for (int i = 0; i < 100000; ++i) {
++counter; // アトミックなインクリメント
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter: " << counter << "\n";
return 0;
}
Counter: 200000
mutex vs atomic
| 種類 | 用途 | 速度 |
|---|---|---|
std::mutex |
複数の操作をまとめて保護 | 遅い |
std::atomic |
単一の変数への操作 | 速い |
最小限のスレッドプール
ここまでの知識で、超シンプルなスレッドプールを作ってみよう。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <iostream>
class ThreadPool {
public:
ThreadPool(size_t num_threads) : stop_(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// タスクが来るか、停止フラグが立つまで待機
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
// 停止フラグが立っていて、タスクがなければ終了
if (stop_ && tasks_.empty()) {
return;
}
// タスクを取り出す
task = std::move(tasks_.front());
tasks_.pop();
}
// タスクを実行
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& worker : workers_) {
worker.join();
}
}
void enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
tasks_.push(std::move(task));
}
condition_.notify_one();
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
使ってみる
int main() {
ThreadPool pool(4);
for (int i = 0; i < 10; ++i) {
pool.enqueue([i] {
std::cout << "Task " << i << " running on thread "
<< std::this_thread::get_id() << "\n";
});
}
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
Task 0 running on thread 140234567886080
Task 1 running on thread 140234567881728
Task 2 running on thread 140234567877376
Task 3 running on thread 140234567873024
Task 4 running on thread 140234567886080
Task 5 running on thread 140234567881728
...
同じスレッドIDが再利用されてるのがわかる。これがスレッドプールの効果。
condition_variableの解説
上のコードでstd::condition_variableを使ったけど、これが分かりにくいので補足。
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
これは「stop_がtrueになるか、tasks_が空でなくなるまで待つ」という意味。
┌────────────────────────────────────────────────────────┐
│ Worker Thread │
│ │
│ ┌──────────────────┐ │
│ │ キューは空? │ │
│ └────────┬─────────┘ │
│ │ │
│ Yes │ No │
│ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ wait()で眠る │ │ タスクを取り出す │ │
│ │ (CPUを消費しない) │ │ │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ │ notify_one()で起床 │ │
│ ▼ ▼ │
│ ループの先頭へ タスク実行 │
└────────────────────────────────────────────────────────┘
notify_one()は待ってるスレッドを1つだけ起こす。notify_all()は全部起こす。
まとめ
今回学んだこと:
| トピック | ポイント |
|---|---|
| スレッド生成コスト | 1000回で約1秒かかることも |
| std::thread | join/detachを忘れると死ぬ |
| データ競合 | mutexかatomicで保護する |
| スレッドプール | スレッドを使い回して効率化 |
| condition_variable | 効率的にタスクを待つ |
次回はワークスティーリングキューを実装して、より高性能なスレッドプールを作るよ。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!