25
31

More than 5 years have passed since last update.

SemaphoreSlim を使って並列実行を制御する

Posted at

たまたま、SemaphoreSlim を使った並列実行の制御をするコードを読んだので、ちゃんと理解するために、サンプルコードを書いて調べてみたので、ブログに書いておく。

Semaphore と SemaphoreSlim

Semaphore という概念はこちらのブログがとってもわかりやすい。

つまるところ、並列実行をするときに、並列実行数を制御できる仕組みだ。数が付いたロックのようなもの。どのような時に使うかというと、並列実行をして処理速度を上げたいんだけど、あまりたくさんの数で並列実行させてしまうと、受け手がパンクする、、、とかいうユースケースが考えられる。

この概念を表す Semaphore と SemaphoreSlim というクラスが存在する。Semaphore の方は、このセマフォの仕組みがWin32セマフォオブジェクトのラッパである。SemaphoreSlim シンプルな実装の方で、ローカル(つまり自分のアプリ内)のみのセマフォをサポートし、Semaphore の方はシステムワイドなNamed Semaphore という概念を実装しているので、システムワイドにセマフォを共有してそれに名前を付けたりできるようだ。.NET でプログラミングするなら大抵の人は SemaphoreSlim で事が済む気がする。詳しくはこちら。

ちなみに非同期:awaitを含むコードをロックするには?(SemaphoreSlim編)[C#、VB]こちらの記事では、async メソッドを使いながらロックを書けるのに使うといわれていたが、個人的には目的が違う気がする。ただ確かにセマフォを1に設定すると、実質ロックをかけているのとかわらないので興味深い。

今回は SemaphoreSlim のオフィシャルページにのっていたサンプルがわかりやすかったのでちょっとだけ変えて実装してみた。

リファレンス実装はこちらから見れる

ちなみに、リファレンス実装にでてくるContract クラスはアサーション。契約による設計というやつでしょうか。

SemaphoreSlim の使い方

とてもシンプルで、

var semaphore = new SemaphoreSlim(0, 3);

セマフォを生成する。第一引数は、初期値で、第二引数は最大値。つまり最初のセマフォは0なので、一切新しいタスクを実行できないが、増やしたら3まで並列実行できるようになる。そして、並列実行させている箇所で次のように書く。

await semaphore.WaitAsync();  // もしくは、semaphore.Wait();
  :
semaphore.Release()

Wait/WaitAsync() の箇所でセマフォが待ち状態に入る。上記のリファレンス実装を見ると内部で lock を使って自分で管理して実装していた。セマフォが使える状態になったら、以降の処理を実行する。Release() でそのセマフォをリリースする。つまり次の並列実行している関数などがセマフォのWait で待っているものが、処理をスタートできるようになる。

単純にこれだけ。じゃあ実際にコードを書いてみてみよう。

サンプル

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace SemaphoreSlimSpike
{
    class Program
    {
        private static SemaphoreSlim semaphore;
        private static int count;
        static void Main(string[] args)
        {
            semaphore = new SemaphoreSlim(0, 3);
            Console.WriteLine($"{semaphore.CurrentCount} taks can enter the semaphore.");

            Task[] tasks = new Task[5];

            foreach(var i in Enumerable.Range(0, 5))
            {

                //// Async
                //tasks[i] = Task.Run(async () =>
                //{
                //    Console.WriteLine($"Task {Task.CurrentId}: {i} begins and wait for the semaphore. {DateTime.Now.ToString()}");
                //    await semaphore.WaitAsync();
                //    Interlocked.Add(ref count, 1);
                //    Console.WriteLine($"Task {Task.CurrentId}: {i} enters the semaphore. {DateTime.Now.ToString()}");
                //    await Task.Delay(TimeSpan.FromSeconds(3 + count)); // It will change the end time for the concurrently started thread.
                //    Console.WriteLine($"Task {Task.CurrentId}: {i} releases the semaphore; previous count {semaphore.Release()}");
                //    Console.WriteLine($"Task {Task.CurrentId}: {i} ends the semaphore. {DateTime.Now.ToString()}");

                //});

                // Sync
                tasks[i] = Task.Run(() =>
                {
                    Console.WriteLine($"Task {Task.CurrentId}: {i} begins and wait for the semaphore. {DateTime.Now.ToString()}");
                    semaphore.Wait();
                    Interlocked.Add(ref count, 1);
                    Console.WriteLine($"Task {Task.CurrentId}: {i} enters the semaphore. {DateTime.Now.ToString()}");
                    Thread.Sleep((3 + count) * 1000); // It will change the end time for the concurrently started thread.
                    Console.WriteLine($"Task {Task.CurrentId}: {i} releases the semaphore; previous count {semaphore.Release()}");
                    Console.WriteLine($"Task {Task.CurrentId}: {i} ends the semaphore. {DateTime.Now.ToString()}");

                });
            }

            // Wait for half a second to allow all the tasks to start and block .
            Thread.Sleep(500);

            Console.WriteLine("Main thread call Release(3)");
            // increase avairable Semaphore
            semaphore.Release(3);
            Console.WriteLine($"{semaphore.CurrentCount} tasks can enter the semaphore.");
            Task.WaitAll(tasks);
            Console.WriteLine("Main thread exits.");
            Console.ReadLine();

        }
    }
}

これがどんな処理をしているかというと、最初はセマフォが使えない(初期値0なので)Task.Run の中でマルチスレッドもしくは、並列で async メソッドを実行する。そして、semaphore.Wait()/WaitAsync() でストップする。セマフォで使えるのがゼロなので。

semapore.Release(3);

の箇所に来るとセマフォが3つリリースされる。つまり、セマフォが使える最大値になる。そしたら、セマフォでブロックされていた3つぶんのスレッドもしくは、async メソッドが動き出す。5つのTaskがあるが同時に動くのは3つまで。それらが終わるのをまって、次のスレッドが動き出すという挙動になる。

interlock

途中で Interlocked というクラスのメソッドが出てくる。これは、ロックをかけるのだが、lock よりもずっと負荷を小さくできるやり方で、中で参照されている変数にロックを書けることができる。

Interlocked.Add(ref count, 1);

実際にロックの方式を比較されている方がいます。

実行結果 (sync)

0 taks can enter the semaphore.
Task 5: 4 begins and wait for the semaphore. 02/08/2018 16:27:54
Task 1: 0 begins and wait for the semaphore. 02/08/2018 16:27:54
Task 4: 3 begins and wait for the semaphore. 02/08/2018 16:27:54
Task 2: 1 begins and wait for the semaphore. 02/08/2018 16:27:54
Task 3: 2 begins and wait for the semaphore. 02/08/2018 16:27:54
Main thread call Release(3)
3 tasks can enter the semaphore.

Task 1: 0 enters the semaphore. 02/08/2018 16:27:54
Task 5: 4 enters the semaphore. 02/08/2018 16:27:54
Task 4: 3 enters the semaphore. 02/08/2018 16:27:54

Task 2: 1 enters the semaphore. 02/08/2018 16:28:00
Task 1: 0 releases the semaphore; previous count 0
Task 5: 4 releases the semaphore; previous count 0
Task 5: 4 ends the semaphore. 02/08/2018 16:28:00

Task 3: 2 enters the semaphore. 02/08/2018 16:28:00
Task 1: 0 ends the semaphore. 02/08/2018 16:28:00
Task 4: 3 releases the semaphore; previous count 0
Task 4: 3 ends the semaphore. 02/08/2018 16:28:00

思った通りの動きをしている。ちなみに、async バージョンを書いてみると、同じように動くが、Task.CurrentIdnullになるのはなんでだろう?

実行結果 (async)

実行結果は同じだが、Task.CurrentId が最初の beginsのところ以降、つまり、Semaphore の有効範囲内では null になっているので表示されない。

0 taks can enter the semaphore.
Task 9: 4 begins and wait for the semaphore. 02/08/2018 16:37:50
Task 3: 1 begins and wait for the semaphore. 02/08/2018 16:37:50
Task 1: 0 begins and wait for the semaphore. 02/08/2018 16:37:50
Task 5: 2 begins and wait for the semaphore. 02/08/2018 16:37:50
Task 7: 3 begins and wait for the semaphore. 02/08/2018 16:37:50
Main thread call Release(3)
0 tasks can enter the semaphore.
Task : 0 enters the semaphore. 02/08/2018 16:37:50
Task : 1 enters the semaphore. 02/08/2018 16:37:50
Task : 4 enters the semaphore. 02/08/2018 16:37:50
Task : 2 enters the semaphore. 02/08/2018 16:37:56
Task : 1 releases the semaphore; previous count 0
Task : 4 releases the semaphore; previous count 0
Task : 4 ends the semaphore. 02/08/2018 16:37:56
Task : 3 enters the semaphore. 02/08/2018 16:37:56
Task : 1 ends the semaphore. 02/08/2018 16:37:56
Task : 0 releases the semaphore; previous count 0
Task : 0 ends the semaphore. 02/08/2018 16:37:56

リソース

他に参考になりそうなリソース

かずき師匠のエントリ。理解するために await クラスを自作するという素晴らしいセンス!

公式のスレッド関係の一覧が載っている

今回のサンプル

25
31
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
31