7
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

再入門C#:コンカレントコレクション(1)

Posted at

非同期処理に使える様々なコレクションがありますので、そのコレクションの使い方や考え方を調査したいと思います。

コンカレントコレクション

コンカレントコレクションは、非同期用のスレッドセーフに実装されたコレクションで、非同期のユースケースに対応できるようなメソッドがあります。

System.Collections.Concurrent の namespace にはスレッドセーフでスケーラブルないくつかのコレクションクラスがあります。マルチスレッドの状態でも安全にアイテムを追加したり、更新したり、削除したりすることができます。

| コレクションの名前 | 概要                          |
|------------------------- + --------------------------------------------------------------|
| BlockingCollection| IProducerConsumerCollectionのバウンディングや、ブロッキングの機能をもった実装。Blocking Collection Overview参照|
| ConcurrentDictionary| スレッドセーフの Dictionary |
| ConcurrentQueue | スレッドセーフの Queue|
| ConcurrentStack|スレッドセーフの Stack |
| ConcurrentBag | スレッドセーフのコレクションの実装 |
| IProducerConsumerCollection | BlockingCollectionで使われるインターフェイス |

全ての内容を理解していませんが、一つ一つ調査しようと思います。

コンカレントコレクションをつわないとどうするか?

通常のQueueと、ConcurrentQueueを比較してみましょう。

Queue

        static async Task QueueAsync()
        {
            var orders = new Queue<string>();
            Task order1 = Task.Run(() => PlaceOrderQueue(orders, "ushio"));
            Task order2 = Task.Run(() => PlaceOrderQueue(orders, "yamada"));
            Task order3 = Task.Run(() => PlaceOrderQueue(orders, "higuchi"));
            Task order4 = Task.Run(() => PlaceOrderQueue(orders, "taro"));
            Task.WaitAll(order1, order2, order3, order4);

            while(orders.Count != 0)
            {
                Console.WriteLine($"order: {orders.Dequeue()}");
            }
        }

        private static void PlaceOrderQueue(Queue<string> order, string name)
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                order.Enqueue($"{name} wants surface pro {i + 1}");
            }
        }

実行結果

通常のコレクションを使うと、スレッドセーフではないので、同じタイミングで更新をしたケースでは、データが欠落しているようです。
ちなみに、ConcurrentDictionaryの代わりに Dictionaryを使うと、Null Reference Exception がでて、ちゃんと実行すらも出来ませんでした。

order: taro wants surface pro 1
order:
order: yamada wants surface pro 1
order: higuchi wants surface pro 1
order:
order:
order:
order: ushio wants surface pro 2
order:
order: yamada wants surface pro 3
order: taro wants surface pro 3
order: ushio wants surface pro 3
order: ushio wants surface pro 4
order:
order: yamada wants surface pro 4
order: higuchi wants surface pro 4
order: higuchi wants surface pro 5
order: yamada wants surface pro 5
order: taro wants surface pro 5
order: ushio wants surface pro 5

Task.Run と Async メソッド

余談ですが、最初のコードはこんな感じで書いていました。ちなみに、PlaceOrderQueueAsyncが1行1行実行されます。まるで同期メソッドのように。なぜかというと、PlaceOrderQueueAsyncは本来Asyncではなくても動くものなので、ふつーに実行されてしまうとのこと。当初は、Taskで返すのと、最後にTask.WaitAllしてるんだから、並列に動くのではと思ったけど、そうなりませんでした。


        private static async Task PlaceOrderQueueAsync(Queue<string> order, string name)
        {
            Console.WriteLine($"Into the method. {name}.");
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                order.Enqueue($"{name} wants surface pro {i + 1}");
            }
        }
        static async Task QueueAsync()
        {
            var orders = new Queue<string>();
            Task order1 =  PlaceOrderQueueAsync(orders, "ushio");
            Task order2 = PlaceOrderQueueAsync(orders, "yamada");
            Task order3 = PlaceOrderQueueAsync(orders, "higuchi");
            Task order4 = PlaceOrderQueueAsync(orders, "taro");
            Task.WaitAll(order1, order2, order3, order4);

            while (orders.Count != 0)
            {
                Console.WriteLine($"order: {orders.Dequeue()}");
            }
        }

結果

例では、Into the methodのところが、しばらくまってから、一つ、一つ実行されます。並列実行されるときは、ここは一気に出てくるはずですが、そうはなりません。

