16
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

C++標準threadの同期処理がめんどくさいのでラップする

Last updated at Posted at 2023-12-29

C++標準threadの同期処理がめんどくさいのでラップする

この記事はアドカレに参加しています。

C++でのマルチスレッド

 C++の標準ライブラリを使用してマルチスレッドをするには、threadincludeして以下のようにします。

#include <iostream>
#include <thread>
#include <windows.h>

void func(int i) {
	std::cout << i << std::endl;
}

int main() {
	//コンストラクタ(関数, 関数に渡す引数...)
	std::thread th(func, 0);

	//スレッドの終了を待つ
	th.join();

	system("pause");
	return 0;
}

書き方は様々あって、ラムダ式を使えたり、最初のコンストラクタで初期化する必要がなかったりいろいろです。

int main() {
	//コンストラクタ(空のオブジェクト)
	std::thread th;

    //コンストラクタでスレッドを作成して代入(ラムダ式, ラムダ式に渡す引数...)
	th = std::thread([=](int i) {std::cout << i << std::endl; }, 0);

	//スレッドの終了を待つ
	th.join();

	system("pause");
	return 0;
}

沢山のスレッドを扱うときはvectorを使ったりしますね。

#include <iostream>
#include <thread>
#include <vector>
#include <windows.h>

int main() {
	int n = 10;
	std::vector<std::thread> th;

	for (int i = 0; i < 10; i++)
		th.push_back(std::thread([=](int i)
			{
				std::cout << i;
			}, i));

	for (int i = 0; i < 10; i++)
		th[i].join();

	system("pause");
	return 0;
}

変数の保護(ミューテックス)

 以下のような処理をすると、実行するタイミングや環境によって結果が異なってしまいます。(n=10000ぐらいで実行すると、nの値とkの値が等しくないときがあります。)

int main() {
	int n = 10;
	std::vector<std::thread> th;

	int k = 0;

	for (int i = 0; i < n; i++)
		th.push_back(std::thread([=](int* k) {*k += 1; }, &k));

	for (int i = 0; i < n; i++)
		th[i].join();

	std::cout << "k = " << k << std::endl;

	system("pause");
	return 0;
}

このような書き方では、複数のスレッドがたまたま同じタイミングで*kの値を読み込んで*k+=1とした場合にkの値が本来意図する値よりも小さくなってしまいます。

これを防ぐには、排他制御と呼ばれる処理をする必要があります。C++標準ヘッダーのmutexを使用します。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <windows.h>

int main() {
	int n = 10;
	std::vector<std::thread> th;

	int k = 0;

	std::mutex mtx;
	for (int i = 0; i < n; i++)
		th.push_back(std::thread([=](int* k, std::mutex* mtx)
			{
				std::unique_lock<std::mutex> uniq_lk(*mtx);
				*k += 1;
			}, &k, &mtx));

	for (int i = 0; i < n; i++)
		th[i].join();

	std::cout << "k = " << k << std::endl;

	system("pause");
	return 0;
}

mtxがロックされている間は、他のスレッドは同じ変数(今の例ではk)に干渉できなくなります。mtxのロックが解除されると、他のスレッドは同じ変数に干渉できるようになります。

つまり、複数のスレッドがたまたま同じタイミングで同じ変数を読み込むような事故が起きなくなります。mtxがロックされている間、複数のスレッドが同じ変数にアクセスするようなことはありません。一つの変数に干渉できるのは一つのスレッドだけです。他のスレッドはロックが解除されるまで待機状態になります。

mtxをロックするにはunique_lockのコンストラクタを呼びます。mtxのロックを解除するにはunique_lockのデストラクタを呼びます。デストラクタはスコープを抜けたら自動で呼ばれるので、楽でいいですね。

同期処理

 さて、本題です。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <windows.h>

int main() {
	int n = 10;
	std::vector<std::thread> th;

	for (int i = 0; i < n; i++)
		th.push_back(std::thread([=]()
			{
				std::cout << ">";//(1)
				//(2)
				std::cout << "<";//(3)
			}));

	for (int i = 0; i < n; i++)
		th[i].join();

	system("pause");
	return 0;
}

すべてのスレッドが(1)を実行した後に、(2)で同期をとって、(3)を実行してほしいです。が、実際のスレッドは他のスレッドのことなんか知ったこっちゃありません。スレッド同士で足並みを揃えることなく、常に我を通して生きていきます。

僕の環境では><><><><><><><><><><と出力されました。本当は>>>>>>>>>><<<<<<<<<<と出力してほしいです。

(2)で同期処理をするには、condition_variableを使用します。condition_variableで他のスレッドが(2)に到達するまで待機するようにすることで、(2)ですべてのスレッドの足並みを揃えることができます。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <windows.h>

int main() {
	int n = 10;
	std::vector<std::thread> th;

	int N;
	int wait_counter;
	std::mutex mtx;
	std::condition_variable cv;
	N = n;
	wait_counter = 0;

	for (int i = 0; i < n; i++)
		th.push_back(std::thread([=, &N, &wait_counter](std::mutex* mtx, std::condition_variable* cv)
			{
				std::cout << ">";//(1)
				{//(2)
					std::unique_lock<std::mutex> uniq_lk(*mtx);
					wait_counter += 1;
					if (wait_counter == N) cv->notify_all();
					else cv->wait(uniq_lk, [&N, &wait_counter] { return wait_counter == N; });
				}
				std::cout << "<";//(3)
			}, &mtx, &cv));

	for (int i = 0; i < n; i++)
		th[i].join();

	system("pause");
	return 0;
}

