この記事の対象読者
私は「順番を保ったロックオブジェクト」を作ることにしました。その奮闘記に興味のある方向けの記事です。
今回は読者様から「MemoryLoan
なんて、訳が判らないクラスを使うより、std:: condition_variable
を使った方がいいんじゃないの?」という提案を頂き、使い方のサンプルコードも提供して下さり、いやーこんなことできるのかーと思い、挑戦した記事です。なんと、アセンブラでプログラムを書かれる事もある凄腕の方で、駆け出しライターの記事にわざわざコメントをして下さり、ありがたい思いです。また、5000スレッドのロックをご所望の様ですので、これまでに作成したOrderedLock
クラスも5000スレッド対応に改良しました。
また、ベンチマークも行っており、方式の違いによる速度の違いを共有できれば幸いです。
本文
順番を保障したロックオブジェクトとは
std::mutex mtx;
void add_value(int value){
std::lock_guard<std::mutex> lock(mtx);
// 何か作業をする。
}
上記のコードは1つのスレッドだけ通すコードです。後から来たスレッドはstd::lock_guard<std::mutex> lock(mtx)
で待たされる事になります。
この時、最初に待たされたスレッドが、次にmutexの所有権を持てそうな気しますが、実は、次にmutexを所有できるスレッドの順番は保障されていません。 Windowsでは、CriticalSection
がXPまでは、順番が保障されていましたが、現在は保障されていません。
ですから、その機能が欲しい場合、手作りするしかなさそうです。webで検索してもそういうのは見つかりませんでした。
1回から3回までの実験を紹介します。
この記事で使うソースコードへのリンク
GitHubへのリンクはここです。Visual Studio 2022用に設定されたslnファイルもあります。
TestProjectをスタートアッププロジェクトに設定し、ソリューションエクスプローラーからTestOrdered2.cppを選択し、プロパティの設定で全般->ビルドから除外項目をいいえに設定し、TestOrdered2.cpp以外ははいに設定し、ターゲットCPUをx64に設定し、F5
を押下すると実行できます。
また、記事中でいくつかのソースファイルを使いますが、その都度そのファイルを全般->ビルドから除外 の設定を変更して、一つのファイルをビルド対象にしていただければ、ビルド可能になります。
ソースコードの中にはデバッグ用のライブラリ、また、表示のギャップ時間を計測するクラスOrCout
も含んでいます。本質ではない為、今回は説明を割愛いたします。
この記事で紹介しているソースコードは、公開した時点から変更を加えている事があります。そのため、元の記事とは異なる結果を得る場合があります。また、ソースコードを機能別にディレクトリを分ける等の、改善を行う可能性があります。
../CommonLib/ordered_lock_cvクラスの紹介
std::condition_variable
と、std::queue<std::thread::id>
を使い、次にブロックを解除するスレッドを選択する方式のクラスです。ソースコードは以下の様になっています。非常にコンパクトです。しかもWindowsに依存しません。
../CommonLib/ordered_lock_cv.h
#include <memory>
#include <mutex>
#include <atomic>
#include <exception>
#include <condition_variable>
#include <thread>
#include <queue>
#pragma once
class ordered_lock_cv{
static constexpr unsigned NUM_LOCKS = 0x8000;
public:
ordered_lock_cv();
ordered_lock_cv(const ordered_lock_cv&) = delete;
ordered_lock_cv& operator=(const ordered_lock_cv&) = delete;
ordered_lock_cv& operator()(const ordered_lock_cv&) = delete;
ordered_lock_cv(ordered_lock_cv&&)noexcept = delete;
ordered_lock_cv& operator=(ordered_lock_cv&&)noexcept = delete;
ordered_lock_cv& operator()(ordered_lock_cv&&)noexcept = delete;
~ordered_lock_cv();
void Lock();
void UnLock();
private:
std::mutex mtx;
std::mutex mtx_guard;
std::condition_variable cv;
std::queue<std::thread::id> que_ids;
std::thread::id target_id;
bool is_locked{ false };
void (* const p_push_and_acquire)(ordered_lock_cv* pthis, std::thread::id id, bool* pb);
void (* const p_pop_and__notification)(ordered_lock_cv* pthis);
};
../CommonLib/ordered_lock_cv.cpp
#include "ordered_lock_cv.h"
ordered_lock_cv::ordered_lock_cv():
p_push_and_acquire{ [](
ordered_lock_cv* pthis
, std::thread::id id
,bool* pb){
std::lock_guard < std::mutex> lk(pthis->mtx_guard);
if( !pthis->is_locked ){
pthis->is_locked = true;
*pb = true;
} else{
*pb = false;
pthis->que_ids.push(id);
}
} }
, p_pop_and__notification{ [](ordered_lock_cv* pthis){
std::lock_guard < std::mutex> lk(pthis->mtx_guard);
if( pthis->que_ids.size() ){
pthis->target_id = pthis->que_ids.front();
pthis->que_ids.pop();
pthis->cv.notify_all();
return;
} else{
pthis->is_locked = false;
}
} }
{
}
ordered_lock_cv::~ordered_lock_cv(){
}
void ordered_lock_cv::Lock(){
std::thread::id id = std::this_thread::get_id();
bool is_acquire{ false };
{
std::thread thread(p_push_and_acquire, this, id ,&is_acquire);
thread.join();
}
if( is_acquire ){
return;
}else {
std::unique_lock<std::mutex> mtx_lk(mtx);
cv.wait(mtx_lk, [this, id]{return id == target_id; });
}
return;
}
void ordered_lock_cv::UnLock(){
std::thread thread(p_pop_and__notification, this);
thread.join();
}
パブリックメンバの解説
void ordered_lock_cv::Lock()
一つのスレッドだけ通します。std::lock_guard
と同じように使いますが、待たせたスレッドの順番を維持します。
void ordered_lock_cv::UnLock()
ロックを解放します。次のスレッドのブロッキングが解除され、Lock()メンバの次の箇所からスレッドが走ります。
../CommonLib/OrderedLockクラスの紹介
で、解説したクラスです。大量のロックを抱えると、デッドロックが、発生する事が判り、修正しました。骨格となるフローチャートはそのままで、Lock
、UnLock
の処理をAPCキューによって行うように3スレッド体制にし、デッドロックしないようにしました。さらに、5000を超えるスレッドをロックできる仕様にしました。
ソースコードは以下の様になります。これもシンプルなコードになっています。
../CommonLib/OrderedLock.h
#include <Windows.h>
#include <process.h>
#include <memory>
#include <exception>
#include "../CommonLib/MemoryLoan.h"
#include "../Debug_fnc/debug_fnc.h"
#pragma comment(lib, "../Debug_fnc/" STRINGIZE($CONFIGURATION) "/Debug_fnc-" STRINGIZE($CONFIGURATION) ".lib")
#pragma once
class OrderedLock{
static constexpr DWORD NUM_LOCKS = 0x4000;
public:
OrderedLock();
OrderedLock(const OrderedLock&) = delete;
OrderedLock& operator=(const OrderedLock&) = delete;
OrderedLock& operator()(const OrderedLock&) = delete;
OrderedLock(OrderedLock&&)noexcept = delete;
OrderedLock& operator=(OrderedLock&&)noexcept = delete;
OrderedLock& operator()(OrderedLock&&)noexcept = delete;
~OrderedLock();
void Lock();
void UnLock();
private:
struct bucket{
bucket();
bucket(const bucket&) = delete;
bucket(bucket&&) = delete;
bucket& operator =(const bucket&) = delete;
bucket& operator =(bucket&&)noexcept = delete;
bucket& operator ()(const bucket&) = delete;
bucket& operator ()(bucket&&)noexcept = delete;
OrderedLock* self{};
std::unique_ptr<std::remove_pointer_t< HANDLE>, decltype(CloseHandle)*> hEvent;
} *__pBucket;
MemoryLoan<bucket> __mlBuckets;
PAPCFUNC const __pAPCLock;
PAPCFUNC const __pAPCUnLock;
_beginthreadex_proc_type const __pThreadOperationProc;
_beginthreadex_proc_type const __pThreadWorkerProc;
std::unique_ptr<std::remove_pointer_t<HANDLE>, decltype(CloseHandle)*> __hEvWorkerGate;
std::unique_ptr<std::remove_pointer_t<HANDLE>, decltype(CloseHandle)*> __hEvEndWorkerThread;
std::unique_ptr<std::remove_pointer_t<HANDLE>, decltype(CloseHandle)*> __hEvEndOpThread;
HANDLE __hThreadHost;
HANDLE __hThreadOp;
bucket* __pCurrentBucket{};
};
../CommonLib/OrderedLock.cpp
#include "OrderedLock.h"
OrderedLock::OrderedLock():
__pBucket{ new bucket[NUM_LOCKS]}
,__mlBuckets(__pBucket, NUM_LOCKS)
,__hEvEndWorkerThread{[](){
HANDLE h;
if( !(h = CreateEvent(NULL, TRUE, FALSE, NULL)) ){
throw std::exception(_MES("CreateEvent").c_str());
} return h; }(), CloseHandle }
, __hEvEndOpThread{ [](){
HANDLE h;
if( !(h = CreateEvent(NULL, TRUE, FALSE, NULL)) ){
throw std::exception(_MES("CreateEvent").c_str());
} return h; }(), CloseHandle }
, __hEvWorkerGate{ [](){
HANDLE h;
if( !(h = CreateEvent(NULL, TRUE, FALSE, NULL)) ){
throw std::exception(_MES("CreateEvent").c_str());
} return h; }(), CloseHandle }
, __pAPCLock{ [](ULONG_PTR Parameter){
bucket *pBucket = reinterpret_cast<bucket*>(Parameter);
pBucket->self->__pCurrentBucket = pBucket;
ResetEvent(pBucket->self->__hEvWorkerGate.get());
SetEvent(pBucket->hEvent.get());
WaitForSingleObject(pBucket->self->__hEvWorkerGate.get(), INFINITE);
} }
, __pAPCUnLock{ [](ULONG_PTR Parameter){
bucket* pBucket = reinterpret_cast<bucket*>(Parameter);
pBucket->self->__mlBuckets.Return(pBucket);
SetEvent(pBucket->self->__hEvWorkerGate.get());
} }
, __pThreadOperationProc{ [](LPVOID pvoid)->unsigned{
OrderedLock* pThis = reinterpret_cast<OrderedLock*>(pvoid);
for( ;;){
DWORD dw = ::WaitForSingleObjectEx(pThis->__hEvEndOpThread.get(), INFINITE, TRUE);
switch( dw ){
case WAIT_IO_COMPLETION:
{
continue;
}
case WAIT_OBJECT_0:
{
_D("OperationProc end.");
return 0;
}
default:
throw std::exception(_MES("__pThreadOperationProc").c_str());
}
}
} }
, __pThreadWorkerProc{ [](LPVOID pvoid)->unsigned{
OrderedLock* pThis = reinterpret_cast<OrderedLock*>(pvoid);
for( ;;){
DWORD dw = ::WaitForSingleObjectEx(pThis->__hEvEndWorkerThread.get(), INFINITE, TRUE);
switch( dw ){
case WAIT_IO_COMPLETION:
{
continue;
}
case WAIT_OBJECT_0:
{
_D("WorkerProc end.");
return 0;
}
default:
throw std::exception(_MES("__pThreadWorkerProc").c_str());
}
}
} }
{
if( !(__hThreadOp = (HANDLE)_beginthreadex(
NULL
, 0
, __pThreadOperationProc
, this
, 0
, NULL)) ){
throw std::exception(_MES("CreateThread").c_str());
};
if( !(__hThreadHost = (HANDLE)_beginthreadex(
NULL
, 0
, __pThreadWorkerProc
, this
, 0
, NULL)) ){
throw std::exception(_MES("CreateThread").c_str());
};
}
OrderedLock::~OrderedLock(){
::SetEvent(__hEvEndWorkerThread.get());
::SetEvent(__hEvEndOpThread.get());
::WaitForSingleObject(__hThreadHost, INFINITE);
::CloseHandle(__hThreadHost);
::WaitForSingleObject(__hThreadOp, INFINITE);
::CloseHandle(__hThreadOp);
delete[]__pBucket;
}
void OrderedLock::Lock(){
bucket* pBucket = __mlBuckets.Lend();
pBucket->self = this;
::ResetEvent(pBucket->hEvent.get());
::QueueUserAPC(__pAPCLock, __hThreadHost, (ULONG_PTR)pBucket);
::WaitForSingleObject(pBucket->hEvent.get(),INFINITE);
return ;
}
void OrderedLock::UnLock(){
::QueueUserAPC(__pAPCUnLock, __hThreadOp, (ULONG_PTR)__pCurrentBucket);
}
OrderedLock::bucket::bucket():
hEvent{ [](){HANDLE h; if( !(h = CreateEvent(NULL,TRUE,FALSE,NULL)) ){
throw std::exception(ENOut(GetLastError()).c_str());} return h;}()
,CloseHandle }
{}
パブリックメンバ
パブリックメンバはordered_lock_cv
クラスと同じ、Lock
、UnLock
です。この為、次のベンチマークで簡単に切り替える事が出来ます。
ベンチマーク
2つの方式の順番を保ったロッククラスのベンチマークが取れるコードを用意しました。この2つのクラスのインターフェースは揃えてありますので、
TestOrderedCout2.cpp
// using lock_class = OrderedLock;// 1
using lock_class = ordered_lock_cv; // 2
の、using
行のどちらかをアンコメントすると、使用するクラスを変更できます。以下のような条件にしました。
- 5000スレッドで行う
- 一つのスレッドにつき、3回
Lock
、UnLock
を行う - リリースビルドで行う
- コマンドプロンプトから単独で実行する
- /O2、/Ob2、/Oiオプションを使う
さて、どうなるのでしょう。
ベンチマークの結果
ordered_lock_cvクラス
thread4953 3 times total pass count 14998 0.362 msec
thread4908 3 times total pass count 14999 0.354 msec
thread873 3 times total pass count 15000 0.354 msec
Main:total elapsed time: 141268.956 msec
OrderedLockクラス
thread2277 3 times total pass count 14998 0.375 msec
thread2282 3 times total pass count 14999 0.413 msec
thread2226 3 times total pass count 15000 0.377 msec
Main:total elapsed time: 7067.092 msec
・・・結構差が出ました。ピークのメモリー使用量もordered_lock_cv
クラスの方が、倍近く多い事が判りました。OrderedLock
のメモリーの使用推移を見ると、最初にドカンときて、後は凄く少なくなっています。多分ですが、APCのキューのメモリーじゃないかと予想しています。実際のところ、メモリー使用量が少なくなった時点でキューは空っぽになっていると思われます。後の終了までの時間は、表示に時間が取られているのでしょうね。ギャップタイムに違いが無いのになぜ?と、お思いかもしれませんので、2 timesあたりのギャップタイムを見てみましょう。
ordered_lock_cvクラスの2times辺りのギャップタイム
thread85 2 times total pass count 9980 3.200 msec
thread3289 2 times total pass count 9981 16.652 msec
thread3538 2 times total pass count 9982 7.994 msec
OrderedLockクラスの2times辺りのギャップタイム
thread490 2 times total pass count 9586 0.365 msec
thread489 2 times total pass count 9587 0.357 msec
thread491 2 times total pass count 9588 2.221 msec
ordered_lock_cv
クラスのギャップタイムが、大きいのが判ります。おそらくnofity_all
の時に、比較対象が多くて処理に時間がかかっていて、3times目の終盤になると、比較対象が、少なくなり時間が掛からなくなっていると考えられます。
また、ordered_lock_cv
は、Lock
、UnLock
する度に、オペレーション用スレッドを作るのですが、この処理をCPUが立て込んでいる時に行うと、更に時間が掛かるのでしょう。
一方、OrderedLockクラスはキューから読みだして、対象のイベントオブジェクトをセットするだけですから、割と時間は掛かっていません。Lock
、UnLock
時のオペレーションも、APC機能を利用しますので、新たにスレッドを立ち上げたりはしません。この辺りは以下のリンクも参照して頂けると、理解が深まると思います。
むしろ、OrderedCOutクラスが時間を食っているようにも思えます。
まとめ
- 読者の凄腕プログラマの方から
condition_variable
を使ったらどうかとアドバイスがあり、condition_variable
のLinuxでも動かせる使用例を、示して頂いた - なるほど、こんな仕掛けで出来るのか!と、思い興味を持った
- 取り掛かるも、
wait
とnotify
のタイミング調整のアイディアが思い浮かばず、時間が過ぎて行った - 5000スレッドの処理も、デッドロックがランダムなタイミングで起きる事が判った
- Windowプロシージャでのデッドロック回避手順を取り入れ、不具合解消を図り、意図した動作をするようになった
-
ordered_lock_cv
クラスもWindows.hを外せて、ポータビリティが向上した - 処理速度を計測してみると、
condition_variable
は遅い事が判った -
notify_all
の対象先が多くて時間が掛かっているものと思われる - メモリのピーク使用量も
condition_variable
の方が、多い事が判った
終わりに
「[C++][Windows][mutex]順番を保障したlockクラスを作る4読者様からのご提案 condition_variable編」の解説は以上となります。この記事が皆様の閃きや発想のきっかけになりましたら幸いです。
また、ご意見、ご感想、ご質問など、お待ちしております。