0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[C++][Windows][mutex]順番を保障したlockクラスを作る4読者様からのご提案 condition_variable編

Posted at

この記事の対象読者

 私は「順番を保ったロックオブジェクト」を作ることにしました。その奮闘記に興味のある方向けの記事です。
 今回は読者様から「MemoryLoanなんて、訳が判らないクラスを使うより、std:: condition_variableを使った方がいいんじゃないの?」という提案を頂き、使い方のサンプルコードも提供して下さり、いやーこんなことできるのかーと思い、挑戦した記事です。なんと、アセンブラでプログラムを書かれる事もある凄腕の方で、駆け出しライターの記事にわざわざコメントをして下さり、ありがたい思いです。また、5000スレッドのロックをご所望の様ですので、これまでに作成したOrderedLockクラスも5000スレッド対応に改良しました。
 また、ベンチマークも行っており、方式の違いによる速度の違いを共有できれば幸いです。

本文

順番を保障したロックオブジェクトとは

.cpp
std::mutex mtx;
void add_value(int value){
    std::lock_guard<std::mutex> lock(mtx);
    // 何か作業をする。
}

 上記のコードは1つのスレッドだけ通すコードです。後から来たスレッドはstd::lock_guard<std::mutex> lock(mtx)で待たされる事になります。
 この時、最初に待たされたスレッドが、次にmutexの所有権を持てそうな気しますが、実は、次にmutexを所有できるスレッドの順番は保障されていません。 Windowsでは、CriticalSectionがXPまでは、順番が保障されていましたが、現在は保障されていません。
 ですから、その機能が欲しい場合、手作りするしかなさそうです。webで検索してもそういうのは見つかりませんでした。
 1回から3回までの実験を紹介します。

この記事で使うソースコードへのリンク

GitHubへのリンクはここです。Visual Studio 2022用に設定されたslnファイルもあります。
 TestProjectをスタートアッププロジェクトに設定し、ソリューションエクスプローラーからTestOrdered2.cppを選択し、プロパティの設定で全般->ビルドから除外項目をいいえに設定し、TestOrdered2.cpp以外はいに設定し、ターゲットCPUをx64に設定し、F5を押下すると実行できます。

TestOrdered2.png

 また、記事中でいくつかのソースファイルを使いますが、その都度そのファイルを全般->ビルドから除外 の設定を変更して、一つのファイルをビルド対象にしていただければ、ビルド可能になります。
 ソースコードの中にはデバッグ用のライブラリ、また、表示のギャップ時間を計測するクラスOrCoutも含んでいます。本質ではない為、今回は説明を割愛いたします。

 この記事で紹介しているソースコードは、公開した時点から変更を加えている事があります。そのため、元の記事とは異なる結果を得る場合があります。また、ソースコードを機能別にディレクトリを分ける等の、改善を行う可能性があります。

../CommonLib/ordered_lock_cvクラスの紹介

 std::condition_variableと、std::queue<std::thread::id>を使い、次にブロックを解除するスレッドを選択する方式のクラスです。ソースコードは以下の様になっています。非常にコンパクトです。しかもWindowsに依存しません。

../CommonLib/ordered_lock_cv.h

ordered_lock_cv.h
#include <memory>
#include <mutex>
#include <atomic>
#include <exception>
#include <condition_variable>
#include <thread>
#include <queue>

#pragma once
class ordered_lock_cv{
	static constexpr unsigned NUM_LOCKS = 0x8000;
public:
	ordered_lock_cv();
	ordered_lock_cv(const ordered_lock_cv&) = delete;
	ordered_lock_cv& operator=(const ordered_lock_cv&) = delete;
	ordered_lock_cv& operator()(const ordered_lock_cv&) = delete;
	ordered_lock_cv(ordered_lock_cv&&)noexcept = delete;
	ordered_lock_cv& operator=(ordered_lock_cv&&)noexcept = delete;
	ordered_lock_cv& operator()(ordered_lock_cv&&)noexcept = delete;
	~ordered_lock_cv();
	void Lock();
	void UnLock();
private:
	std::mutex mtx;
	std::mutex mtx_guard;
	std::condition_variable cv;
	std::queue<std::thread::id> que_ids;
	std::thread::id target_id;
	bool is_locked{ false };
	void (* const p_push_and_acquire)(ordered_lock_cv* pthis, std::thread::id id, bool* pb);
	void (* const p_pop_and__notification)(ordered_lock_cv* pthis);
};

../CommonLib/ordered_lock_cv.cpp

../CommonLib/ordered_lock_cv.cpp
#include "ordered_lock_cv.h"

