8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

C++スレッドプール自作シリーズ

Part1 std::thread Part2 ワークスティーリング Part3 Future/Promise Part4 ベンチマーク
👈 Now - -

はじめに

前回は基本的なスレッドプールを作った。

でもあれには致命的な問題がある。

┌────────────────────────────────────────────────────────┐
│ 全スレッドが1つのキューに群がる                        │
│                                                        │
│   Thread1 ─┐                                          │
│   Thread2 ─┼──→ [ 共有キュー ] ← mutex でロック       │
│   Thread3 ─┤                                          │
│   Thread4 ─┘                                          │
│                                                        │
│   → スレッドが増えるとロック競合が激化                │
└────────────────────────────────────────────────────────┘

これを解決するのがワークスティーリングという技術。

ワークスティーリングとは

各スレッドが自分専用のキューを持つ。暇なスレッドは他のスレッドのキューから仕事を盗む

┌──────────────────────────────────────────────────────────────┐
│ ワークスティーリング                                         │
│                                                              │
│   Thread1 ──→ [ キュー1: ■■■■■ ]                            │
│   Thread2 ──→ [ キュー2: ■ ]                                │
│   Thread3 ──→ [ キュー3: ■■■■■■■ ]                          │
│   Thread4 ──→ [ キュー4: (空) ] ──→ キュー3から盗む!       │
│                                                              │
│   → ロック競合が減り、負荷が自動的に分散される              │
└──────────────────────────────────────────────────────────────┘

なぜ効率的なのか

方式 ロック頻度 負荷分散
共有キュー 毎回ロック 自動(ただし競合)
ローカルキューのみ なし 手動で振り分け必要
ワークスティーリング 盗むときだけ 自動で最適化

Lock-Free キューの実装

ワークスティーリングを実現するには、ロックなしで動作するキューが欲しい。

Chase-Lev Deque

ワークスティーリングの定番データ構造。

  • 所有者スレッド: 片方の端からpush/pop(高速)
  • 泥棒スレッド: 反対側の端からsteal(たまにしか起きない)
┌────────────────────────────────────────────────────────────┐
│ Chase-Lev Deque                                            │
│                                                            │
│   Bottom (所有者が使う)        Top (泥棒が使う)            │
│        ↓                            ↓                      │
│   [ Task1 | Task2 | Task3 | Task4 | Task5 ]               │
│        ↑ push/pop                   ↑ steal                │
│        (ロック不要)                 (CASで競合解決)        │
└────────────────────────────────────────────────────────────┘

実装コード

#include <atomic>
#include <vector>
#include <optional>

template<typename T>
class WorkStealingQueue {
public:
    static constexpr size_t INITIAL_CAPACITY = 1024;
    
    WorkStealingQueue() 
        : buffer_(new CircularBuffer(INITIAL_CAPACITY))
        , top_(0)
        , bottom_(0) 
    {}
    
    ~WorkStealingQueue() {
        delete buffer_.load(std::memory_order_relaxed);
    }
    
    // 所有者スレッドがタスクを追加
    void push(T item) {
        int64_t b = bottom_.load(std::memory_order_relaxed);
        int64_t t = top_.load(std::memory_order_acquire);
        CircularBuffer* buf = buffer_.load(std::memory_order_relaxed);
        
        // バッファが満杯なら拡張
        if (b - t >= static_cast<int64_t>(buf->capacity()) - 1) {
            buf = resize(buf, b, t);
        }
        
        buf->set(b, std::move(item));
        std::atomic_thread_fence(std::memory_order_release);
        bottom_.store(b + 1, std::memory_order_relaxed);
    }
    
    // 所有者スレッドがタスクを取得
    std::optional<T> pop() {
        int64_t b = bottom_.load(std::memory_order_relaxed) - 1;
        CircularBuffer* buf = buffer_.load(std::memory_order_relaxed);
        bottom_.store(b, std::memory_order_relaxed);
        
        std::atomic_thread_fence(std::memory_order_seq_cst);
        
        int64_t t = top_.load(std::memory_order_relaxed);
        
        if (t <= b) {
            // キューは空でない
            T item = buf->get(b);
            
            if (t == b) {
                // 最後の1つ → 泥棒と競合する可能性
                if (!top_.compare_exchange_strong(
                        t, t + 1,
                        std::memory_order_seq_cst,
                        std::memory_order_relaxed)) {
                    // 泥棒に取られた
                    bottom_.store(t + 1, std::memory_order_relaxed);
                    return std::nullopt;
                }
                bottom_.store(t + 1, std::memory_order_relaxed);
            }
            return item;
        } else {
            // キューは空
            bottom_.store(t, std::memory_order_relaxed);
            return std::nullopt;
        }
    }
    
