8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

C++スレッドプール自作シリーズ

Part1 std::thread Part2 ワークスティーリング Part3 Future/Promise Part4 ベンチマーク
👈 Now - - -

はじめに

「for文で100回ループするより、100スレッドで並列に回したほうが速いでしょ?」

...そう思ってた時期が私にもありました。

実際にやってみると、スレッド生成のオーバーヘッドでむしろ遅くなる。

そこで登場するのがスレッドプール。あらかじめスレッドを作っておいて使い回す仕組み。

このシリーズでは、C++でスレッドプールをゼロから自作しながら、並列処理の深淵を覗いていくよ。

なぜスレッドプールが必要なの?

スレッド生成のコスト

#include <thread>
#include <chrono>
#include <iostream>

int main() {
    auto start = std::chrono::high_resolution_clock::now();
    
    for (int i = 0; i < 1000; ++i) {
        std::thread t([]{ /* 何もしない */ });
        t.join();
    }
    
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    
    std::cout << "1000スレッド生成・破棄: " << duration.count() << "ms\n";
    return 0;
}
1000スレッド生成・破棄: 847ms

何もしないスレッドを1000個作って捨てるだけで847ms

OSがスレッドを作るとき、こんなことが起きてる:

  1. カーネルにシステムコール発行
  2. スタック領域の確保(デフォルト1-8MB)
  3. Thread Control Block (TCB) の初期化
  4. スケジューラへの登録

これを毎回やるのは無駄すぎる。

スレッドプールの発想

【スレッド都度生成】
Task1 → Thread生成 → 実行 → Thread破棄
Task2 → Thread生成 → 実行 → Thread破棄
Task3 → Thread生成 → 実行 → Thread破棄
...

【スレッドプール】
┌─────────────────────────────────────────────┐
│ Thread1 ──┬── Task1 → Task4 → Task7 → ...  │
│ Thread2 ──┼── Task2 → Task5 → Task8 → ...  │
│ Thread3 ──┼── Task3 → Task6 → Task9 → ...  │
│ Thread4 ──┘                                 │
└─────────────────────────────────────────────┘
             ↑
         タスクキューから取得して処理

スレッドを使い回すことで、生成・破棄のオーバーヘッドを削減できる。

std::threadの基本

まずはC++標準のスレッドAPIを理解しよう。

基本的な使い方

#include <thread>
#include <iostream>

void hello() {
    std::cout << "Hello from thread!\n";
}

int main() {
    std::thread t(hello);  // スレッド生成&開始
    t.join();              // スレッドの終了を待つ
    return 0;
}

ラムダ式で書く

#include <thread>
#include <iostream>

int main() {
    int value = 42;
    
    std::thread t([&value] {
        std::cout << "Value: " << value << "\n";
        value = 100;
    });
    
    t.join();
    std::cout << "After thread: " << value << "\n";
    return 0;
}
Value: 42
After thread: 100

joinとdetach

メソッド 動作 使いどころ
join() スレッド終了まで待機 結果を受け取りたい時
detach() スレッドを切り離す Fire-and-forget

注意: join()detach()もしないままスレッドオブジェクトが破棄されるとプログラムが異常終了する。

// ❌ NG: terminateが呼ばれる
{
    std::thread t([]{ /* ... */ });
    // join()もdetach()もせずにスコープを抜ける → 死
}

// ⭕ OK: ちゃんとjoin
{
    std::thread t([]{ /* ... */ });
    t.join();
}

スレッドIDの取得

#include <thread>
#include <iostream>

int main() {
    std::cout << "Main thread ID: " << std::this_thread::get_id() << "\n";
    
    std::thread t([] {
        std::cout << "Worker thread ID: " << std::this_thread::get_id() << "\n";
    });
    
    t.join();
    return 0;
}
Main thread ID: 140234567890432
Worker thread ID: 140234567886080

ハードウェア並列度

#include <thread>
#include <iostream>

int main() {
    unsigned int n = std::thread::hardware_concurrency();
    std::cout << "論理CPUコア数: " << n << "\n";
    return 0;
}
論理CPUコア数: 16

この値をスレッドプールのデフォルトスレッド数として使うことが多い。

データ競合との戦い