ordered_lock_cv::ordered_lock_cv():

	 p_push_and_acquire{ [](
		ordered_lock_cv* pthis
		, std::thread::id id
		,bool* pb){
		
		std::lock_guard < std::mutex> lk(pthis->mtx_guard);
		if( !pthis->is_locked ){
			pthis->is_locked = true;
			*pb = true;
		} else{
			*pb = false;
			pthis->que_ids.push(id);
		}
	
	} }

	, p_pop_and__notification{ [](ordered_lock_cv* pthis){
		std::lock_guard < std::mutex> lk(pthis->mtx_guard);
		if( pthis->que_ids.size() ){
			pthis->target_id = pthis->que_ids.front();
			pthis->que_ids.pop();
			pthis->cv.notify_all();
			return;
		} else{
			pthis->is_locked = false;
		}
	} }
{
}

ordered_lock_cv::~ordered_lock_cv(){
}

void ordered_lock_cv::Lock(){
	std::thread::id id = std::this_thread::get_id();
	bool is_acquire{ false };
	{
		std::thread thread(p_push_and_acquire, this, id ,&is_acquire);
		thread.join();
	}
	if( is_acquire ){
		return;
	}else	{
		std::unique_lock<std::mutex> mtx_lk(mtx);
		cv.wait(mtx_lk, [this, id]{return id == target_id; });
	}
	return;
}

void ordered_lock_cv::UnLock(){
	std::thread thread(p_pop_and__notification, this);
	thread.join();
}

パブリックメンバの解説

void ordered_lock_cv::Lock()

 一つのスレッドだけ通します。std::lock_guardと同じように使いますが、待たせたスレッドの順番を維持します。

void ordered_lock_cv::UnLock()

 ロックを解放します。次のスレッドのブロッキングが解除され、Lock()メンバの次の箇所からスレッドが走ります。

../CommonLib/OrderedLockクラスの紹介

で、解説したクラスです。大量のロックを抱えると、デッドロックが、発生する事が判り、修正しました。骨格となるフローチャートはそのままで、LockUnLockの処理をAPCキューによって行うように3スレッド体制にし、デッドロックしないようにしました。さらに、5000を超えるスレッドをロックできる仕様にしました。

 ソースコードは以下の様になります。これもシンプルなコードになっています。

../CommonLib/OrderedLock.h

../CommonLib/OrderedLock.h
#include <Windows.h>
#include <process.h>
#include <memory>
#include <exception>
#include "../CommonLib/MemoryLoan.h"
#include "../Debug_fnc/debug_fnc.h"
#pragma comment(lib,  "../Debug_fnc/" STRINGIZE($CONFIGURATION) "/Debug_fnc-" STRINGIZE($CONFIGURATION) ".lib")

#pragma once
class OrderedLock{
	static constexpr DWORD NUM_LOCKS = 0x4000;
public:
	OrderedLock();
	OrderedLock(const OrderedLock&) = delete;
	OrderedLock& operator=(const OrderedLock&) = delete;
	OrderedLock& operator()(const OrderedLock&) = delete;
	OrderedLock(OrderedLock&&)noexcept = delete;
	OrderedLock& operator=(OrderedLock&&)noexcept = delete;
	OrderedLock& operator()(OrderedLock&&)noexcept = delete;
	~OrderedLock();
	void Lock();
	void UnLock();
private:
	struct bucket{
		bucket();
		bucket(const bucket&) = delete;
		bucket(bucket&&) = delete;
		bucket& operator =(const bucket&) = delete;
		bucket& operator =(bucket&&)noexcept = delete;
		bucket& operator ()(const	bucket&) = delete;
		bucket& operator ()(bucket&&)noexcept = delete;
		OrderedLock* self{};
		std::unique_ptr<std::remove_pointer_t< HANDLE>, decltype(CloseHandle)*> hEvent;
	}	*__pBucket;
	MemoryLoan<bucket> __mlBuckets;
	PAPCFUNC const __pAPCLock;
	PAPCFUNC const __pAPCUnLock;
	_beginthreadex_proc_type const __pThreadOperationProc;
	_beginthreadex_proc_type const __pThreadWorkerProc;
	std::unique_ptr<std::remove_pointer_t<HANDLE>, decltype(CloseHandle)*> __hEvWorkerGate;
	std::unique_ptr<std::remove_pointer_t<HANDLE>, decltype(CloseHandle)*> __hEvEndWorkerThread;
	std::unique_ptr<std::remove_pointer_t<HANDLE>, decltype(CloseHandle)*> __hEvEndOpThread;

	HANDLE  __hThreadHost;
	HANDLE __hThreadOp;
	bucket* __pCurrentBucket{};
};

../CommonLib/OrderedLock.cpp

../CommonLib/OrderedLock.cpp
#include "OrderedLock.h"