    // 泥棒スレッドがタスクを盗む
    std::optional<T> steal() {
        int64_t t = top_.load(std::memory_order_acquire);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        int64_t b = bottom_.load(std::memory_order_acquire);
        
        if (t < b) {
            // キューは空でない
            CircularBuffer* buf = buffer_.load(std::memory_order_consume);
            T item = buf->get(t);
            
            if (!top_.compare_exchange_strong(
                    t, t + 1,
                    std::memory_order_seq_cst,
                    std::memory_order_relaxed)) {
                // 他の泥棒に取られた
                return std::nullopt;
            }
            return item;
        }
        return std::nullopt;
    }
    
    bool empty() const {
        int64_t b = bottom_.load(std::memory_order_relaxed);
        int64_t t = top_.load(std::memory_order_relaxed);
        return b <= t;
    }

private:
    struct CircularBuffer {
        std::vector<T> data;
        
        explicit CircularBuffer(size_t cap) : data(cap) {}
        
        size_t capacity() const { return data.size(); }
        
        T& get(int64_t i) { 
            return data[i % data.size()]; 
        }
        
        void set(int64_t i, T item) { 
            data[i % data.size()] = std::move(item); 
        }
    };
    
    CircularBuffer* resize(CircularBuffer* old_buf, int64_t b, int64_t t) {
        CircularBuffer* new_buf = new CircularBuffer(old_buf->capacity() * 2);
        
        for (int64_t i = t; i < b; ++i) {
            new_buf->set(i, old_buf->get(i));
        }
        
        buffer_.store(new_buf, std::memory_order_release);
        // Note: 古いバッファのメモリリークは、実際の実装ではEpochベースのGCで解決
        return new_buf;
    }
    
    std::atomic<CircularBuffer*> buffer_;
    std::atomic<int64_t> top_;
    std::atomic<int64_t> bottom_;
};

メモリオーダリングの解説

このコード、memory_orderだらけで意味不明だよね。整理しよう。

メモリオーダー 意味 使いどころ
relaxed 順序保証なし 単純なカウンター
acquire これ以降の読み書きが前に移動しない ロックの取得
release これ以前の読み書きが後に移動しない ロックの解放
seq_cst 全スレッドで一貫した順序 最も安全(最も遅い)
// push()の最後
std::atomic_thread_fence(std::memory_order_release);
bottom_.store(b + 1, std::memory_order_relaxed);

これは「データを書き込んでからbottomを更新する」順序を保証してる。逆だと泥棒が未初期化のデータを読んでしまう。

ワークスティーリング・スレッドプール

Chase-Lev Dequeを使って、本格的なスレッドプールを作ろう。

#include <thread>
#include <functional>
#include <random>
#include <iostream>

class WorkStealingThreadPool {
public:
    using Task = std::function<void()>;
    
    explicit WorkStealingThreadPool(size_t num_threads = 0) 
        : num_threads_(num_threads == 0 ? std::thread::hardware_concurrency() : num_threads)
        , queues_(num_threads_)
        , stop_(false)
    {
        // 各スレッドを起動
        for (size_t i = 0; i < num_threads_; ++i) {
            workers_.emplace_back([this, i] { worker_loop(i); });
        }
    }
    
    ~WorkStealingThreadPool() {
        stop_.store(true, std::memory_order_release);
        
        for (auto& worker : workers_) {
            worker.join();
        }
    }
    
    // タスクを投入(ラウンドロビンで分配)
    void submit(Task task) {
        static std::atomic<size_t> next_queue{0};
        size_t idx = next_queue.fetch_add(1, std::memory_order_relaxed) % num_threads_;
        queues_[idx].push(std::move(task));
    }
    
    // 特定のキューに投入
    void submit_to(size_t worker_id, Task task) {
        queues_[worker_id % num_threads_].push(std::move(task));
    }

private:
    void worker_loop(size_t my_id) {
        // 乱数生成器(盗む相手を選ぶ用)
        std::mt19937 rng(my_id);
        std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);
        
