6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

C++スレッドプール自作シリーズ

Part1 std::thread Part2 ワークスティーリング Part3 Future/Promise Part4 ベンチマーク
👈 Now -

はじめに

前回までで、タスクを並列に実行できるようになった。

でも、タスクの結果を受け取れないという問題がある。

pool.submit([]{
    return heavy_calculation();  // この結果、どうやって受け取る?
});

今回はFuture/Promiseパターンを実装して、タスクの完了を待ったり結果を受け取ったりできるようにする。

Future/Promiseパターンとは

Promise: 値を「いつか設定する」という約束
Future: その値を「いつか受け取る」という権利

┌────────────────────────────────────────────────────────────┐
│ Future/Promise パターン                                    │
│                                                            │
│   Producer Thread          Consumer Thread                 │
│        │                        │                          │
│   Promise作成 ─────────→ Future取得                        │
│        │                        │                          │
│   計算を実行                     │                          │
│        │                        │ future.get()で待機       │
│   promise.set_value(結果)       │                          │
│        │ ─────────────────────→ │ 結果を受け取る          │
└────────────────────────────────────────────────────────────┘

標準ライブラリのstd::future

C++11にはstd::futurestd::promiseがある。

#include <future>
#include <iostream>

int main() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    
    std::thread t([&promise] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        promise.set_value(42);  // 値を設定
    });
    
    std::cout << "Waiting...\n";
    int result = future.get();  // ブロックして待つ
    std::cout << "Result: " << result << "\n";
    
    t.join();
    return 0;
}
Waiting...
(1秒後)
Result: 42

std::asyncも便利

#include <future>
#include <iostream>

int heavy_calculation() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42;
}

int main() {
    // 非同期でタスクを開始
    std::future<int> future = std::async(std::launch::async, heavy_calculation);
    
    std::cout << "Doing other work...\n";
    
    int result = future.get();
    std::cout << "Result: " << result << "\n";
    
    return 0;
}

自作Future/Promise

標準ライブラリのものを使えばいいじゃん、と思うかもしれないけど、自作スレッドプールと統合するには自作したほうがいい。

SharedStateの設計

FutureとPromiseは**共有状態(SharedState)**を介して通信する。

template<typename T>
struct SharedState {
    std::mutex mutex;
    std::condition_variable cv;
    
    std::optional<T> value;
    std::exception_ptr exception;
    bool ready = false;
};

Promise実装

template<typename T>
class Promise {
public:
    Promise() : state_(std::make_shared<SharedState<T>>()) {}
    
    // Futureを取得(1回だけ)
    Future<T> get_future() {
        if (future_retrieved_) {
            throw std::runtime_error("Future already retrieved");
        }
        future_retrieved_ = true;
        return Future<T>(state_);
    }
    
    // 値を設定
    void set_value(T value) {
        std::lock_guard<std::mutex> lock(state_->mutex);
        
        if (state_->ready) {
            throw std::runtime_error("Value already set");
        }
        
        state_->value = std::move(value);
        state_->ready = true;
        state_->cv.notify_all();
    }
    
    // 例外を設定
    void set_exception(std::exception_ptr e) {
        std::lock_guard<std::mutex> lock(state_->mutex);
        
        if (state_->ready) {
            throw std::runtime_error("Value already set");
        }
        
        state_->exception = e;
        state_->ready = true;
        state_->cv.notify_all();
    }

private:
    std::shared_ptr<SharedState<T>> state_;
    bool future_retrieved_ = false;
};

Future実装

template<typename T>
class Future {
public:
    explicit Future(std::shared_ptr<SharedState<T>> state) 
        : state_(std::move(state)) {}
    
    // 結果を取得(ブロック)
    T get() {
        std::unique_lock<std::mutex> lock(state_->mutex);
        
        state_->cv.wait(lock, [this] { return state_->ready; });
        
        if (state_->exception) {
            std::rethrow_exception(state_->exception);
        }
        
        return std::move(*state_->value);
    }
    
    // タイムアウト付きで待つ
    template<typename Rep, typename Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(state_->mutex);
        return state_->cv.wait_for(lock, timeout, [this] { 
            return state_->ready; 
        });
    }
    
    // 準備完了かチェック(ブロックしない)
    bool is_ready() const {
        std::lock_guard<std::mutex> lock(state_->mutex);
        return state_->ready;
    }
    
    // 有効なFutureか
    bool valid() const {
        return state_ != nullptr;
    }

private:
    std::shared_ptr<SharedState<T>> state_;
};

void特殊化

戻り値がない場合のために、void特殊化も必要。