OrderedLock::OrderedLock():

	__pBucket{ new bucket[NUM_LOCKS]}

	,__mlBuckets(__pBucket, NUM_LOCKS)

	,__hEvEndWorkerThread{[](){
		HANDLE h;
		if( !(h = CreateEvent(NULL, TRUE, FALSE, NULL)) ){
			throw std::exception(_MES("CreateEvent").c_str());
		} return h; }(), CloseHandle }

	, __hEvEndOpThread{ [](){
		HANDLE h;
		if( !(h = CreateEvent(NULL, TRUE, FALSE, NULL)) ){
			throw std::exception(_MES("CreateEvent").c_str());
		} return h; }(), CloseHandle }

	, __hEvWorkerGate{ [](){
		HANDLE h;
		if( !(h = CreateEvent(NULL, TRUE, FALSE, NULL)) ){
			throw std::exception(_MES("CreateEvent").c_str());
		} return h; }(), CloseHandle }

	, __pAPCLock{ [](ULONG_PTR Parameter){
		bucket *pBucket = reinterpret_cast<bucket*>(Parameter);
		pBucket->self->__pCurrentBucket = pBucket;
		ResetEvent(pBucket->self->__hEvWorkerGate.get());
		SetEvent(pBucket->hEvent.get());
		WaitForSingleObject(pBucket->self->__hEvWorkerGate.get(), INFINITE);
	} }

	, __pAPCUnLock{ [](ULONG_PTR Parameter){
		bucket* pBucket = reinterpret_cast<bucket*>(Parameter);
		pBucket->self->__mlBuckets.Return(pBucket);
		SetEvent(pBucket->self->__hEvWorkerGate.get());
	} }

	, __pThreadOperationProc{ [](LPVOID pvoid)->unsigned{

		OrderedLock* pThis = reinterpret_cast<OrderedLock*>(pvoid);
		for( ;;){
			DWORD dw = ::WaitForSingleObjectEx(pThis->__hEvEndOpThread.get(), INFINITE, TRUE);
			switch( dw ){
				case WAIT_IO_COMPLETION:
				{
					continue;
				}
				case WAIT_OBJECT_0:
				{
					_D("OperationProc end.");
					return 0;
				}
				default:
					throw std::exception(_MES("__pThreadOperationProc").c_str());
			}
		}
	} }

	, __pThreadWorkerProc{ [](LPVOID pvoid)->unsigned{

		OrderedLock* pThis = reinterpret_cast<OrderedLock*>(pvoid);

		for( ;;){
			DWORD dw = ::WaitForSingleObjectEx(pThis->__hEvEndWorkerThread.get(), INFINITE, TRUE);
			switch( dw ){
				case WAIT_IO_COMPLETION:
				{
					continue;
				}
				case WAIT_OBJECT_0:
				{
					_D("WorkerProc end.");
					return 0;
				}
				default:
					throw std::exception(_MES("__pThreadWorkerProc").c_str());
			}
		}
	} }
{
	if( !(__hThreadOp = (HANDLE)_beginthreadex(
		NULL
		, 0
		, __pThreadOperationProc
		, this
		, 0
		, NULL)) ){
		throw std::exception(_MES("CreateThread").c_str());
	};

	if( !(__hThreadHost = (HANDLE)_beginthreadex(
		NULL
		, 0
		, __pThreadWorkerProc
		, this
		, 0
		, NULL)) ){
		throw std::exception(_MES("CreateThread").c_str());
	};
}

OrderedLock::~OrderedLock(){
	::SetEvent(__hEvEndWorkerThread.get());
	::SetEvent(__hEvEndOpThread.get());
	::WaitForSingleObject(__hThreadHost, INFINITE);
	::CloseHandle(__hThreadHost);
	::WaitForSingleObject(__hThreadOp, INFINITE);
	::CloseHandle(__hThreadOp);
	delete[]__pBucket;
}

void OrderedLock::Lock(){
	bucket* pBucket = __mlBuckets.Lend();
	pBucket->self = this;
	::ResetEvent(pBucket->hEvent.get());
	::QueueUserAPC(__pAPCLock, __hThreadHost, (ULONG_PTR)pBucket);
	::WaitForSingleObject(pBucket->hEvent.get(),INFINITE);
	return ;
}

void OrderedLock::UnLock(){
	::QueueUserAPC(__pAPCUnLock, __hThreadOp, (ULONG_PTR)__pCurrentBucket);
}

OrderedLock::bucket::bucket():
	hEvent{ [](){HANDLE h; if( !(h = CreateEvent(NULL,TRUE,FALSE,NULL)) ){
		throw std::exception(ENOut(GetLastError()).c_str());}	return h;}()
	,CloseHandle }

{}

