C++標準threadの同期処理がめんどくさいのでラップする
この記事はアドカレに参加しています。
C++でのマルチスレッド
C++の標準ライブラリを使用してマルチスレッドをするには、thread
をinclude
して以下のようにします。
#include <iostream>
#include <thread>
#include <windows.h>
void func(int i) {
std::cout << i << std::endl;
}
int main() {
//コンストラクタ(関数, 関数に渡す引数...)
std::thread th(func, 0);
//スレッドの終了を待つ
th.join();
system("pause");
return 0;
}
書き方は様々あって、ラムダ式を使えたり、最初のコンストラクタで初期化する必要がなかったりいろいろです。
int main() {
//コンストラクタ(空のオブジェクト)
std::thread th;
//コンストラクタでスレッドを作成して代入(ラムダ式, ラムダ式に渡す引数...)
th = std::thread([=](int i) {std::cout << i << std::endl; }, 0);
//スレッドの終了を待つ
th.join();
system("pause");
return 0;
}
沢山のスレッドを扱うときはvector
を使ったりしますね。
#include <iostream>
#include <thread>
#include <vector>
#include <windows.h>
int main() {
int n = 10;
std::vector<std::thread> th;
for (int i = 0; i < 10; i++)
th.push_back(std::thread([=](int i)
{
std::cout << i;
}, i));
for (int i = 0; i < 10; i++)
th[i].join();
system("pause");
return 0;
}
変数の保護(ミューテックス)
以下のような処理をすると、実行するタイミングや環境によって結果が異なってしまいます。(n=10000ぐらいで実行すると、nの値とkの値が等しくないときがあります。)
int main() {
int n = 10;
std::vector<std::thread> th;
int k = 0;
for (int i = 0; i < n; i++)
th.push_back(std::thread([=](int* k) {*k += 1; }, &k));
for (int i = 0; i < n; i++)
th[i].join();
std::cout << "k = " << k << std::endl;
system("pause");
return 0;
}
このような書き方では、複数のスレッドがたまたま同じタイミングで*k
の値を読み込んで*k+=1
とした場合にk
の値が本来意図する値よりも小さくなってしまいます。
これを防ぐには、排他制御と呼ばれる処理をする必要があります。C++標準ヘッダーのmutex
を使用します。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <windows.h>
int main() {
int n = 10;
std::vector<std::thread> th;
int k = 0;
std::mutex mtx;
for (int i = 0; i < n; i++)
th.push_back(std::thread([=](int* k, std::mutex* mtx)
{
std::unique_lock<std::mutex> uniq_lk(*mtx);
*k += 1;
}, &k, &mtx));
for (int i = 0; i < n; i++)
th[i].join();
std::cout << "k = " << k << std::endl;
system("pause");
return 0;
}
mtx
がロックされている間は、他のスレッドは同じ変数(今の例ではk
)に干渉できなくなります。mtx
のロックが解除されると、他のスレッドは同じ変数に干渉できるようになります。
つまり、複数のスレッドがたまたま同じタイミングで同じ変数を読み込むような事故が起きなくなります。mtx
がロックされている間、複数のスレッドが同じ変数にアクセスするようなことはありません。一つの変数に干渉できるのは一つのスレッドだけです。他のスレッドはロックが解除されるまで待機状態になります。
mtx
をロックするにはunique_lock
のコンストラクタを呼びます。mtx
のロックを解除するにはunique_lock
のデストラクタを呼びます。デストラクタはスコープを抜けたら自動で呼ばれるので、楽でいいですね。
同期処理
さて、本題です。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <windows.h>
int main() {
int n = 10;
std::vector<std::thread> th;
for (int i = 0; i < n; i++)
th.push_back(std::thread([=]()
{
std::cout << ">";//(1)
//(2)
std::cout << "<";//(3)
}));
for (int i = 0; i < n; i++)
th[i].join();
system("pause");
return 0;
}
すべてのスレッドが(1)
を実行した後に、(2)
で同期をとって、(3)
を実行してほしいです。が、実際のスレッドは他のスレッドのことなんか知ったこっちゃありません。スレッド同士で足並みを揃えることなく、常に我を通して生きていきます。
僕の環境では><><><><><><><><><><
と出力されました。本当は>>>>>>>>>><<<<<<<<<<
と出力してほしいです。
(2)
で同期処理をするには、condition_variable
を使用します。condition_variable
で他のスレッドが(2)
に到達するまで待機するようにすることで、(2)
ですべてのスレッドの足並みを揃えることができます。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <windows.h>
int main() {
int n = 10;
std::vector<std::thread> th;
int N;
int wait_counter;
std::mutex mtx;
std::condition_variable cv;
N = n;
wait_counter = 0;
for (int i = 0; i < n; i++)
th.push_back(std::thread([=, &N, &wait_counter](std::mutex* mtx, std::condition_variable* cv)
{
std::cout << ">";//(1)
{//(2)
std::unique_lock<std::mutex> uniq_lk(*mtx);
wait_counter += 1;
if (wait_counter == N) cv->notify_all();
else cv->wait(uniq_lk, [&N, &wait_counter] { return wait_counter == N; });
}
std::cout << "<";//(3)
}, &mtx, &cv));
for (int i = 0; i < n; i++)
th[i].join();
system("pause");
return 0;
}
unique_lock
のあるスコープで、N
とwait_counter
に対する他のスレッドからの干渉を防いでいます。wait_counter
では待機状態のスレッドの数を数えています。まだすべてのスレッドが待機状態でなければ、cv->wait()
ですべてのスレッドが待機状態になるまで待機します。すべてのスレッドが待機状態である場合はcv->notify_all()
ですべてのスレッドの待機状態を解除します。待機状態が解除されたスレッドはcv->wait()
直後の処理から進めていきます。
cv->wait(lock, pred)
では内部で以下のような処理になっています。
while(!pred()) {
wait(lock);
}
wait
状態が解除されて、尚且つpred
関数がtrue
を返すときのみ待機状態を解除します。これはSpurious Wakeupという現象に対応するために必要な処理で、興味のある方は参考文献を見てみてください。簡単にいうと、二重チェックしましょうね、みたいなかんじです。(今回はwait_counter == N
の時に待機状態から抜けます。)
同期処理のやり方も分かったので、あとはこれをラップしましょう。
ラップする
スレッド内で同期処理は以下のように書けます。
{
std::unique_lock<std::mutex> uniq_lk(*mtx);
wait_counter += 1;
if (wait_counter == N) cv->notify_all();
else cv->wait(uniq_lk, [&N, &wait_counter] { return wait_counter == N; });
}
二回目以降の同期処理は、以下のように書けますね。
{
std::unique_lock<std::mutex> uniq_lk(*mtx);
if (wait_counter == N) wait_counter = 0;
wait_counter += 1;
if (wait_counter == N) cv->notify_all();
else cv->wait(uniq_lk, [&N, &wait_counter] { return wait_counter == N; });
}
ですが、同期処理を書く度にこれをコピペするのは少し面倒です。ラップしてしまいましょう。
プログラム例
プログラム例です。クラスthread_M
はthread
のコンストラクタとjoin
をそのままラップしています。注目するのはsync_M
関数ですね。スレッド内でsync_M
関数を呼ぶと、そこで簡単に同期することができます。
/*
thread_M.hpp
*/
#pragma once
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
class thread_M {
std::vector<std::thread> th;
int wait_counter = 0;
int thread_num = 0;
public:
std::mutex mtx;
std::condition_variable cv;
thread_M(int n) {
thread_num = n;
th.resize(n);
}
void sync_M() {
std::unique_lock<std::mutex> uniq_lk(mtx);
if (wait_counter >= thread_num) wait_counter = 0;
wait_counter += 1;
if (wait_counter >= thread_num) cv.notify_all();
else cv.wait(uniq_lk, [this] { return wait_counter >= thread_num; });
}
template <class... Args>
void new_(int i, Args... args) {
th[i] = std::thread(args...);
}
void join(int n) {
th[n].join();
}
};
メソッド名 | 引数1 | 引数2 | 備考 |
---|---|---|---|
コンストラクタ | 同期するスレッド数 | / | 初期化 |
sync_M | / | / | thisから参照して使用する |
new_ | スレッド番号 | threadコンストラクタの引数... | スレッドを新しく作成 |
join | スレッド番号 | / | スレッドが終了するまで待機 |
main.cpp
は、ベースとなるスレッドとも同期をとる例です。(1)
で一回目の同期、(2)
で二回目の同期です。同期をとるスレッドの数はコンストラクタの引数と一致する必要があります。
/*
main.cpp
*/
#include <iostream>
#include <windows.h>
#include "thread_M.hpp"
int main() {
std::cout << "start : base" << std::endl;
class thread_M obj(11);
for (int i = 0; i < 10; i++) {
obj.new_(i, [=](int i, class thread_M* obj)
{
std::cout << "start : " << i << std::endl;
obj->sync_M();//(1)
std::cout << "ok : " << i << std::endl;
obj->sync_M();//(2)
std::cout << "finish : " << i << std::endl;
}, i, &obj);
}
system("pause");
obj.sync_M();//(1)
std::cout << "ok : base" << std::endl;
system("pause");
obj.sync_M();//(2)
for (int i = 0; i < 10; i++)
obj.join(i);
std::cout << "finish : base" << std::endl;
system("pause");
return 0;
}
参考文献
・C/C++によるマルチスレッドプログラミング入門 その2
・C++11における同期処理(std::mutex, std::unique_guard, std::lock_guard, std::condition_variable)
・C++ std::condition_variableについて
・C++日本語リファレンス thread
・C++日本語リファレンス mutex
・C++日本語リファレンス unique_lock
・C++日本語リファレンス condition_variable
・C++日本語リファレンス 条件変数の利用方法
・C++日本語リファレンス 可変引数テンプレート
むすび
同期処理のラップの話でした。
2023-12/31 編集
@yaito3014さんにラムダ式のキャプチャについてご指摘いただきました。ありがとうございます。