8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

BlockingCollection の使い方

Posted at

コードリーディングしている時にBlockingCollection というものが出てきてよくわからなかったので、調べてみました。主にBlockingCollection in C# - Introduction and Examplesを参考にさせていただいて、自分のコードを書いてみました。

Blocking Collection とは

BlockingCollection はProducer-Consumer パターンの実装です。IPublisherConsumerCollection<T> というインターフェイスがありますが、そのスレッドセーフの実装です。つまり、コンカレントな状況においてもちゃん動いてくれるものです。

デフォルトでは内部的にConcurrentQueue を用いているようですが、コンストラクタでIProducerConsumerCollection<T> を実装したクラスを渡すことで、そちらのクラスを使用するように変えることもできるようです。インターフェイスは次のものになっています。

IProducerConsumerCollection

public interface IProducerConsumerCollection<T> : IEnumerable<T>, IEnumerable, ICollection
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
    bool TryAdd(T item);
    bool TryTake([MaybeNullWhen(false)] out T item);
}

コンストラクタ

実際に挙動を確認してみましょう。BlockingCollection クラスを単純にインスタンス生成します。boundedCapacity を渡すことで、このコレクションの最大値を指定することができます。先ほど述べた通り、ここで、ConcurrentQueue 以外の実装を渡すことも可能です。

var blockingCollection = new BlockingCollection<string>(boundedCapacity: 3);

Producer

コンカレントOKとの事なので、スレッドを生成して、そこで、Add を使って要素を足していきます。ここでは、コンソールから入力したものを渡しています。挙動としてのポイントは、上記のコンストラクタで指定した、boundedCapacity を超えると、Add メソッドがブロックして、Consumer がアイテムを取得してくれるのを待ちます。

Task producerThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        string command = Console.ReadLine();
        if (command.Contains("quit")) break;
        blockingCollection.Add(command);   // blocked if it reach the capacity
    }
});

この待ち受けの挙動が好きではない場合、TryAdd メソッドもあります。このメソッドの場合、一定の時間ブロックされたら、「失敗した」とみなして処理をさせることも可能です。CancellationToken を持つオーバーロードもあります。

if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
{
        // it works!
}
else
{
    Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
}

Consumer

Take メソッドにより、1件のアイテムを取得することができます。もし、BlockingCollection のインスタンスに1件もなかったら、ここでブロックされます。blockingCollection.IsComplete メソッドで、BlockingCollection が終了したことの通知を受け取ることができます。

NOTE

ちなみにこのサンプルで、.GetAwaiter().GetResult() みたいなクソださなことをしているかというと、Task.Factory.StartNew(async () => {} にすると、async のため labmda の実行がブロックされて、すぐに終了したとみなされて、後に出てくる WaitAll メソッドでこのスレッドの終了を待ち受ける処理がうまく動作しなくなるからです。正直もっといいやり方がありそう。本番では、async/await を使うので直接スレッドを起動していないので、問題にはなっていませんが、、ダサさを何とかしたい。

Task consumerAThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (blockingCollection.IsCompleted) break;
        string command = blockingCollection.Take();
        Console.WriteLine($"ConsumerA: Take Received: {command}");
        Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
    }
});

TryTake メソッドは、ブロック中に、一定の時間がたつと失敗させてくれるメソッドです。

Task consumerBThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (blockingCollection.IsCompleted) break;
        string command;
        if (blockingCollection.TryTake(out command, TimeSpan.FromSeconds(5)))
        {
            Console.WriteLine($"ConsumerB: TryTake Received: {command}");
        }
        else
        {
            Console.WriteLine($"consumerB: Can't take now.");
        }
        Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
    }
});

CancellationToken

TryXXX メソッドは、CancellationTokenをサポートしているので、活用することもできます。CancellationToken が発行されたら、OperationCanceledException がスローされます。

CancellationTokenSource source = new CancellationTokenSource();
Task consumerBThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (blockingCollection.IsCompleted) break;
        string command;
        try
        {
            if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
            {
                Console.WriteLine($"ConsumerB: TryTake Received: {command}");
            }
            else
            {
                Console.WriteLine($"consumerB: Can't take now.");
            }
        } catch (OperationCanceledException e)
        {
            Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
            break;
        }
        Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
    }
}); 

プログラムの全体像をシェアしておきます。実行して、何かを入力すると、ProducerがBlockingCollection にアイテムを代入していきます。Producer , ConsumerA, ConsumerB のブロッキングの振る舞いを観察することができます。cancel とタイプすると、CancellationToken が発行されて終了します。もしくは、quit で終了します。

class Program
{
    static void Main(string[] args)
    {
        int capacity = 3;
        // Blocking Collection 
        var blockingCollection = new BlockingCollection<string>(boundedCapacity: capacity);
        CancellationTokenSource source = new CancellationTokenSource();

        Task producerThread = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                string command = Console.ReadLine();
                if (command.Contains("quit")) break;
                if (command.Contains("cancel"))
                {
                    Console.WriteLine("Cancelling ...");
                    source.Cancel();
                    break;
                }
                // blockingCollection.Add(command);   // blocked if it reach the capacity
                if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
                {
                        // it works!
                }
                else
                {
                    Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
                }
            }
        });
        Task consumerAThread = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                if (blockingCollection.IsCompleted) break;
                string command = blockingCollection.Take();
                Console.WriteLine($"ConsumerA: Take Received: {command}");
                Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
            }
        });
        Task consumerBThread = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                if (blockingCollection.IsCompleted) break;
                string command;
                try
                {
                    if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
                    {
                        Console.WriteLine($"ConsumerB: TryTake Received: {command}");
                    }
                    else
                    {
                        Console.WriteLine($"consumerB: Can't take now.");
                    }
                } catch (OperationCanceledException e)
                {
                    Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
                    break;
                }
                Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
            }
        });
        Task.WaitAll(producerThread, consumerAThread, consumerBThread);
    }
}

Source

今回のサンプルです。
* Sample

8
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?