template<>
struct SharedState<void> {
    std::mutex mutex;
    std::condition_variable cv;
    
    std::exception_ptr exception;
    bool ready = false;
};

template<>
class Promise<void> {
public:
    Promise() : state_(std::make_shared<SharedState<void>>()) {}
    
    Future<void> get_future() {
        if (future_retrieved_) {
            throw std::runtime_error("Future already retrieved");
        }
        future_retrieved_ = true;
        return Future<void>(state_);
    }
    
    void set_value() {
        std::lock_guard<std::mutex> lock(state_->mutex);
        if (state_->ready) {
            throw std::runtime_error("Value already set");
        }
        state_->ready = true;
        state_->cv.notify_all();
    }
    
    void set_exception(std::exception_ptr e) {
        std::lock_guard<std::mutex> lock(state_->mutex);
        if (state_->ready) {
            throw std::runtime_error("Value already set");
        }
        state_->exception = e;
        state_->ready = true;
        state_->cv.notify_all();
    }

private:
    std::shared_ptr<SharedState<void>> state_;
    bool future_retrieved_ = false;
};

template<>
class Future<void> {
public:
    explicit Future(std::shared_ptr<SharedState<void>> state) 
        : state_(std::move(state)) {}
    
    void get() {
        std::unique_lock<std::mutex> lock(state_->mutex);
        state_->cv.wait(lock, [this] { return state_->ready; });
        
        if (state_->exception) {
            std::rethrow_exception(state_->exception);
        }
    }
    
    bool is_ready() const {
        std::lock_guard<std::mutex> lock(state_->mutex);
        return state_->ready;
    }

private:
    std::shared_ptr<SharedState<void>> state_;
};

スレッドプールとの統合

Future/Promiseをスレッドプールと統合しよう。

class ThreadPool {
public:
    // 戻り値のあるタスクを投入
    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args) 
        -> Future<std::invoke_result_t<F, Args...>>
    {
        using ResultType = std::invoke_result_t<F, Args...>;
        
        auto promise = std::make_shared<Promise<ResultType>>();
        auto future = promise->get_future();
        
        // タスクをラップ
        auto task = [promise = std::move(promise), 
                     f = std::forward<F>(f),
                     ...args = std::forward<Args>(args)]() mutable {
            try {
                if constexpr (std::is_void_v<ResultType>) {
                    std::invoke(f, args...);
                    promise->set_value();
                } else {
                    promise->set_value(std::invoke(f, args...));
                }
            } catch (...) {
                promise->set_exception(std::current_exception());
            }
        };
        
        enqueue(std::move(task));
        return future;
    }

private:
    void enqueue(std::function<void()> task);
    // ... 他のメンバー
};

使い方

int main() {
    ThreadPool pool(4);
    
    // 戻り値を受け取る
    auto future1 = pool.submit([] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 42;
    });
    
    // 引数を渡す
    auto future2 = pool.submit([](int a, int b) {
        return a + b;
    }, 10, 20);
    
    // 例外をキャッチ
    auto future3 = pool.submit([] {
        throw std::runtime_error("Oops!");
        return 0;
    });
    
    std::cout << "Result 1: " << future1.get() << "\n";
    std::cout << "Result 2: " << future2.get() << "\n";
    
    try {
        future3.get();
    } catch (const std::exception& e) {
        std::cout << "Exception: " << e.what() << "\n";
    }
    
    return 0;
}
Result 1: 42
Result 2: 30
Exception: Oops!

then()でチェーン処理

JavaScriptのPromiseみたいに、.then()でチェーンできると便利。

template<typename T>
class Future {
public:
    // 完了後に別の処理を実行
    template<typename F>
    auto then(ThreadPool& pool, F&& f) 
        -> Future<std::invoke_result_t<F, T>>
    {
        using NextType = std::invoke_result_t<F, T>;
        
        auto promise = std::make_shared<Promise<NextType>>();
        auto next_future = promise->get_future();
        
        // 現在のFutureの完了を監視するタスクを投入
        pool.submit([state = state_, 
                     promise = std::move(promise),
                     f = std::forward<F>(f)]() mutable {
            try {
                // 元のFutureの結果を待つ
                std::unique_lock<std::mutex> lock(state->mutex);
                state->cv.wait(lock, [&] { return state->ready; });
                
                if (state->exception) {
                    promise->set_exception(state->exception);
                } else {
                    if constexpr (std::is_void_v<NextType>) {
                        f(std::move(*state->value));
                        promise->set_value();
                    } else {
                        promise->set_value(f(std::move(*state->value)));
                    }
                }
            } catch (...) {
                promise->set_exception(std::current_exception());
            }
        });
        
        return next_future;
    }
    