Into the method. ushio.
Into the method. yamada.
Into the method. higuchi.
Into the method. taro.
order: ushio wants surface pro 1
order: ushio wants surface pro 2
order: ushio wants surface pro 3
order: ushio wants surface pro 4
order: ushio wants surface pro 5
order: yamada wants surface pro 1
order: yamada wants surface pro 2
order: yamada wants surface pro 3
order: yamada wants surface pro 4
order: yamada wants surface pro 5
order: higuchi wants surface pro 1
order: higuchi wants surface pro 2
order: higuchi wants surface pro 3
order: higuchi wants surface pro 4
order: higuchi wants surface pro 5
order: taro wants surface pro 1
order: taro wants surface pro 2
order: taro wants surface pro 3
order: taro wants surface pro 4
order: taro wants surface pro 5

解決策

Asyncメソッドが「なんちゃって」なのが問題なのであれば、なんちゃって出無くすと良いです。こしてあげて、本当のAsyncメソッドにしてあげて、待ち受けが実際に発生するようにすると、想定の動作になります。

        private static async Task PlaceOrderQueueAsync(Queue<string> order, string name)
        {
            Console.WriteLine($"Into the method. {name}.");
            for (int i = 0; i < 5; i++)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
                order.Enqueue($"{name} wants surface pro {i + 1}");
            }
        }

たまたまうまく動いてくれちゃっていますが、、、。何回か実行すると失敗しました。もしくは、Task.Run で実行すると確実に非同期になりますので、なんちゃって、Asyncメソッドは、Asyncでなくしてしまっても良いでしょう。

Into the method. ushio.
Into the method. yamada.
Into the method. higuchi.
Into the method. taro.
order: taro wants surface pro 1
order: yamada wants surface pro 1
order: higuchi wants surface pro 1
order: ushio wants surface pro 1
order: higuchi wants surface pro 2
order: taro wants surface pro 2
order: yamada wants surface pro 2
order: ushio wants surface pro 2
order: higuchi wants surface pro 3
order: taro wants surface pro 3
order: ushio wants surface pro 3
order: yamada wants surface pro 3
order: taro wants surface pro 4
order: higuchi wants surface pro 4
order: ushio wants surface pro 4
order: yamada wants surface pro 4
order: higuchi wants surface pro 5
order: taro wants surface pro 5
order: ushio wants surface pro 5
order: yamada wants surface pro 5

コンカレントキュー

スレッドセーフのキューを使うと問題解決です。インターフェイスも普通のQueueと同じですが、TryDequeue() メソッドが使われます。他のスレッドが、Dequeueを行う可能性があり、失敗する可能性があるのでこのインターフェイスを使います。失敗した場合、戻り値がfalseになります。(ここでは処理を書いていませんが)Dequeueの場合は failの場合、普通にループを抜けると良いでしょう。このロジックでは、他のスレッドで終了いるので、このままで問題ありません。

        static async Task ConcurrentQueueAsync()
        {
            var orders = new ConcurrentQueue<string>();
            Task order1 = Task.Run(() => PlaceOrderConQueue(orders, "ushio"));
            Task order2 = Task.Run(() => PlaceOrderConQueue(orders, "yamada"));
            Task order3 = Task.Run(() => PlaceOrderConQueue(orders, "higuchi"));
            Task order4 = Task.Run(() => PlaceOrderConQueue(orders, "taro"));
            Task.WaitAll(order1, order2, order3, order4);

            while (orders.Count != 0)
            {
                string message;
                orders.TryDequeue(out message);
                Console.WriteLine($"order: {message}");
            }
        }

        private static void PlaceOrderConQueue(ConcurrentQueue<string> order, string name)
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                order.Enqueue($"{name} wants surface pro {i + 1}");
            }
        }

結果

order: higuchi wants surface pro 1
order: yamada wants surface pro 1
order: taro wants surface pro 1
order: ushio wants surface pro 1
order: higuchi wants surface pro 2
order: taro wants surface pro 2
order: ushio wants surface pro 2
order: yamada wants surface pro 2
order: yamada wants surface pro 3
order: ushio wants surface pro 3
order: higuchi wants surface pro 3
order: taro wants surface pro 3
order: ushio wants surface pro 4
order: yamada wants surface pro 4
order: higuchi wants surface pro 4
order: taro wants surface pro 4
order: higuchi wants surface pro 5
order: yamada wants surface pro 5
order: ushio wants surface pro 5
order: taro wants surface pro 5

通常のQueueでなんとかする方法