unique_lockのあるスコープで、Nwait_counterに対する他のスレッドからの干渉を防いでいます。wait_counterでは待機状態のスレッドの数を数えています。まだすべてのスレッドが待機状態でなければ、cv->wait()ですべてのスレッドが待機状態になるまで待機します。すべてのスレッドが待機状態である場合はcv->notify_all()ですべてのスレッドの待機状態を解除します。待機状態が解除されたスレッドはcv->wait()直後の処理から進めていきます。

 
cv->wait(lock, pred)では内部で以下のような処理になっています。

while(!pred()) {
    wait(lock);
}

wait状態が解除されて、尚且つpred関数がtrueを返すときのみ待機状態を解除します。これはSpurious Wakeupという現象に対応するために必要な処理で、興味のある方は参考文献を見てみてください。簡単にいうと、二重チェックしましょうね、みたいなかんじです。(今回はwait_counter == Nの時に待機状態から抜けます。)
 

同期処理のやり方も分かったので、あとはこれをラップしましょう。

ラップする

 スレッド内で同期処理は以下のように書けます。

    {
		std::unique_lock<std::mutex> uniq_lk(*mtx);
		wait_counter += 1;
		if (wait_counter == N) cv->notify_all();
		else cv->wait(uniq_lk, [&N, &wait_counter] { return wait_counter == N; });
	}

二回目以降の同期処理は、以下のように書けますね。

    {
		std::unique_lock<std::mutex> uniq_lk(*mtx);
        if (wait_counter == N) wait_counter = 0;
		wait_counter += 1;
		if (wait_counter == N) cv->notify_all();
		else cv->wait(uniq_lk, [&N, &wait_counter] { return wait_counter == N; });
	}

ですが、同期処理を書く度にこれをコピペするのは少し面倒です。ラップしてしまいましょう。

プログラム例

 プログラム例です。クラスthread_Mthreadのコンストラクタとjoinをそのままラップしています。注目するのはsync_M関数ですね。スレッド内でsync_M関数を呼ぶと、そこで簡単に同期することができます。

thread_M.hpp
/*

thread_M.hpp

*/

#pragma once
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>

class thread_M {

	std::vector<std::thread> th;
	int wait_counter = 0;
	int thread_num = 0;

public:

	std::mutex mtx;
	std::condition_variable cv;

	thread_M(int n) {
		thread_num = n;
		th.resize(n);
	}

	void sync_M() {
		std::unique_lock<std::mutex> uniq_lk(mtx);
		if (wait_counter >= thread_num) wait_counter = 0;
		wait_counter += 1;
		if (wait_counter >= thread_num) cv.notify_all();
		else cv.wait(uniq_lk, [this] { return wait_counter >= thread_num; });
	}

	template <class... Args>
	void new_(int i, Args... args) {
		th[i] = std::thread(args...);
	}

	void join(int n) {
		th[n].join();
	}

};
メソッド名 引数1 引数2 備考
コンストラクタ 同期するスレッド数 / 初期化
sync_M / / thisから参照して使用する
new_ スレッド番号 threadコンストラクタの引数... スレッドを新しく作成
join スレッド番号 / スレッドが終了するまで待機

main.cppは、ベースとなるスレッドとも同期をとる例です。(1)で一回目の同期、(2)で二回目の同期です。同期をとるスレッドの数はコンストラクタの引数と一致する必要があります。

main.cpp
/*

main.cpp

*/

#include <iostream>
#include <windows.h>
#include "thread_M.hpp"

int main() {
	std::cout << "start : base" << std::endl;

	class thread_M obj(11);
	for (int i = 0; i < 10; i++) {
		obj.new_(i, [=](int i, class thread_M* obj)
			{
				std::cout << "start : " << i << std::endl;
				obj->sync_M();//(1)
				std::cout << "ok : " << i << std::endl;
				obj->sync_M();//(2)
				std::cout << "finish : " << i << std::endl;
			}, i, &obj);
	}

	system("pause");
	obj.sync_M();//(1)

	std::cout << "ok : base" << std::endl;

	system("pause");
	obj.sync_M();//(2)

	for (int i = 0; i < 10; i++)
		obj.join(i);

	std::cout << "finish : base" << std::endl;

	system("pause");
	return 0;
}

参考文献

C/C++によるマルチスレッドプログラミング入門 その2
C++11における同期処理(std::mutex, std::unique_guard, std::lock_guard, std::condition_variable)
C++ std::condition_variableについて
C++日本語リファレンス thread
C++日本語リファレンス mutex
C++日本語リファレンス unique_lock
C++日本語リファレンス condition_variable
C++日本語リファレンス 条件変数の利用方法
C++日本語リファレンス 可変引数テンプレート

むすび

 同期処理のラップの話でした。
 

 
2023-12/31 編集
@yaito3014さんにラムダ式のキャプチャについてご指摘いただきました。ありがとうございます。

16
17
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?