マルチスレッドプログラミングで一番怖いのがデータ競合(Race Condition)

データ競合の例

#include <thread>
#include <iostream>

int counter = 0;

void increment() {
    for (int i = 0; i < 100000; ++i) {
        ++counter;  // ここが危険
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter: " << counter << "\n";
    // 期待: 200000
    return 0;
}
Counter: 156847  ← 毎回違う値になる

++counterは内部的に:

  1. メモリからcounterを読む
  2. 1を足す
  3. メモリに書き戻す

この3ステップの途中で別スレッドが割り込むと、更新が上書きされて消える。

mutexで保護する

#include <thread>
#include <mutex>
#include <iostream>

int counter = 0;
std::mutex mtx;

void increment() {
    for (int i = 0; i < 100000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);  // RAIIでロック
        ++counter;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter: " << counter << "\n";
    return 0;
}
Counter: 200000  ← 正しい値

atomic変数を使う

単純なインクリメントならstd::atomicのほうが速い。

#include <thread>
#include <atomic>
#include <iostream>

std::atomic<int> counter{0};

void increment() {
    for (int i = 0; i < 100000; ++i) {
        ++counter;  // アトミックなインクリメント
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter: " << counter << "\n";
    return 0;
}
Counter: 200000

mutex vs atomic

種類 用途 速度
std::mutex 複数の操作をまとめて保護 遅い
std::atomic 単一の変数への操作 速い

最小限のスレッドプール

ここまでの知識で、超シンプルなスレッドプールを作ってみよう。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <iostream>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        
                        // タスクが来るか、停止フラグが立つまで待機
                        condition_.wait(lock, [this] {
                            return stop_ || !tasks_.empty();
                        });
                        
                        // 停止フラグが立っていて、タスクがなければ終了
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        
                        // タスクを取り出す
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    
                    // タスクを実行
                    task();
                }
            });
        }
    }
    
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        
        condition_.notify_all();
        
        for (std::thread& worker : workers_) {
            worker.join();
        }
    }
    
    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            tasks_.push(std::move(task));
        }
        condition_.notify_one();
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

使ってみる

int main() {
    ThreadPool pool(4);
    
    for (int i = 0; i < 10; ++i) {
        pool.enqueue([i] {
            std::cout << "Task " << i << " running on thread " 
                      << std::this_thread::get_id() << "\n";
        });
    }
    
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}
Task 0 running on thread 140234567886080
Task 1 running on thread 140234567881728
Task 2 running on thread 140234567877376
Task 3 running on thread 140234567873024
Task 4 running on thread 140234567886080
Task 5 running on thread 140234567881728
...

同じスレッドIDが再利用されてるのがわかる。これがスレッドプールの効果。

condition_variableの解説

上のコードでstd::condition_variableを使ったけど、これが分かりにくいので補足。

condition_.wait(lock, [this] {
    return stop_ || !tasks_.empty();
});

これは「stop_がtrueになるか、tasks_が空でなくなるまで待つ」という意味。

┌────────────────────────────────────────────────────────┐
│ Worker Thread                                          │
│                                                        │
│   ┌──────────────────┐                                │
│   │ キューは空?      │                                │
│   └────────┬─────────┘                                │
│            │                                           │
│       Yes  │  No                                       │
│            ▼                                           │
│   ┌──────────────────┐    ┌──────────────────┐       │
│   │ wait()で眠る     │    │ タスクを取り出す │       │
│   │ (CPUを消費しない) │    │                  │       │
│   └────────┬─────────┘    └────────┬─────────┘       │
│            │                        │                  │
│            │ notify_one()で起床     │                  │
│            ▼                        ▼                  │
│         ループの先頭へ           タスク実行           │
└────────────────────────────────────────────────────────┘

notify_one()待ってるスレッドを1つだけ起こすnotify_all()は全部起こす。

まとめ

今回学んだこと:

トピック ポイント
スレッド生成コスト 1000回で約1秒かかることも
std::thread join/detachを忘れると死ぬ
データ競合 mutexかatomicで保護する
スレッドプール スレッドを使い回して効率化
condition_variable 効率的にタスクを待つ

次回はワークスティーリングキューを実装して、より高性能なスレッドプールを作るよ。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?