通常のQueueで何とかしたければ、自分でロックをかければよいです。

        private static object LockObj = new object(); 
        private static void PlaceOrderQueueLock(Queue<string> order, string name)
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                lock (LockObj)
                {
                    order.Enqueue($"{name} wants surface pro {i + 1}");
                }
            }
        }

        static async Task QueueLockAsync()
        {
            var orders = new Queue<string>();
            Task order1 = Task.Run(() => PlaceOrderQueueLock(orders, "ushio"));
            Task order2 = Task.Run(() => PlaceOrderQueueLock(orders, "yamada"));
            Task order3 = Task.Run(() => PlaceOrderQueueLock(orders, "higuchi"));
            Task order4 = Task.Run(() => PlaceOrderQueueLock(orders, "taro"));
            Task.WaitAll(order1, order2, order3, order4);

            while (orders.Count != 0)
            {
                Console.WriteLine($"order: {orders.Dequeue()}");
            }
        }

ConcurrentDictionary

コンカレントディクショナリはよく使われるそうで、コンカレントなListのかわりなどにも使われるようです。さて、通常のDictionaryとAPIの差異を確認してみましょう。

Dictionary

        private static void DictionarySample()
        {
            var order = new Dictionary<int, string>()
            {
                {1, "Surface Pro 4"},
                {2, "Surface Book 2"}
            };
            Console.WriteLine($"Current Order number {order.Count}");

            order.Add(3, "Microsoft Mouse");
            order[4] = "Keyboard";
            order.Remove(1);
            foreach (var kv in order)
            {
                Console.WriteLine($"Key: {kv.Key} Value: {kv.Value}");
            }
        }

ConcurrentDictionary

状態の取得、変更の陰には常に別スレッドで変更が行われている可能性が含まれます。ですので基本的にTry...のメソッドになります。
下記の例でも、同じキーのものを追加しようとしたら失敗します。通常のディクショナリの場合だと、気にせず、order[3] = "Microsoft Mouse" とかやりますが、本来ここではお勧めできません。キーがあるかないかもわからないのであれば、AddOrUpdateを使います。AddOrUpdateの第一引数は、キーで、その次の引数が、キーがなかった時に追加される値、そして最後のFunctionは、現在の値がわたってくるので、それを参考にする、もしくは、新しい値をセットするためのものになっています。これも、他のスレッドが更新してないよね?という配慮になります。

        private static void ConcurrentDictionarySample()
        {
            var order = new ConcurrentDictionary<int, string>();
            var success = order.TryAdd(1, "Surface Pro 4");
            Console.WriteLine($"Order 1: succeeded?: {success}");
            success = order.TryAdd(1, "Surface Book 2");
            Console.WriteLine($"Order 2: succeeded?: {success}");
            order[3] = "Microsoft Mouse";
            string message = order.AddOrUpdate(1, "New Order",
                (key, oldValue) => $"{oldValue} is replaced by surface book 3");
            Console.WriteLine($"New Value: {message}");
            string product;
            success = order.TryRemove(3, out product);
            if (success)
                Console.WriteLine($"key3: value: {product} is removed");
            foreach(var kv in order)
            {
                Console.WriteLine($"{kv.Key}: {kv.Value}");
            }

        }

結果

Order 1: succeeded?: True
Order 2: succeeded?: False
New Value: Surface Pro 4 is replaced by surface book 3
key3: value: Microsoft Mouse is removed
1: Surface Pro 4 is replaced by surface book 3

各メソッドでどういう処理を行うべきだろうか?

ConcurrentDictionaryでメソッドが失敗した場合どうしたらいいだろう? マルチスレッドでこれらは起こりうることなので、Exceptionが発生して処理が止まるのがまずいケースもおおいだろう。その場合にTry系のメソッドが役に立つ

TryAdd

これは別メソッドが、既にそのキーを登録しているということなので、別に無視してよければ、何もしなくても良いかもしれない。そうでないばあいは、AddOrUpdateでキーの有無に関係なく登録できるようにするのがよいだろうか?

AddOrUpdate

キーがあるか無いかわからないときは、とにかくこれが良いだろう。

TryRemove

削除しようとしたけど、既にないというケースなので、これも場合によっては無視してしまってもいいだろう。

TryGet

取得しようとしたけど、既になくなっているケースかもしれない。これはまずいので例外か、既に削除されている通知をする必要がありそう。

GetOrAdd

取得しようとして、なければ、値を作る。第二引数は、関数を渡す。キーが渡ってくるので、キーが無い時のバリューをしていして、値をえる
キーが無い時に対処したい場合はこちらだろうか?

var value = order.GetOrAdd(1, (key) => $"New value for {key}");

おわりに

まずは初回オーバービューを書いてみた。次はもっとディープなところや、パターンについても慣れてコンカレントプログラミングに慣れていきたい。

7
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?