ミニマルにスレッドプールを実装した例があまり見当たらないためサクッとスレッドプール実装したいという方の参考になれば幸いです。
並列コードは書き慣れていないのでミスがあったらご教示ください。
とりあえずコード全文。手元でC++14で開発していたので14と書いてありますが11でも行ける気がします(確認はしてません)。
# ifndef __KTNYT_THREAD_POOL_HPP__
# define __KTNYT_THREAD_POOL_HPP__
# include <thread>
# include <functional>
# include <mutex>
# include <condition_variable>
# include <queue>
# include <vector>
namespace ktnyt {
class thread_pool {
public:
thread_pool(std::size_t size) : stop(false) {
for (std::size_t i = 0; i < size; ++i) {
workers.emplace_back([this] { spawn(); });
}
}
virtual ~thread_pool() {
if (!stop) join();
}
void post(std::function<void()> f) {
{
std::unique_lock<std::mutex> lock(mutex);
tasks.push(f);
}
condition.notify_one();
}
void join() {
{
std::unique_lock<std::mutex> lock(mutex);
stop = true;
}
condition.notify_all();
for (std::size_t i = 0; i < workers.size(); ++i) {
workers[i].join();
}
}
private:
void spawn() {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this] { return !tasks.empty() || stop; });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
}
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex mutex;
std::condition_variable condition;
bool stop;
};
} // namespace ktnyt
# endif // __KTNYT_THREAD_POOL_HPP__
コード解説
この実装のキモはspawn()
メソッドにあります。
void spawn() {
for (;;) {
スレッドプールがjoinされているかどうかはbool stop
で表現しているので偽の間はループします。
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this] { return !tasks.empty() || stop; });
まずキュー先頭のタスクを代入するためのstd::function<void()>
を定義しておきます。
タスクキューの中身を確認するためにロックして次の条件のいずれかが満たされるのを待ちます。
- タスクキューにタスクが追加される。
- スレッドプールがjoinされる。
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
}
この時にjoinされていてかつタスクキューが空の場合にはreturnします。
そうでなければタスクキューが空ではないので、先頭のタスクを取得してロックのスコープ外で実行します。
このメソッドをワーカスレッドの数の分だけコンストラクタ内で立ち上げます。
thread_pool(std::size_t size) : stop(false) {
for (std::size_t i = 0; i < size; ++i) {
workers.emplace_back([this] { spawn(); });
}
}
あとでjoinする必要があるためワーカスレッドはベクトルに保持しておきます。
void join() {
{
std::unique_lock<std::mutex> lock(mutex);
stop = true;
}
condition.notify_all();
まず無名スコープでロックして全スレッドにjoinが呼ばれたことを通知します。
for (std::size_t i = 0; i < workers.size(); ++i) {
workers[i].join();
}
}
これで各ワーカスレッドがループから抜けるのであとは個別にjoinします。
virtual ~thread_pool() {
if (!stop) join();
}
デストラクタはすでにjoinされていなければjoinします。
void post(std::function<void()> f) {
{
std::unique_lock<std::mutex> lock(mutex);
tasks.push(f);
}
condition.notify_one();
}
タスクの追加のときにはしっかりtasksを排他制御します。
push
したら通知して待っているスレッドを起こします。