コードリーディングしている時にBlockingCollection
というものが出てきてよくわからなかったので、調べてみました。主にBlockingCollection in C# - Introduction and Examplesを参考にさせていただいて、自分のコードを書いてみました。
Blocking Collection とは
BlockingCollection
はProducer-Consumer パターンの実装です。IPublisherConsumerCollection<T>
というインターフェイスがありますが、そのスレッドセーフの実装です。つまり、コンカレントな状況においてもちゃん動いてくれるものです。
デフォルトでは内部的にConcurrentQueue
を用いているようですが、コンストラクタでIProducerConsumerCollection<T>
を実装したクラスを渡すことで、そちらのクラスを使用するように変えることもできるようです。インターフェイスは次のものになっています。
IProducerConsumerCollection
public interface IProducerConsumerCollection<T> : IEnumerable<T>, IEnumerable, ICollection
{
void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake([MaybeNullWhen(false)] out T item);
}
コンストラクタ
実際に挙動を確認してみましょう。BlockingCollection
クラスを単純にインスタンス生成します。boundedCapacity
を渡すことで、このコレクションの最大値を指定することができます。先ほど述べた通り、ここで、ConcurrentQueue
以外の実装を渡すことも可能です。
var blockingCollection = new BlockingCollection<string>(boundedCapacity: 3);
Producer
コンカレントOKとの事なので、スレッドを生成して、そこで、Add
を使って要素を足していきます。ここでは、コンソールから入力したものを渡しています。挙動としてのポイントは、上記のコンストラクタで指定した、boundedCapacity
を超えると、Add
メソッドがブロックして、Consumer がアイテムを取得してくれるのを待ちます。
Task producerThread = Task.Factory.StartNew(() =>
{
while (true)
{
string command = Console.ReadLine();
if (command.Contains("quit")) break;
blockingCollection.Add(command); // blocked if it reach the capacity
}
});
この待ち受けの挙動が好きではない場合、TryAdd
メソッドもあります。このメソッドの場合、一定の時間ブロックされたら、「失敗した」とみなして処理をさせることも可能です。CancellationToken
を持つオーバーロードもあります。
if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
{
// it works!
}
else
{
Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
}
Consumer
Take
メソッドにより、1件のアイテムを取得することができます。もし、BlockingCollection
のインスタンスに1件もなかったら、ここでブロックされます。blockingCollection.IsComplete
メソッドで、BlockingCollection
が終了したことの通知を受け取ることができます。
NOTE
ちなみにこのサンプルで、.GetAwaiter().GetResult()
みたいなクソださなことをしているかというと、Task.Factory.StartNew(async () => {}
にすると、async のため labmda の実行がブロックされて、すぐに終了したとみなされて、後に出てくる WaitAll
メソッドでこのスレッドの終了を待ち受ける処理がうまく動作しなくなるからです。正直もっといいやり方がありそう。本番では、async/await を使うので直接スレッドを起動していないので、問題にはなっていませんが、、ダサさを何とかしたい。
Task consumerAThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command = blockingCollection.Take();
Console.WriteLine($"ConsumerA: Take Received: {command}");
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
TryTake
メソッドは、ブロック中に、一定の時間がたつと失敗させてくれるメソッドです。
Task consumerBThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command;
if (blockingCollection.TryTake(out command, TimeSpan.FromSeconds(5)))
{
Console.WriteLine($"ConsumerB: TryTake Received: {command}");
}
else
{
Console.WriteLine($"consumerB: Can't take now.");
}
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
CancellationToken
TryXXX
メソッドは、CancellationToken
をサポートしているので、活用することもできます。CancellationToken
が発行されたら、OperationCanceledException
がスローされます。
CancellationTokenSource source = new CancellationTokenSource();
Task consumerBThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command;
try
{
if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
{
Console.WriteLine($"ConsumerB: TryTake Received: {command}");
}
else
{
Console.WriteLine($"consumerB: Can't take now.");
}
} catch (OperationCanceledException e)
{
Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
break;
}
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
プログラムの全体像をシェアしておきます。実行して、何かを入力すると、ProducerがBlockingCollection
にアイテムを代入していきます。Producer
, ConsumerA
, ConsumerB
のブロッキングの振る舞いを観察することができます。cancel
とタイプすると、CancellationToken
が発行されて終了します。もしくは、quit
で終了します。
class Program
{
static void Main(string[] args)
{
int capacity = 3;
// Blocking Collection
var blockingCollection = new BlockingCollection<string>(boundedCapacity: capacity);
CancellationTokenSource source = new CancellationTokenSource();
Task producerThread = Task.Factory.StartNew(() =>
{
while (true)
{
string command = Console.ReadLine();
if (command.Contains("quit")) break;
if (command.Contains("cancel"))
{
Console.WriteLine("Cancelling ...");
source.Cancel();
break;
}
// blockingCollection.Add(command); // blocked if it reach the capacity
if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
{
// it works!
}
else
{
Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
}
}
});
Task consumerAThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command = blockingCollection.Take();
Console.WriteLine($"ConsumerA: Take Received: {command}");
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
Task consumerBThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command;
try
{
if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
{
Console.WriteLine($"ConsumerB: TryTake Received: {command}");
}
else
{
Console.WriteLine($"consumerB: Can't take now.");
}
} catch (OperationCanceledException e)
{
Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
break;
}
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
Task.WaitAll(producerThread, consumerAThread, consumerBThread);
}
}
Source
今回のサンプルです。