概要
C++でファイル読み込みを高速化する
こちらの記事の続きとなります。
ファイル読み込み<文字列の変換 のように片方のタスクが早く終る場合、メッセージブロックがどんどん溜まり、メモリ使用量が必要以上に増えてしまいます。
そこで共有バッファに送信するメッセージブロックの数を制限することでメモリ使用量を制限します。
ソースコード
非同期エージェントライブラリのベストプラクティス
ソースコードはこちらの”調整メカニズムを使用してデータ パイプライン内のメッセージ数を制限する”をそのまま使用しています。
// A Semaphore type that uses cooperative blocking semantics.
class Semaphore
{
public:
explicit Semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the Semaphore.
void acquire()
{
// The capacity of the Semaphore is exceeded when the Semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the Semaphore.
void release()
{
// If the Semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The Semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the Semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
Semaphoreクラスを作成する場合にはコンストラクタにメッセージの許容数を指定します。
その後メッセージを送受信するAgentに同じSemaphoreのインスタンスを渡し、sendの前にacquire(),receive()の後にrelease()を実行します
//Agentを用いてファイルの読み込みと変換を並列実行する
concurrency::concurrent_vector<lineInfo>& ReadWithAgent() {
shared_ptr<Semaphore> semaphore = make_shared<Semaphore>(4);//メッセージブロックを4個に制限する
unbounded_buffer<shared_ptr<vector<string>>> buffer;
FileRead_agent reader( buffer, ReadFileName,1,gSentinel,semaphore );//同じsemaphoreを渡す
Converter_Agent converter( buffer,false,gSentinel,semaphore );//同じsemaphoreを渡す
reader.start();
converter.start();
agent::wait( &reader );
agent::wait( &converter );
return converter.GetInfos();
}
送信時
_semapho->acquire();
send( _target, buffers );
受信時
//次のブロックを読み込む
buffers = receive( _source );
_semapho->release();
結果
4個にメッセージを制限した場合、Context::Block()でブレークポイントを置くとtargetのメッセージカウントが4つになっていることがわかります。
処理の流れ
構造自体はとてもシンプルで、acquire()を実行すると_semaphore_countが-1され、0未満となればそのコンテキストがブロックされます。
_semaphore_countには制限したい個数が入っているので、4が入っていたならば4回のacquire()まで許容する、といった形です。
逆にrelease()が実行されると_semaphore_countが+1され、またメッセージ送信ができるようになります。
あとがき
並列処理を知ることでatomic変数やshared_ptrなどの使い方が少しづつわかってきたように感じます。
今回はあっさりした記事でしたが、いいねや補足訂正コメントなどいただけるととっても嬉しいです。
コンテキスト周りの理解が浅いのでそのあたりを追記したい・・・