LoginSignup
8
10

More than 3 years have passed since last update.

C++14で超ミニマルなスレッドプール

Last updated at Posted at 2019-07-09

ミニマルにスレッドプールを実装した例があまり見当たらないためサクッとスレッドプール実装したいという方の参考になれば幸いです。
並列コードは書き慣れていないのでミスがあったらご教示ください。

とりあえずコード全文。手元でC++14で開発していたので14と書いてありますが11でも行ける気がします(確認はしてません)。

#ifndef __KTNYT_THREAD_POOL_HPP__
#define __KTNYT_THREAD_POOL_HPP__

#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

namespace ktnyt {

class thread_pool {
 public:
  thread_pool(std::size_t size) : stop(false) {
    for (std::size_t i = 0; i < size; ++i) {
      workers.emplace_back([this] { spawn(); });
    }
  }

  virtual ~thread_pool() {
    if (!stop) join();
  }

  void post(std::function<void()> f) {
    {
      std::unique_lock<std::mutex> lock(mutex);
      tasks.push(f);
    }

    condition.notify_one();
  }

  void join() {
    {
      std::unique_lock<std::mutex> lock(mutex);
      stop = true;
    }

    condition.notify_all();

    for (std::size_t i = 0; i < workers.size(); ++i) {
      workers[i].join();
    }
  }

 private:
  void spawn() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return !tasks.empty() || stop; });
        if (stop && tasks.empty()) return;
        task = std::move(tasks.front());
        tasks.pop();
      }
      task();
    }
  }

  std::vector<std::thread> workers;
  std::queue<std::function<void()>> tasks;

  std::mutex mutex;
  std::condition_variable condition;
  bool stop;
};

}  // namespace ktnyt

#endif  // __KTNYT_THREAD_POOL_HPP__

コード解説

この実装のキモはspawn()メソッドにあります。

  void spawn() {
    for (;;) {

スレッドプールがjoinされているかどうかはbool stopで表現しているので偽の間はループします。

      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex);
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return !tasks.empty() || stop; });

まずキュー先頭のタスクを代入するためのstd::function<void()>を定義しておきます。
タスクキューの中身を確認するためにロックして次の条件のいずれかが満たされるのを待ちます。
1. タスクキューにタスクが追加される。
2. スレッドプールがjoinされる。

        if (stop && tasks.empty()) return;
        task = std::move(tasks.front());
        tasks.pop();
      }
      task();
    }
  }

この時にjoinされていてかつタスクキューが空の場合にはreturnします。
そうでなければタスクキューが空ではないので、先頭のタスクを取得してロックのスコープ外で実行します。
このメソッドをワーカスレッドの数の分だけコンストラクタ内で立ち上げます。

  thread_pool(std::size_t size) : stop(false) {
    for (std::size_t i = 0; i < size; ++i) {
      workers.emplace_back([this] { spawn(); });
    }
  }

あとでjoinする必要があるためワーカスレッドはベクトルに保持しておきます。

  void join() {
    {
      std::unique_lock<std::mutex> lock(mutex);
      stop = true;
    }

    condition.notify_all();

まず無名スコープでロックして全スレッドにjoinが呼ばれたことを通知します。

    for (std::size_t i = 0; i < workers.size(); ++i) {
      workers[i].join();
    }
  }

これで各ワーカスレッドがループから抜けるのであとは個別にjoinします。

  virtual ~thread_pool() {
    if (!stop) join();
  }

デストラクタはすでにjoinされていなければjoinします。

  void post(std::function<void()> f) {
    {
      std::unique_lock<std::mutex> lock(mutex);
      tasks.push(f);
    }

    condition.notify_one();
  }

タスクの追加のときにはしっかりtasksを排他制御します。
pushしたら通知して待っているスレッドを起こします。

8
10
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
10