        while (!stop_.load(std::memory_order_acquire)) {
            Task task;
            
            // 1. まず自分のキューから取る
            if (auto t = queues_[my_id].pop()) {
                task = std::move(*t);
            }
            // 2. なければ他のキューから盗む
            else {
                bool found = false;
                for (size_t attempt = 0; attempt < num_threads_ * 2; ++attempt) {
                    size_t victim = dist(rng);
                    if (victim != my_id) {
                        if (auto t = queues_[victim].steal()) {
                            task = std::move(*t);
                            found = true;
                            break;
                        }
                    }
                }
                
                if (!found) {
                    // 仕事がない → 少し待つ
                    std::this_thread::yield();
                    continue;
                }
            }
            
            // タスク実行
            task();
        }
    }
    
    size_t num_threads_;
    std::vector<WorkStealingQueue<Task>> queues_;
    std::vector<std::thread> workers_;
    std::atomic<bool> stop_;
};

使い方

int main() {
    WorkStealingThreadPool pool(4);
    
    std::atomic<int> counter{0};
    
    // 1000個のタスクを投入
    for (int i = 0; i < 1000; ++i) {
        pool.submit([&counter] {
            ++counter;
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        });
    }
    
    // 完了を待つ(実際はFuture/Promiseを使う)
    while (counter.load() < 1000) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    
    std::cout << "All tasks completed: " << counter.load() << "\n";
    return 0;
}

スティーリング戦略

どのスレッドから盗むかはパフォーマンスに大きく影響する。

ランダム選択(今の実装)

size_t victim = dist(rng);  // ランダムに選ぶ
  • ✅ シンプル
  • ✅ 偏りがない
  • ❌ 空のキューにアクセスする無駄がある

ラウンドロビン

size_t victim = (my_id + attempt) % num_threads_;
  • ✅ 全キューを順番にチェック
  • ❌ 特定のキューにアクセスが集中することがある

隣接スレッド優先

// CPUキャッシュの局所性を考慮
size_t victim = (my_id + 1) % num_threads_;
  • ✅ キャッシュヒット率が上がる可能性
  • ❌ 負荷分散が偏る可能性

実践的なアプローチ

// 最初は近くのスレッドから探し、徐々に遠くを探す
size_t victim;
if (attempt < num_threads_) {
    victim = (my_id + attempt + 1) % num_threads_;
} else {
    victim = dist(rng);  // それでもなければランダム
}

スレッドローカルなタスク追加

タスクの中から新しいタスクを追加する場合、自分のキューに直接追加したほうが効率的。

// スレッドローカルストレージで自分のワーカーIDを保持
thread_local size_t tls_worker_id = SIZE_MAX;

void worker_loop(size_t my_id) {
    tls_worker_id = my_id;  // 自分のIDを記録
    // ...
}

void submit_local(Task task) {
    if (tls_worker_id != SIZE_MAX) {
        // ワーカースレッドから呼ばれた → 自分のキューに追加
        queues_[tls_worker_id].push(std::move(task));
    } else {
        // 外部から呼ばれた → 通常のsubmit
        submit(std::move(task));
    }
}

これで再帰的なタスク分割(Fork-Joinパターン)が効率的になる。

性能比較

簡単なベンチマークを取ってみよう。

#include <chrono>

void benchmark(const std::string& name, auto&& pool, int num_tasks) {
    std::atomic<int> counter{0};
    
    auto start = std::chrono::high_resolution_clock::now();
    
    for (int i = 0; i < num_tasks; ++i) {
        pool.submit([&counter] {
            ++counter;
        });
    }
    
    while (counter.load() < num_tasks) {
        std::this_thread::yield();
    }
    
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    
    std::cout << name << ": " << duration.count() << "ms\n";
}
共有キュー方式: 156ms
ワークスティーリング: 47ms  ← 3倍速い!

タスクが軽量で数が多いほど、ワークスティーリングの効果が大きくなる。

まとめ

トピック ポイント
ワークスティーリング 各スレッドが専用キュー + 暇なら盗む
Chase-Lev Deque Lock-freeで所有者/泥棒が共存
メモリオーダリング acquire/releaseで正しい順序を保証
スティーリング戦略 ランダム or 隣接優先
パフォーマンス 共有キューより数倍高速

次回はFuture/Promiseを実装して、タスクの結果を受け取れるようにするよ。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?