    // ... 他のメソッド
};

使い方

int main() {
    ThreadPool pool(4);
    
    auto future = pool.submit([] {
        return 10;
    })
    .then(pool, [](int x) {
        return x * 2;  // 20
    })
    .then(pool, [](int x) {
        return x + 5;  // 25
    })
    .then(pool, [](int x) {
        std::cout << "Final: " << x << "\n";
    });
    
    future.get();  // チェーン全体の完了を待つ
    return 0;
}
Final: 25

when_all: 複数のFutureを待つ

複数のタスクがすべて完了するのを待ちたい場合。

template<typename... Futures>
auto when_all(Futures&&... futures) {
    return std::make_tuple(futures.get()...);
}

// vector版
template<typename T>
std::vector<T> when_all(std::vector<Future<T>>& futures) {
    std::vector<T> results;
    results.reserve(futures.size());
    
    for (auto& future : futures) {
        results.push_back(future.get());
    }
    
    return results;
}

使い方

int main() {
    ThreadPool pool(4);
    
    std::vector<Future<int>> futures;
    
    for (int i = 0; i < 10; ++i) {
        futures.push_back(pool.submit([i] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            return i * i;
        }));
    }
    
    auto results = when_all(futures);
    
    for (int r : results) {
        std::cout << r << " ";
    }
    std::cout << "\n";
    
    return 0;
}
0 1 4 9 16 25 36 49 64 81

when_any: 最初に完了したものを取得

template<typename T>
std::pair<size_t, T> when_any(std::vector<Future<T>>& futures) {
    while (true) {
        for (size_t i = 0; i < futures.size(); ++i) {
            if (futures[i].is_ready()) {
                return {i, futures[i].get()};
            }
        }
        std::this_thread::yield();
    }
}

より効率的な実装

上の実装はビジーウェイト。共有の待機機構を使うほうがいい。

template<typename T>
class WhenAnyState {
public:
    std::mutex mutex;
    std::condition_variable cv;
    std::optional<std::pair<size_t, T>> result;
    bool done = false;
    
    void set_result(size_t index, T value) {
        std::lock_guard<std::mutex> lock(mutex);
        if (!done) {
            result = {index, std::move(value)};
            done = true;
            cv.notify_all();
        }
    }
};

template<typename T>
std::pair<size_t, T> when_any(ThreadPool& pool, std::vector<Future<T>>& futures) {
    auto state = std::make_shared<WhenAnyState<T>>();
    
    for (size_t i = 0; i < futures.size(); ++i) {
        pool.submit([state, &futures, i] {
            T value = futures[i].get();
            state->set_result(i, std::move(value));
        });
    }
    
    std::unique_lock<std::mutex> lock(state->mutex);
    state->cv.wait(lock, [&] { return state->done; });
    
    return std::move(*state->result);
}

PackagedTask

std::packaged_task相当の機能も実装しておこう。

template<typename R, typename... Args>
class PackagedTask;

template<typename R, typename... Args>
class PackagedTask<R(Args...)> {
public:
    template<typename F>
    explicit PackagedTask(F&& f) 
        : func_(std::forward<F>(f))
        , promise_(std::make_shared<Promise<R>>())
    {}
    
    Future<R> get_future() {
        return promise_->get_future();
    }
    
    void operator()(Args... args) {
        try {
            if constexpr (std::is_void_v<R>) {
                func_(std::forward<Args>(args)...);
                promise_->set_value();
            } else {
                promise_->set_value(func_(std::forward<Args>(args)...));
            }
        } catch (...) {
            promise_->set_exception(std::current_exception());
        }
    }

private:
    std::function<R(Args...)> func_;
    std::shared_ptr<Promise<R>> promise_;
};

使い方

int main() {
    PackagedTask<int(int, int)> task([](int a, int b) {
        return a * b;
    });
    
    auto future = task.get_future();
    
    std::thread t(std::move(task), 6, 7);
    
    std::cout << "Result: " << future.get() << "\n";
    
    t.join();
    return 0;
}
Result: 42

まとめ

トピック ポイント
SharedState FutureとPromiseの通信媒体
Promise 値または例外を設定する側
Future 値を受け取る側、get()でブロック
then() チェーン処理を可能にする
when_all すべての完了を待つ
when_any 最初の完了を待つ

次回はベンチマークとチューニングで、実際の性能を測って改善していくよ。

この記事が役に立ったら、いいね・ストックしてもらえると嬉しいです!

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?