0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

g++ / clang と std::thread の今

0
Last updated at Posted at 2026-03-06

はじめに

以前 std::threadを生で使っても勝手に使いまわしてくれるらしい と書いたことがあるが,あれから時が経ち,いまはそうでもないらしい.AviUtl ExEdit2 がリリースされ,MYSY2 の UCRT64 版で移植したところ,シングルスレッドで動かしたほうが速いレベルで遅くなっていた.std::thread 生成のオーバーヘッドか,何らかの理由で不要な排他ロックが発生しているかぐらいしか理由は考えられなかったが,どうやらスレッド生成のオーバーヘッドが原因だったらしいとわかった.

そこで,かつて廃案にしたスレッドプール方式を改めて実装することにした.スレッドプールを実装するにあたり,以前よりはスマートにしようと思う.なお,投稿時点の g++ のバージョンは 15.2.0 である.

新しいスレッドプール

以下は,実際の書き方を少し変え,テンプレート化したもの.データはラムダ式で束縛し,ジョブはスレッド数で分割するのではなく,適当に分割して std::atomic のインクリメントを使い,重複なく各引数についてジョブ関数を実行するようにした.atomic なインクリメントができる形なら何でもいいのでテンプレートパラメータにした.実用上は int 固定で良さそうだが,何らかの理由で別の型のほうが自然なケースもあるかもしれない.

class ThreadPool
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <vector>
#include <exception>

template<typename T>
class ThreadPool {
private:
	struct Thread {
		std::thread thread;
		std::mutex mx;
		std::condition_variable cv;
		bool ready=false;
	};
	bool alive=true;
	std::vector<Thread> threads;
	std::function<void(T)> func;
	std::atomic<T> current_i=0;
	T max_i=0;
	std::exception_ptr ep;
	void
	listen(Thread *th)
	{
		while (alive) {
			{ // ジョブが来るまで待機
				auto lk=std::unique_lock(th->mx);
				th->cv.wait(lk, [th]{ return th->ready; });
			}
			for ( T i=max_i; current_i<max_i; ) { // ジョブの取り出しと実行
				i = current_i++;
				try {
					if ( i < max_i ) {
						func(i);
					}
				} catch (...) { // func からの例外を捕捉
					ep = std::current_exception();
					current_i = max_i;
				}
			}
			{ // 全ジョブ完了
				auto lk=std::lock_guard(th->mx);
				th->ready = false;
			}
			th->cv.notify_one();
		}
	}
public:
	explicit ThreadPool(T n=std::thread::hardware_concurrency())
		: threads(n)
	{
		for (auto i=0uz; i<threads.size(); i++) {
			threads[i].thread = std::thread([this, i](){listen(&threads[i]);});
		}
	}
	~ThreadPool()
	{
		{
			alive = false;
			for (auto i=0uz; i<size; i++) {
				{
					auto lk=std::lock_guard(threads[i].mx);
					threads[i].ready = true;
				}
				threads[i].cv.notify_one();
			}
		}
		for (auto i=0uz; i<threads.size(); i++) {
			threads[i].thread.join();
		}
	}
	void
	parallel_do(std::function<void(T)> f, T n)
	{
		func = f; // ジョブ関数
		current_i = 0; max_i = n;
		for (auto i=0uz; i<threads.size(); i++) { // ワーカー起動
			{
				auto lk=std::lock_guard(threads[i].mx);
				threads[i].ready = true;
			}
			threads[i].cv.notify_one();
		}
		for (auto i=0uz; i<threads.size(); i++) { // 全ワーカーの終了を待つ
			auto lk=std::unique_lock(threads[i].mx);
			threads[i].cv.wait(lk, [this, i]{ return !(threads[i].ready); });
		}
		func = nullptr;
		if ( ep ) {
			std::rethrow_exception(std::exchange(ep, nullptr));
		}
	}
	void
	parallel_do_batched(std::function<void(int)> f, int n)
	{
		const int m = static_cast<int>(size);
		parallel_do( [&f, n, m](int i){
			const int s=(i*n)/m, e=((i+1)*n)/m;
			for (auto j=s; j<e; j++) {
				f(j);
			}
		}, m );
	}
};
static std::unique_ptr<ThreadPool<int>> TP;
EXTERN_C bool
InitializePlugin(DWORD version)
{
	TP = std::make_unique<ThreadPool<int>>();
	return true;
}
EXTERN_C void
UninitializePlugin()
{
	TP = nullptr;
}
呼び出し側
class ClipResize {
private:
	// 略
public:
	const PIXEL_RGBA *src;
	PIXEL_RGBA *dest;
	XY x, y;
	void
	invoke_set_weights(int i)
	{
		if ( i < x.var ) {
			x.set_weights(i);
		} else {
			y.set_weights(i-x.var);
		}
	}
	void
	invoke_calc_range(int i)
	{
		if ( i < x.dest_size ) {
			x.calc_range(i);
		} else {
			y.calc_range(i-x.dest_size);
		}
	}
	void
	invoke_interpolate(int dy)
	{
		for (int dx=0; dx<(x.dest_size); dx++) {
			interpolate(dx, dy);
		}
	}
};
static void
ksa_clip_resize(SCRIPT_MODULE_PARAM *param)
{
	ClipResize it;
	int i=0;
	it.src = static_cast<PIXEL_RGBA *>(param->get_param_data(i++));
	// 略
	
	TP->parallel_do([&it](int j){ it.invoke_set_weights(j); }, it.x.var + it.y.var);
	TP->parallel_do_batched([&it](int j){ it.invoke_calc_range(j); }, it.dest_sum());
	TP->parallel_do([&it](int j){ it.invoke_interpolate(j); }, it.y.dest_size);
}

おわりに

上記のようなスレッドプール方式を採用し,マルチスレッド処理による高速化を実現できた.

前回記事の時点では,条件変数を使わない方式に合わせて,ジョブをスレッド数に分割していたため,各ジョブの担当範囲がわかりにくくなっていたが,スッキリまとめられたのも良かった.初期案としては,std::queue によるジョブキューを生成していたが,整数引数のみで各ジョブを区別していたので,キューアクセスのためのミューテックスを廃止し,std::atomic<T> でインクリメントすればよく,オーバーヘッドを抑えられた.もっとも,ジョブの粒度が十分粗いので,この差は実行時間にはほとんど現れない.

2026/03/30 追記

もともと threads[i].thread = std::thread(listen, this, &threads[i]); としていたが,これを clang でコンパイルすると,非 static メンバ関数は呼ばれなければならないと怒られるので,ラムダ式にした.

また,各ジョブの粒度が細かすぎる場合,スレッド数個にジョブをまとめるバッチ処理を行う parallel_do_batched を追加.まとめ方を自動調整できればより良いが,とりあえずは十分だろう.

2026/04/08 追記

ワーカースレッドからの例外転送を追加。複数の例外が出ると競合するが、異常であることが parallel_do の呼び出し側に伝わればいいので、十分だと思う。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?