はじめに
パフォーマンスチューニングのために、頻繁に実行される可能性のある同期メソッドを、Producer-Consumerパターンのようなものに置き換えました。
使用先がそもそもシンプルな動作で十分だったため、必要最低限の実装ですが参考になれば幸いです。
Queueに入るItem
BlockingCollectionは通常Queueの動作をするので、この先Queueと表現します。
Queueに入れるItemはDo()だけを備えたシンプルなクラスです。
namespace Producer_Consumer
{
internal interface IQueueItem
{
void Do();
}
}
namespace Producer_Consumer
{
internal static class QueueItems
{
internal class Example : IQueueItem
{
public void Do()
{
// Heavy process
}
}
}
}
Consumerクラス
主にQueueItemを追加するAddメソッドとQueueの中身を取り出して実行するタスク(Processメソッド)を走らせるStartメソッドがあります。
ProcessメソッドはStopメソッドでCancellationTokenを使ってCancelされます。
また、ManualResetEventを使い、Queueが空の間は止まるようになっています。
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Producer_Consumer
{
internal class QueueConsumer : IDisposable
{
private BlockingCollection<IQueueItem> _queue;
private CancellationTokenSource _processCancellationTokenSource;
private ManualResetEvent _queueAddedEvent;
internal bool IsRunning { get; private set; }
private void Process()
{
while (!(_processCancellationTokenSource?.IsCancellationRequested ?? true))
{
if (_queue?.Count > 0)
{
do
{
IQueueItem item = _queue?.Take();
item?.Do();
if (_processCancellationTokenSource?.IsCancellationRequested ?? true)
{
break;
}
} while (_queue?.Count > 0);
_queueAddedEvent?.Reset();
continue;
}
if (_processCancellationTokenSource != null && _queueAddedEvent != null)
{
Debug.WriteLine("Waiting for add next item...");
int ret = WaitHandle.WaitAny(new[] {_processCancellationTokenSource.Token.WaitHandle, _queueAddedEvent});
if (ret == 0)
{
break;
}
if (_queue?.Count == 0)
{
_queueAddedEvent?.Reset();
}
}
else
{
break;
}
Debug.WriteLine("Queue item arrived");
}
Debug.WriteLine("Process task is finished");
}
internal void Add(IQueueItem item)
{
_queue.Add(item, _processCancellationTokenSource.Token);
_queueAddedEvent?.Set();
}
internal void Start()
{
if (!IsRunning)
{
IsRunning = true;
if (_processCancellationTokenSource != null)
{
_processCancellationTokenSource.Dispose();
_processCancellationTokenSource = null;
}
if (_processCancellationTokenSource == null)
{
_processCancellationTokenSource = new CancellationTokenSource();
}
Task.Run((Action)Process, _processCancellationTokenSource.Token);
}
}
internal void Stop()
{
if (IsRunning)
{
if (!(_processCancellationTokenSource?.IsCancellationRequested ?? true))
{
_processCancellationTokenSource?.CancelAfter(0);
}
IsRunning = false;
}
}
internal QueueConsumer()
{
_queue = new BlockingCollection<IQueueItem>();
_queueAddedEvent = new ManualResetEvent(false);
}
private bool _disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
Stop();
if (_queue != null)
{
_queue.Dispose();
}
if (_processCancellationTokenSource != null)
{
_processCancellationTokenSource.Dispose();
}
if (_queueAddedEvent != null)
{
_queueAddedEvent.Dispose();
}
}
_queue = null;
_processCancellationTokenSource = null;
_queueAddedEvent = null;
_disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
Producerクラス
QueueにQueueItemを詰めるProducerですが、今回はいつ何を詰めるかという部分はこのクラスを所有するクラスに任せます。
同様にConsumerをいつStartするかというのもProducerを所有するクラスに任せるため、Producerを通してカプセル化しました。
using System;
namespace Producer_Consumer
{
public class QueueProducer : IDisposable
{
private QueueConsumer _consumer;
public bool IsConsumerRunning => _consumer.IsRunning;
public void AddExample()
{
_consumer?.Add(new QueueItems.Example());
}
public void StartConsumer()
{
_consumer.Start();
}
public void StopConsumer()
{
_consumer.Stop();
}
public QueueProducer()
{
_consumer = new QueueConsumer();
}
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
if (_consumer != null)
{
_consumer.Dispose();
}
}
_consumer = null;
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
おわりに
本家のProducer-Consumerパターンというものを熟知していないこともありますが、あくまで今回実際に使ったユースケースに合わせて作ったものです。
全く同じものですがGitHubにもUploadしてあります。