C++スレッドプール自作シリーズ
| Part1 std::thread | Part2 ワークスティーリング | Part3 Future/Promise | Part4 ベンチマーク |
|---|---|---|---|
| ✅ | 👈 Now | - | - |
はじめに
前回は基本的なスレッドプールを作った。
でもあれには致命的な問題がある。
┌────────────────────────────────────────────────────────┐
│ 全スレッドが1つのキューに群がる │
│ │
│ Thread1 ─┐ │
│ Thread2 ─┼──→ [ 共有キュー ] ← mutex でロック │
│ Thread3 ─┤ │
│ Thread4 ─┘ │
│ │
│ → スレッドが増えるとロック競合が激化 │
└────────────────────────────────────────────────────────┘
これを解決するのがワークスティーリングという技術。
ワークスティーリングとは
各スレッドが自分専用のキューを持つ。暇なスレッドは他のスレッドのキューから仕事を盗む。
┌──────────────────────────────────────────────────────────────┐
│ ワークスティーリング │
│ │
│ Thread1 ──→ [ キュー1: ■■■■■ ] │
│ Thread2 ──→ [ キュー2: ■ ] │
│ Thread3 ──→ [ キュー3: ■■■■■■■ ] │
│ Thread4 ──→ [ キュー4: (空) ] ──→ キュー3から盗む! │
│ │
│ → ロック競合が減り、負荷が自動的に分散される │
└──────────────────────────────────────────────────────────────┘
なぜ効率的なのか
| 方式 | ロック頻度 | 負荷分散 |
|---|---|---|
| 共有キュー | 毎回ロック | 自動(ただし競合) |
| ローカルキューのみ | なし | 手動で振り分け必要 |
| ワークスティーリング | 盗むときだけ | 自動で最適化 |
Lock-Free キューの実装
ワークスティーリングを実現するには、ロックなしで動作するキューが欲しい。
Chase-Lev Deque
ワークスティーリングの定番データ構造。
- 所有者スレッド: 片方の端からpush/pop(高速)
- 泥棒スレッド: 反対側の端からsteal(たまにしか起きない)
┌────────────────────────────────────────────────────────────┐
│ Chase-Lev Deque │
│ │
│ Bottom (所有者が使う) Top (泥棒が使う) │
│ ↓ ↓ │
│ [ Task1 | Task2 | Task3 | Task4 | Task5 ] │
│ ↑ push/pop ↑ steal │
│ (ロック不要) (CASで競合解決) │
└────────────────────────────────────────────────────────────┘
実装コード
#include <atomic>
#include <vector>
#include <optional>
template<typename T>
class WorkStealingQueue {
public:
static constexpr size_t INITIAL_CAPACITY = 1024;
WorkStealingQueue()
: buffer_(new CircularBuffer(INITIAL_CAPACITY))
, top_(0)
, bottom_(0)
{}
~WorkStealingQueue() {
delete buffer_.load(std::memory_order_relaxed);
}
// 所有者スレッドがタスクを追加
void push(T item) {
int64_t b = bottom_.load(std::memory_order_relaxed);
int64_t t = top_.load(std::memory_order_acquire);
CircularBuffer* buf = buffer_.load(std::memory_order_relaxed);
// バッファが満杯なら拡張
if (b - t >= static_cast<int64_t>(buf->capacity()) - 1) {
buf = resize(buf, b, t);
}
buf->set(b, std::move(item));
std::atomic_thread_fence(std::memory_order_release);
bottom_.store(b + 1, std::memory_order_relaxed);
}
// 所有者スレッドがタスクを取得
std::optional<T> pop() {
int64_t b = bottom_.load(std::memory_order_relaxed) - 1;
CircularBuffer* buf = buffer_.load(std::memory_order_relaxed);
bottom_.store(b, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
int64_t t = top_.load(std::memory_order_relaxed);
if (t <= b) {
// キューは空でない
T item = buf->get(b);
if (t == b) {
// 最後の1つ → 泥棒と競合する可能性
if (!top_.compare_exchange_strong(
t, t + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
// 泥棒に取られた
bottom_.store(t + 1, std::memory_order_relaxed);
return std::nullopt;
}
bottom_.store(t + 1, std::memory_order_relaxed);
}
return item;
} else {
// キューは空
bottom_.store(t, std::memory_order_relaxed);
return std::nullopt;
}
}
// 泥棒スレッドがタスクを盗む
std::optional<T> steal() {
int64_t t = top_.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
int64_t b = bottom_.load(std::memory_order_acquire);
if (t < b) {
// キューは空でない
CircularBuffer* buf = buffer_.load(std::memory_order_consume);
T item = buf->get(t);
if (!top_.compare_exchange_strong(
t, t + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
// 他の泥棒に取られた
return std::nullopt;
}
return item;
}
return std::nullopt;
}
bool empty() const {
int64_t b = bottom_.load(std::memory_order_relaxed);
int64_t t = top_.load(std::memory_order_relaxed);
return b <= t;
}
private:
struct CircularBuffer {
std::vector<T> data;
explicit CircularBuffer(size_t cap) : data(cap) {}
size_t capacity() const { return data.size(); }
T& get(int64_t i) {
return data[i % data.size()];
}
void set(int64_t i, T item) {
data[i % data.size()] = std::move(item);
}
};
CircularBuffer* resize(CircularBuffer* old_buf, int64_t b, int64_t t) {
CircularBuffer* new_buf = new CircularBuffer(old_buf->capacity() * 2);
for (int64_t i = t; i < b; ++i) {
new_buf->set(i, old_buf->get(i));
}
buffer_.store(new_buf, std::memory_order_release);
// Note: 古いバッファのメモリリークは、実際の実装ではEpochベースのGCで解決
return new_buf;
}
std::atomic<CircularBuffer*> buffer_;
std::atomic<int64_t> top_;
std::atomic<int64_t> bottom_;
};
メモリオーダリングの解説
このコード、memory_orderだらけで意味不明だよね。整理しよう。
| メモリオーダー | 意味 | 使いどころ |
|---|---|---|
relaxed |
順序保証なし | 単純なカウンター |
acquire |
これ以降の読み書きが前に移動しない | ロックの取得 |
release |
これ以前の読み書きが後に移動しない | ロックの解放 |
seq_cst |
全スレッドで一貫した順序 | 最も安全(最も遅い) |
// push()の最後
std::atomic_thread_fence(std::memory_order_release);
bottom_.store(b + 1, std::memory_order_relaxed);
これは「データを書き込んでからbottomを更新する」順序を保証してる。逆だと泥棒が未初期化のデータを読んでしまう。
ワークスティーリング・スレッドプール
Chase-Lev Dequeを使って、本格的なスレッドプールを作ろう。
#include <thread>
#include <functional>
#include <random>
#include <iostream>
class WorkStealingThreadPool {
public:
using Task = std::function<void()>;
explicit WorkStealingThreadPool(size_t num_threads = 0)
: num_threads_(num_threads == 0 ? std::thread::hardware_concurrency() : num_threads)
, queues_(num_threads_)
, stop_(false)
{
// 各スレッドを起動
for (size_t i = 0; i < num_threads_; ++i) {
workers_.emplace_back([this, i] { worker_loop(i); });
}
}
~WorkStealingThreadPool() {
stop_.store(true, std::memory_order_release);
for (auto& worker : workers_) {
worker.join();
}
}
// タスクを投入(ラウンドロビンで分配)
void submit(Task task) {
static std::atomic<size_t> next_queue{0};
size_t idx = next_queue.fetch_add(1, std::memory_order_relaxed) % num_threads_;
queues_[idx].push(std::move(task));
}
// 特定のキューに投入
void submit_to(size_t worker_id, Task task) {
queues_[worker_id % num_threads_].push(std::move(task));
}
private:
void worker_loop(size_t my_id) {
// 乱数生成器(盗む相手を選ぶ用)
std::mt19937 rng(my_id);
std::uniform_int_distribution<size_t> dist(0, num_threads_ - 1);
while (!stop_.load(std::memory_order_acquire)) {
Task task;
// 1. まず自分のキューから取る
if (auto t = queues_[my_id].pop()) {
task = std::move(*t);
}
// 2. なければ他のキューから盗む
else {
bool found = false;
for (size_t attempt = 0; attempt < num_threads_ * 2; ++attempt) {
size_t victim = dist(rng);
if (victim != my_id) {
if (auto t = queues_[victim].steal()) {
task = std::move(*t);
found = true;
break;
}
}
}
if (!found) {
// 仕事がない → 少し待つ
std::this_thread::yield();
continue;
}
}
// タスク実行
task();
}
}
size_t num_threads_;
std::vector<WorkStealingQueue<Task>> queues_;
std::vector<std::thread> workers_;
std::atomic<bool> stop_;
};
使い方
int main() {
WorkStealingThreadPool pool(4);
std::atomic<int> counter{0};
// 1000個のタスクを投入
for (int i = 0; i < 1000; ++i) {
pool.submit([&counter] {
++counter;
std::this_thread::sleep_for(std::chrono::microseconds(100));
});
}
// 完了を待つ(実際はFuture/Promiseを使う)
while (counter.load() < 1000) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cout << "All tasks completed: " << counter.load() << "\n";
return 0;
}
スティーリング戦略
どのスレッドから盗むかはパフォーマンスに大きく影響する。
ランダム選択(今の実装)
size_t victim = dist(rng); // ランダムに選ぶ
- ✅ シンプル
- ✅ 偏りがない
- ❌ 空のキューにアクセスする無駄がある
ラウンドロビン
size_t victim = (my_id + attempt) % num_threads_;
- ✅ 全キューを順番にチェック
- ❌ 特定のキューにアクセスが集中することがある
隣接スレッド優先
// CPUキャッシュの局所性を考慮
size_t victim = (my_id + 1) % num_threads_;
- ✅ キャッシュヒット率が上がる可能性
- ❌ 負荷分散が偏る可能性
実践的なアプローチ
// 最初は近くのスレッドから探し、徐々に遠くを探す
size_t victim;
if (attempt < num_threads_) {
victim = (my_id + attempt + 1) % num_threads_;
} else {
victim = dist(rng); // それでもなければランダム
}
スレッドローカルなタスク追加
タスクの中から新しいタスクを追加する場合、自分のキューに直接追加したほうが効率的。
// スレッドローカルストレージで自分のワーカーIDを保持
thread_local size_t tls_worker_id = SIZE_MAX;
void worker_loop(size_t my_id) {
tls_worker_id = my_id; // 自分のIDを記録
// ...
}
void submit_local(Task task) {
if (tls_worker_id != SIZE_MAX) {
// ワーカースレッドから呼ばれた → 自分のキューに追加
queues_[tls_worker_id].push(std::move(task));
} else {
// 外部から呼ばれた → 通常のsubmit
submit(std::move(task));
}
}
これで再帰的なタスク分割(Fork-Joinパターン)が効率的になる。
性能比較
簡単なベンチマークを取ってみよう。
#include <chrono>
void benchmark(const std::string& name, auto&& pool, int num_tasks) {
std::atomic<int> counter{0};
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_tasks; ++i) {
pool.submit([&counter] {
++counter;
});
}
while (counter.load() < num_tasks) {
std::this_thread::yield();
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << name << ": " << duration.count() << "ms\n";
}
共有キュー方式: 156ms
ワークスティーリング: 47ms ← 3倍速い!
タスクが軽量で数が多いほど、ワークスティーリングの効果が大きくなる。
まとめ
| トピック | ポイント |
|---|---|
| ワークスティーリング | 各スレッドが専用キュー + 暇なら盗む |
| Chase-Lev Deque | Lock-freeで所有者/泥棒が共存 |
| メモリオーダリング | acquire/releaseで正しい順序を保証 |
| スティーリング戦略 | ランダム or 隣接優先 |
| パフォーマンス | 共有キューより数倍高速 |
次回はFuture/Promiseを実装して、タスクの結果を受け取れるようにするよ。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!