パブリックメンバ

 パブリックメンバはordered_lock_cvクラスと同じ、LockUnLockです。この為、次のベンチマークで簡単に切り替える事が出来ます。

ベンチマーク

 2つの方式の順番を保ったロッククラスのベンチマークが取れるコードを用意しました。この2つのクラスのインターフェースは揃えてありますので、
TestOrderedCout2.cpp

TestOrderedCout2.cpp
// using lock_class = OrderedLock;// 1
using lock_class = ordered_lock_cv; // 2

の、using行のどちらかをアンコメントすると、使用するクラスを変更できます。以下のような条件にしました。

  • 5000スレッドで行う
  • 一つのスレッドにつき、3回LockUnLockを行う
  • リリースビルドで行う
  • コマンドプロンプトから単独で実行する
  • /O2、/Ob2、/Oiオプションを使う

 さて、どうなるのでしょう。

ベンチマークの結果

ordered_lock_cvクラス

thread4953 3 times       total pass count 14998 0.362 msec
thread4908 3 times       total pass count 14999 0.354 msec
thread873 3 times       total pass count 15000  0.354 msec
Main:total elapsed time: 141268.956 msec

 メモリーの使用量は次のようになっています。
Memory ordered_lock_cv.png

OrderedLockクラス

thread2277 3 times       total pass count 14998 0.375 msec
thread2282 3 times       total pass count 14999 0.413 msec
thread2226 3 times       total pass count 15000 0.377 msec
Main:total elapsed time: 7067.092 msec

 メモリーの使用量は
MemoryOrderedLock.png

 ・・・結構差が出ました。ピークのメモリー使用量もordered_lock_cvクラスの方が、倍近く多い事が判りました。OrderedLockのメモリーの使用推移を見ると、最初にドカンときて、後は凄く少なくなっています。多分ですが、APCのキューのメモリーじゃないかと予想しています。実際のところ、メモリー使用量が少なくなった時点でキューは空っぽになっていると思われます。後の終了までの時間は、表示に時間が取られているのでしょうね。ギャップタイムに違いが無いのになぜ?と、お思いかもしれませんので、2 timesあたりのギャップタイムを見てみましょう。

ordered_lock_cvクラスの2times辺りのギャップタイム

thread85 2 times       total pass count 9980    3.200 msec
thread3289 2 times       total pass count 9981  16.652 msec
thread3538 2 times       total pass count 9982  7.994 msec

OrderedLockクラスの2times辺りのギャップタイム

thread490 2 times       total pass count 9586   0.365 msec
thread489 2 times       total pass count 9587   0.357 msec
thread491 2 times       total pass count 9588   2.221 msec

 ordered_lock_cvクラスのギャップタイムが、大きいのが判ります。おそらくnofity_allの時に、比較対象が多くて処理に時間がかかっていて、3times目の終盤になると、比較対象が、少なくなり時間が掛からなくなっていると考えられます。
 また、ordered_lock_cvは、LockUnLockする度に、オペレーション用スレッドを作るのですが、この処理をCPUが立て込んでいる時に行うと、更に時間が掛かるのでしょう。

 一方、OrderedLockクラスはキューから読みだして、対象のイベントオブジェクトをセットするだけですから、割と時間は掛かっていません。LockUnLock時のオペレーションも、APC機能を利用しますので、新たにスレッドを立ち上げたりはしません。この辺りは以下のリンクも参照して頂けると、理解が深まると思います。

 むしろ、OrderedCOutクラスが時間を食っているようにも思えます。

まとめ

  • 読者の凄腕プログラマの方からcondition_variableを使ったらどうかとアドバイスがあり、condition_variableのLinuxでも動かせる使用例を、示して頂いた
  • なるほど、こんな仕掛けで出来るのか!と、思い興味を持った
  • 取り掛かるも、waitnotifyのタイミング調整のアイディアが思い浮かばず、時間が過ぎて行った
  • 5000スレッドの処理も、デッドロックがランダムなタイミングで起きる事が判った
  • Windowプロシージャでのデッドロック回避手順を取り入れ、不具合解消を図り、意図した動作をするようになった
  • ordered_lock_cvクラスもWindows.hを外せて、ポータビリティが向上した
  • 処理速度を計測してみると、condition_variableは遅い事が判った
  • notify_allの対象先が多くて時間が掛かっているものと思われる
  • メモリのピーク使用量もcondition_variableの方が、多い事が判った

終わりに

 「[C++][Windows][mutex]順番を保障したlockクラスを作る4読者様からのご提案 condition_variable編」の解説は以上となります。この記事が皆様の閃きや発想のきっかけになりましたら幸いです。
 また、ご意見、ご感想、ご質問など、お待ちしております。

0
0
10

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?