C++スレッドプール自作シリーズ
| Part1 std::thread | Part2 ワークスティーリング | Part3 Future/Promise | Part4 ベンチマーク |
|---|---|---|---|
| ✅ | ✅ | 👈 Now | - |
はじめに
前回までで、タスクを並列に実行できるようになった。
でも、タスクの結果を受け取れないという問題がある。
pool.submit([]{
return heavy_calculation(); // この結果、どうやって受け取る?
});
今回はFuture/Promiseパターンを実装して、タスクの完了を待ったり結果を受け取ったりできるようにする。
Future/Promiseパターンとは
Promise: 値を「いつか設定する」という約束
Future: その値を「いつか受け取る」という権利
┌────────────────────────────────────────────────────────────┐
│ Future/Promise パターン │
│ │
│ Producer Thread Consumer Thread │
│ │ │ │
│ Promise作成 ─────────→ Future取得 │
│ │ │ │
│ 計算を実行 │ │
│ │ │ future.get()で待機 │
│ promise.set_value(結果) │ │
│ │ ─────────────────────→ │ 結果を受け取る │
└────────────────────────────────────────────────────────────┘
標準ライブラリのstd::future
C++11にはstd::futureとstd::promiseがある。
#include <future>
#include <iostream>
int main() {
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread t([&promise] {
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 値を設定
});
std::cout << "Waiting...\n";
int result = future.get(); // ブロックして待つ
std::cout << "Result: " << result << "\n";
t.join();
return 0;
}
Waiting...
(1秒後)
Result: 42
std::asyncも便利
#include <future>
#include <iostream>
int heavy_calculation() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return 42;
}
int main() {
// 非同期でタスクを開始
std::future<int> future = std::async(std::launch::async, heavy_calculation);
std::cout << "Doing other work...\n";
int result = future.get();
std::cout << "Result: " << result << "\n";
return 0;
}
自作Future/Promise
標準ライブラリのものを使えばいいじゃん、と思うかもしれないけど、自作スレッドプールと統合するには自作したほうがいい。
SharedStateの設計
FutureとPromiseは**共有状態(SharedState)**を介して通信する。
template<typename T>
struct SharedState {
std::mutex mutex;
std::condition_variable cv;
std::optional<T> value;
std::exception_ptr exception;
bool ready = false;
};
Promise実装
template<typename T>
class Promise {
public:
Promise() : state_(std::make_shared<SharedState<T>>()) {}
// Futureを取得(1回だけ)
Future<T> get_future() {
if (future_retrieved_) {
throw std::runtime_error("Future already retrieved");
}
future_retrieved_ = true;
return Future<T>(state_);
}
// 値を設定
void set_value(T value) {
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->ready) {
throw std::runtime_error("Value already set");
}
state_->value = std::move(value);
state_->ready = true;
state_->cv.notify_all();
}
// 例外を設定
void set_exception(std::exception_ptr e) {
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->ready) {
throw std::runtime_error("Value already set");
}
state_->exception = e;
state_->ready = true;
state_->cv.notify_all();
}
private:
std::shared_ptr<SharedState<T>> state_;
bool future_retrieved_ = false;
};
Future実装
template<typename T>
class Future {
public:
explicit Future(std::shared_ptr<SharedState<T>> state)
: state_(std::move(state)) {}
// 結果を取得(ブロック)
T get() {
std::unique_lock<std::mutex> lock(state_->mutex);
state_->cv.wait(lock, [this] { return state_->ready; });
if (state_->exception) {
std::rethrow_exception(state_->exception);
}
return std::move(*state_->value);
}
// タイムアウト付きで待つ
template<typename Rep, typename Period>
bool wait_for(const std::chrono::duration<Rep, Period>& timeout) {
std::unique_lock<std::mutex> lock(state_->mutex);
return state_->cv.wait_for(lock, timeout, [this] {
return state_->ready;
});
}
// 準備完了かチェック(ブロックしない)
bool is_ready() const {
std::lock_guard<std::mutex> lock(state_->mutex);
return state_->ready;
}
// 有効なFutureか
bool valid() const {
return state_ != nullptr;
}
private:
std::shared_ptr<SharedState<T>> state_;
};
void特殊化
戻り値がない場合のために、void特殊化も必要。
template<>
struct SharedState<void> {
std::mutex mutex;
std::condition_variable cv;
std::exception_ptr exception;
bool ready = false;
};
template<>
class Promise<void> {
public:
Promise() : state_(std::make_shared<SharedState<void>>()) {}
Future<void> get_future() {
if (future_retrieved_) {
throw std::runtime_error("Future already retrieved");
}
future_retrieved_ = true;
return Future<void>(state_);
}
void set_value() {
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->ready) {
throw std::runtime_error("Value already set");
}
state_->ready = true;
state_->cv.notify_all();
}
void set_exception(std::exception_ptr e) {
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->ready) {
throw std::runtime_error("Value already set");
}
state_->exception = e;
state_->ready = true;
state_->cv.notify_all();
}
private:
std::shared_ptr<SharedState<void>> state_;
bool future_retrieved_ = false;
};
template<>
class Future<void> {
public:
explicit Future(std::shared_ptr<SharedState<void>> state)
: state_(std::move(state)) {}
void get() {
std::unique_lock<std::mutex> lock(state_->mutex);
state_->cv.wait(lock, [this] { return state_->ready; });
if (state_->exception) {
std::rethrow_exception(state_->exception);
}
}
bool is_ready() const {
std::lock_guard<std::mutex> lock(state_->mutex);
return state_->ready;
}
private:
std::shared_ptr<SharedState<void>> state_;
};
スレッドプールとの統合
Future/Promiseをスレッドプールと統合しよう。
class ThreadPool {
public:
// 戻り値のあるタスクを投入
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> Future<std::invoke_result_t<F, Args...>>
{
using ResultType = std::invoke_result_t<F, Args...>;
auto promise = std::make_shared<Promise<ResultType>>();
auto future = promise->get_future();
// タスクをラップ
auto task = [promise = std::move(promise),
f = std::forward<F>(f),
...args = std::forward<Args>(args)]() mutable {
try {
if constexpr (std::is_void_v<ResultType>) {
std::invoke(f, args...);
promise->set_value();
} else {
promise->set_value(std::invoke(f, args...));
}
} catch (...) {
promise->set_exception(std::current_exception());
}
};
enqueue(std::move(task));
return future;
}
private:
void enqueue(std::function<void()> task);
// ... 他のメンバー
};
使い方
int main() {
ThreadPool pool(4);
// 戻り値を受け取る
auto future1 = pool.submit([] {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 42;
});
// 引数を渡す
auto future2 = pool.submit([](int a, int b) {
return a + b;
}, 10, 20);
// 例外をキャッチ
auto future3 = pool.submit([] {
throw std::runtime_error("Oops!");
return 0;
});
std::cout << "Result 1: " << future1.get() << "\n";
std::cout << "Result 2: " << future2.get() << "\n";
try {
future3.get();
} catch (const std::exception& e) {
std::cout << "Exception: " << e.what() << "\n";
}
return 0;
}
Result 1: 42
Result 2: 30
Exception: Oops!
then()でチェーン処理
JavaScriptのPromiseみたいに、.then()でチェーンできると便利。
template<typename T>
class Future {
public:
// 完了後に別の処理を実行
template<typename F>
auto then(ThreadPool& pool, F&& f)
-> Future<std::invoke_result_t<F, T>>
{
using NextType = std::invoke_result_t<F, T>;
auto promise = std::make_shared<Promise<NextType>>();
auto next_future = promise->get_future();
// 現在のFutureの完了を監視するタスクを投入
pool.submit([state = state_,
promise = std::move(promise),
f = std::forward<F>(f)]() mutable {
try {
// 元のFutureの結果を待つ
std::unique_lock<std::mutex> lock(state->mutex);
state->cv.wait(lock, [&] { return state->ready; });
if (state->exception) {
promise->set_exception(state->exception);
} else {
if constexpr (std::is_void_v<NextType>) {
f(std::move(*state->value));
promise->set_value();
} else {
promise->set_value(f(std::move(*state->value)));
}
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
return next_future;
}
// ... 他のメソッド
};
使い方
int main() {
ThreadPool pool(4);
auto future = pool.submit([] {
return 10;
})
.then(pool, [](int x) {
return x * 2; // 20
})
.then(pool, [](int x) {
return x + 5; // 25
})
.then(pool, [](int x) {
std::cout << "Final: " << x << "\n";
});
future.get(); // チェーン全体の完了を待つ
return 0;
}
Final: 25
when_all: 複数のFutureを待つ
複数のタスクがすべて完了するのを待ちたい場合。
template<typename... Futures>
auto when_all(Futures&&... futures) {
return std::make_tuple(futures.get()...);
}
// vector版
template<typename T>
std::vector<T> when_all(std::vector<Future<T>>& futures) {
std::vector<T> results;
results.reserve(futures.size());
for (auto& future : futures) {
results.push_back(future.get());
}
return results;
}
使い方
int main() {
ThreadPool pool(4);
std::vector<Future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.push_back(pool.submit([i] {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return i * i;
}));
}
auto results = when_all(futures);
for (int r : results) {
std::cout << r << " ";
}
std::cout << "\n";
return 0;
}
0 1 4 9 16 25 36 49 64 81
when_any: 最初に完了したものを取得
template<typename T>
std::pair<size_t, T> when_any(std::vector<Future<T>>& futures) {
while (true) {
for (size_t i = 0; i < futures.size(); ++i) {
if (futures[i].is_ready()) {
return {i, futures[i].get()};
}
}
std::this_thread::yield();
}
}
より効率的な実装
上の実装はビジーウェイト。共有の待機機構を使うほうがいい。
template<typename T>
class WhenAnyState {
public:
std::mutex mutex;
std::condition_variable cv;
std::optional<std::pair<size_t, T>> result;
bool done = false;
void set_result(size_t index, T value) {
std::lock_guard<std::mutex> lock(mutex);
if (!done) {
result = {index, std::move(value)};
done = true;
cv.notify_all();
}
}
};
template<typename T>
std::pair<size_t, T> when_any(ThreadPool& pool, std::vector<Future<T>>& futures) {
auto state = std::make_shared<WhenAnyState<T>>();
for (size_t i = 0; i < futures.size(); ++i) {
pool.submit([state, &futures, i] {
T value = futures[i].get();
state->set_result(i, std::move(value));
});
}
std::unique_lock<std::mutex> lock(state->mutex);
state->cv.wait(lock, [&] { return state->done; });
return std::move(*state->result);
}
PackagedTask
std::packaged_task相当の機能も実装しておこう。
template<typename R, typename... Args>
class PackagedTask;
template<typename R, typename... Args>
class PackagedTask<R(Args...)> {
public:
template<typename F>
explicit PackagedTask(F&& f)
: func_(std::forward<F>(f))
, promise_(std::make_shared<Promise<R>>())
{}
Future<R> get_future() {
return promise_->get_future();
}
void operator()(Args... args) {
try {
if constexpr (std::is_void_v<R>) {
func_(std::forward<Args>(args)...);
promise_->set_value();
} else {
promise_->set_value(func_(std::forward<Args>(args)...));
}
} catch (...) {
promise_->set_exception(std::current_exception());
}
}
private:
std::function<R(Args...)> func_;
std::shared_ptr<Promise<R>> promise_;
};
使い方
int main() {
PackagedTask<int(int, int)> task([](int a, int b) {
return a * b;
});
auto future = task.get_future();
std::thread t(std::move(task), 6, 7);
std::cout << "Result: " << future.get() << "\n";
t.join();
return 0;
}
Result: 42
まとめ
| トピック | ポイント |
|---|---|
| SharedState | FutureとPromiseの通信媒体 |
| Promise | 値または例外を設定する側 |
| Future | 値を受け取る側、get()でブロック |
| then() | チェーン処理を可能にする |
| when_all | すべての完了を待つ |
| when_any | 最初の完了を待つ |
次回はベンチマークとチューニングで、実際の性能を測って改善していくよ。
この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!