0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

【ファイル読み込み高速化】セマフォを使って共有バッファのメモリを制限する

Last updated at Posted at 2024-07-01

概要

C++でファイル読み込みを高速化する
こちらの記事の続きとなります。
ファイル読み込み<文字列の変換 のように片方のタスクが早く終る場合、メッセージブロックがどんどん溜まり、メモリ使用量が必要以上に増えてしまいます。
そこで共有バッファに送信するメッセージブロックの数を制限することでメモリ使用量を制限します。

ソースコード

非同期エージェントライブラリのベストプラクティス
ソースコードはこちらの”調整メカニズムを使用してデータ パイプライン内のメッセージ数を制限する”をそのまま使用しています。

// A Semaphore type that uses cooperative blocking semantics.
class Semaphore
{
public:
	explicit Semaphore(long long capacity)
		: _semaphore_count(capacity)
	{
	}

	// Acquires access to the Semaphore.
	void acquire()
	{
		// The capacity of the Semaphore is exceeded when the Semaphore count 
		// falls below zero. When this happens, add the current context to the 
		// back of the wait queue and block the current context.
		if (--_semaphore_count < 0)
		{
			_waiting_contexts.push(Context::CurrentContext());
			Context::Block();
		}
	}

	// Releases access to the Semaphore.
	void release()
	{
		// If the Semaphore count is negative, unblock the first waiting context.
		if (++_semaphore_count <= 0)
		{
			// A call to acquire might have decremented the counter, but has not
			// yet finished adding the context to the queue. 
			// Create a spin loop that waits for the context to become available.
			Context* waiting = NULL;
			while (!_waiting_contexts.try_pop(waiting))
			{
				(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
			}

			// Unblock the context.
			waiting->Unblock();
		}
	}

private:
	// The Semaphore count.
	atomic<long long> _semaphore_count;

	// A concurrency-safe queue of contexts that must wait to 
	// acquire the Semaphore.
	concurrent_queue<Context*> _waiting_contexts;
};

Semaphoreクラスを作成する場合にはコンストラクタにメッセージの許容数を指定します。
その後メッセージを送受信するAgentに同じSemaphoreのインスタンスを渡し、sendのにacquire(),receive()のにrelease()を実行します

//Agentを用いてファイルの読み込みと変換を並列実行する
concurrency::concurrent_vector<lineInfo>& ReadWithAgent() {
	shared_ptr<Semaphore> semaphore = make_shared<Semaphore>(4);//メッセージブロックを4個に制限する
	unbounded_buffer<shared_ptr<vector<string>>> buffer;
	FileRead_agent reader( buffer, ReadFileName,1,gSentinel,semaphore );//同じsemaphoreを渡す
	Converter_Agent converter( buffer,false,gSentinel,semaphore );//同じsemaphoreを渡す

	reader.start();
	converter.start();

	agent::wait( &reader );
	agent::wait( &converter );

	return converter.GetInfos();
}

送信時

				_semapho->acquire();
				send( _target, buffers );

受信時

			//次のブロックを読み込む
			buffers = receive( _source );
			_semapho->release();

結果

4個にメッセージを制限した場合、Context::Block()でブレークポイントを置くとtargetのメッセージカウントが4つになっていることがわかります。
acquire.JPG
semaphore.JPG

処理の流れ

構造自体はとてもシンプルで、acquire()を実行すると_semaphore_countが-1され、0未満となればそのコンテキストがブロックされます。
_semaphore_countには制限したい個数が入っているので、4が入っていたならば4回のacquire()まで許容する、といった形です。
逆にrelease()が実行されると_semaphore_countが+1され、またメッセージ送信ができるようになります。

あとがき

並列処理を知ることでatomic変数やshared_ptrなどの使い方が少しづつわかってきたように感じます。
今回はあっさりした記事でしたが、いいねや補足訂正コメントなどいただけるととっても嬉しいです。
コンテキスト周りの理解が浅いのでそのあたりを追記したい・・・

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?