C# 非同期、覚え書き。

  • 55
    いいね
  • 0
    コメント

記憶力の悪い自分のために、C# における非同期処理の書き方を記述しておきたいと思います。
専門家ではないので、不出来な内容でもあしからず。

目次

  • Task
  • async / await
  • 並列処理
  • System.Threading の便利なクラス

Task

皆さんおなじみの、非同期操作を表すクラスです。
生成および実行の方法が何通りかあります。

Task.Factory.StartNew

次のようにして Task を生成、実行します。

Sample.cs
using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var task = Task.Factory.StartNew(() => Console.WriteLine("OK"));

        Console.ReadLine();
    }
}

上記コードは、下記と同じ意味のようです。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Task.Factory.StartNew(
            () => Console.WriteLine("OK"),
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.Default);

        Console.ReadLine();
    }
}

TaskCreationOptions については、次節で触れます。
TaskScheduler.Default は、ThreadPool を使用してスケジューリングするという意味になります。

Task.Run

StartNew より記述が短いですね。

Sample.cs
using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Task.Run(() => Console.WriteLine("OK"));

        Console.ReadLine();
    }
}

上記コードは、下記と同じ意味です。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Task.Factory.StartNew(
            () => Console.WriteLine("OK"),
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);

        Console.ReadLine();
    }
}

前節の StartNew との違いは、第 3 引数の TaskCreationOptions.DenyChildAttach の部分です。
Run は子スレッドに親へのアタッチを禁止します。
前節の StartNew は禁止しません。
親スレッドへのアタッチは、StartNew メソッドに TaskCreationOptions.AttachedToParent を指定することで実現できます。

実際の動作を比較してみましょう。

StartNew の場合。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                Console.WriteLine("Child");
            }, TaskCreationOptions.AttachedToParent);
        }).Wait();
        Console.WriteLine("Parent");

        Console.ReadLine();
    }
}
output
Child
Parent

Run の場合。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Task.Run(() =>
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                Console.WriteLine("Child");
            }, TaskCreationOptions.AttachedToParent);
        }).Wait();
        Console.WriteLine("Parent");

        Console.ReadLine();
    }
}
output
Parent
Child

Run の場合は、子スレッドの終了を待たずに親スレッドが終了していることがわかります。
Run の子スレッドは親にアタッチできていないということですね。

new Task

コンストラクタから生成する場合です。
Start メソッドで起動されます。

Sample.cs
using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var task = new Task(() => Console.WriteLine("OK"));
        task.Start();

        Console.ReadLine();
    }
}

この場合、TaskTaskScheduler.Current というものを使用してスケジューリングされます。

TaskScheduler

TaskScheduler について少し見てみましょう。
TaskScheduler は、Task の実行を管理する役割を持つクラスです。
現在の TaskScheduler オブジェクトは、TaskScheduler.Current によって取得できます。
既定では、TaskScheduler.Default という、ThreadPool を使ってスケジューリングするものが設定されています。

Task.Factory.StartNew の他、前節の Start メソッドにも TaskScheduler を指定することが可能です。

下記は、TaskScheduler.Current の値を調べるサンプルです。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));  // true

        SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());  // 現在の SynchronizationContext を設定
        var scheduler = TaskScheduler.FromCurrentSynchronizationContext();  // SynchronizationContext から TaskScheduler を生成
        var task = new Task(() => Console.WriteLine(ReferenceEquals(TaskScheduler.Current, scheduler)));  // true
        task.Start(scheduler);

        Console.ReadLine();
    }
}

TaskScheduler.FromCurrentSynchronizationContext() により、現在の SynchronizationContext をもとにスケジューリングを行なう TaskScheduler を生成できます。
SynchronizationContext については、async / await の章で述べます。

Task の完了

Task#IsCompleted となった Task は、Task#Status が次のいずれかに落ち着きます。

  • TaskStatus.RanToCompletion : 正常終了した
  • TaskStatus.Canceled : キャンセルされた
  • TaskStatus.Faulted : 例外が発生した

正常終了した際、Task が Generic 型であれば、処理結果の戻り値を Task#Result により取得できます。
同様に、例外発生で完了した Task であれば、Task#Exception から例外を取得できます。
キャンセルを発生させる場合については次節で見てみることにしましょう。

なお、完了済み Task を以下のようにして取得することが可能です。

  • Task.CompletedTask : 正常終了した Task
  • Task.FromResult(t) : 正常終了し t を返す Task<T>
  • Task.FromCanceled(cancel) : キャンセルされた Task
  • Task.FromExcception(exception) : 例外が発生した Task

また、指定ミリ秒後に正常終了する TaskTask.Delay により取得できます。

Task のキャンセル

Task のキャンセルには、CancellationToken を使用します。
CancellationToken は、まず CancellationTokenSource を生成した上で、CancellationTokenSource#Token より取得します。

キャンセルを要求するには CancellationTokenSource#Cancel を呼びます。
Task 内でキャンセル要求を受理するには、CancellationToken#ThrowIfCancellationRequested メソッドを呼び、キャンセル要求を受信している場合に OperationCanceledException 例外を発生させることで Task を終了させます。
Task は、OperationCanceledExceptionを確認すると、Task をキャンセル状態に移行させます。
このとき、Task#WaitTask<T>#Resultawait などの操作を行なうと、TaskCanceledException をラップした AggregateException が発生します。

また、Task 開始前に CancellationTokenSource#Cancel が呼ばれた場合、Task が開始と同時に終了するようにするために、Task.Run などの引数として CancellationToken を渡しておきましょう。

実際の例です。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;
        var task = Task.Run(() =>
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                Console.WriteLine("Loop");
                Thread.Sleep(1000);
            }
        }, token);

        Thread.Sleep(3000);
        tokenSource.Cancel();
        try
        {
            task.Wait();
        }
        catch { }
        Console.WriteLine(task.IsCanceled);  // true
        tokenSource.Dispose();

        Console.ReadLine();
    }
}

Wait と When

Task#Wait を呼ぶと、現在のスレッドをブロックして Task の完了を待ちます。

Task.WaitAll は、渡された Task 配列に格納された全ての Task が完了するまで、現在のスレッドをブロックします。

Sample.cs
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 10).Select(x => Task.Run(() => { Thread.Sleep(1000); Console.WriteLine(x); })).ToArray();
        Task.WaitAll(tasks);

        Console.ReadLine();
    }
}

Task.WaitAny は、全てではなくどれか 1 つの Task の完了を待ちます。

Task#WaitXX は現在のスレッドをブロックして Task の完了を待ちますが、対して Task#WhenXX は、渡された複数の Task が完了するのを待つ Task を生成します。
複数の Task を 1 つにまとめる感じですね。
Task#WhenAll は全ての Task を、Task#WhenAny はどれか 1 つの Task を待ちます。

Sample.cs
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 10).Select(x => Task.Run(() => { Thread.Sleep(1000); return x; }));
        var all = Task.WhenAll(tasks);
        all.Result.ForEach(Console.WriteLine);

        Console.ReadLine();
    }
}

ここで使用している Enumerable.ForEach は、System.Interactive のものです。

ContinueWith

Task#ContinueWith は、自身の Task に付加される継続 Task を生成します。
返される Task は、元の Task 完了後に継続 Task が実行される Task です。

ContinueWith の引数に指定するデリゲートには、元の Task が渡されます。

Sample.cs
using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Task.FromResult("OK")
            .ContinueWith(task => Console.WriteLine(task.Result));

        Console.ReadLine();
    }
}

継続 Task は、元の Task と同じスレッドになるとは限らないことに注意してください。
ここでも TaskScheduler が使用されています。
既定では TaskScheduler.Default が使われます。
また、TaskScheduler を使用せず、元の Task と同じスレッドで継続 Task を実行するには、TaskContinuationOptions.ExecuteSynchronously を指定します。

この TaskContinuationOptions ですが、指定することで、条件によって継続 Task を実行するかどうかを決めることができます。
以下に例を示します。

  • OnlyOnRanToCompletion : 正常終了した場合に継続する
  • NotOnRanToCompletion : 正常終了しなかった場合に継続する
  • OnlyOnCanceled : キャンセルされた場合に継続する
  • OnlyOnFaulted : 例外が発生した場合に継続する
Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        tokenSource.Cancel();

        Task.CompletedTask
            .ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.OnlyOnRanToCompletion);
        Task.FromCanceled(tokenSource.Token)
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnRanToCompletion);
        Task.FromException(new Exception())
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnRanToCompletion);

        Task.CompletedTask
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.NotOnRanToCompletion);
        Task.FromCanceled(tokenSource.Token)
            .ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.NotOnRanToCompletion);
        Task.FromException(new Exception())
            .ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.NotOnRanToCompletion);

        Task.CompletedTask
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnCanceled);
        Task.FromCanceled(tokenSource.Token)
            .ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.OnlyOnCanceled);
        Task.FromException(new Exception())
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnCanceled);

        Task.CompletedTask
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnFaulted);
        Task.FromCanceled(tokenSource.Token)
            .ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnFaulted);
        Task.FromException(new Exception())
            .ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.OnlyOnFaulted);

        Console.ReadLine();
    }
}

TaskCompletionSource

TaskCompletionSource は、何らかの結果を返す外部の(非同期)処理に対し、Task によるアクセスを提供します。
非同期処理を記述する側と、非同期処理の結果を取得する側を、Task によって仲介する感じですね。

非同期処理の結果を取得する側では、TaskCompletionSource#TaskTask を取得し、この Task を使って待機したり結果を得たりすることができます。

非同期処理を記述する側では、以下のメソッドを使用して、TaskCompletionSource に結果を通知します。

  • SetResult : 正常終了
  • SetCanceled : キャンセル
  • SetException : 例外発生

下記のコードは、ThreadPool により生成したスレッドへのアクセスを TaskCompletionSource によって管理している例です。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    enum Status
    {
        None,
        Successful,
        Canceled,
        Failed,
    }

    TaskCompletionSource<int> _completionSource = new TaskCompletionSource<int>();
    volatile Status _status = Status.None;

    void RunAsync()
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (true)
            {
                switch (_status)
                {
                    case Status.Successful:
                        _completionSource.SetResult(10);
                        goto End;

                    case Status.Canceled:
                        _completionSource.SetCanceled();
                        goto End;

                    case Status.Failed:
                        _completionSource.SetException(new Exception());
                        goto End;
                }
                Thread.Sleep(100);
            }
        End:;
        });
    }

    static void Main(string[] args)
    {
        var p = new Program();
        p.RunAsync();
        p._status = Status.Successful;

        var task = p._completionSource.Task;
        try
        {
            task.Wait();
        }
        catch { }
        Console.WriteLine(task.Status);

        Console.ReadLine();
    }
}

async / await

async メソッド

async メソッドは、最初の await に達すると制御を返します。

Sample.cs
using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var task = AsyncMethod();
        Console.WriteLine("Started");
        task.Wait();
        Console.WriteLine("Completed");
        Console.ReadLine();
    }

    static async Task AsyncMethod()
    {
        await Task.Delay(1000);
        Console.WriteLine("AsyncMethod");
        await Task.Delay(1000);
    }
}
output
Started
AsyncMethod
Completed

await でメソッドが返るため、呼び出し側のメソッドと待機した Task が非同期に実行されている、という状況が生まれます。

await とスレッド

次は、await が実行されたとき、および await 実行後に、各処理がどのスレッドで動作しているか見てみましょう。
スレッド ID を表示させてみます。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        AsyncMethod();
        Console.ReadLine();
    }

    static async void AsyncMethod()
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
        await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    }
}
output
9
6
6

コンソールアプリケーションとして実行したところ、このような結果になりました。

最初は 9 というスレッドで実行されており、その後の Task.Run で生成された Task6 というスレッドになっています。
ここまでは普通ですね。
しかし、await から復帰した部分のスレッドが 9 ではなく 6 になっています。
元のスレッドに戻らず、Task のスレッドが使いまわされていますね。

では、GUI アプリケーションとして同様の処理を実行したらどうなるでしょう?

output
8
9
8

今度は先程とは対照的に、await 後の処理が最初と同じスレッドで実行されていることがわかります。
この現象には、SynchronizationContext というものが関係しています。

SynchronizationContext

SynchronizationContext は、await から復帰するタイミングなど、スレッドが同期をとる際に必要となるコンテキストです。

コンソールアプリケーションでは、SynchronizationContext.Currentnull になっています。
この場合、スレッドは ThreadPool から割り当てられます。
よって、どのスレッドで実行されるかは特に保証がありません。
前述の例で、元のスレッドに復帰できなかったのは、SynchronizationContext.Current が復帰のための知識を有していなかったのが理由です。

一方で、GUI アプリケーションでは、SynchronizationContext.Current に同期コンテキストが設定されています。

Windows Forms の場合、同期コンテキストには System.Windows.Forms.WindowsFormsSynchronizationContext が設定されます。
WPF の場合、System.Windows.Threading.DispatcherSynchronizationContext が設定されます。
UWP の場合、System.Threading.WinRTSynchronizationContext が設定されます。

これらの同期コンテキストが、await から復帰するための情報を持っています。
従って、GUI アプリケーションの場合は、前述の例の通り await 後に元のスレッドへと復帰することができるのです。

Task.Yield

Task.Yield メソッドを await することにより、async メソッドは制御を返し、await 自体は即座に待機を終了して同期コンテキストにより復帰する、という処理を記述することが可能です。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"main : {Thread.CurrentThread.ManagedThreadId}");
        AsyncMethod().Wait();

        Console.ReadLine();
    }

    static async Task AsyncMethod()
    {
        await Task.Yield();
        Console.WriteLine($"await : {Thread.CurrentThread.ManagedThreadId}");
    }
}
output
main : 9
await : 10

await の内部実装

参考 : http://ufcpp.net/study/csharp/sp5_awaitable.html
上記ページによると、await は以下のように動作しているそうです。

Sample.cs
// http://ufcpp.net/study/csharp/sp5_awaitable.html より引用
state = State1;                  // 次に復帰するときのための状態の記録
var task = RunAsync();
var awaiter = task.GetAwaiter();
if (!awaiter.IsCompleted)
{
    awaiter.OnCompleted(a);      // タスクが未完の場合だけ、継続登録して一度 return
    return;
}
case State1:                     // 次に呼ばれたときに続きから処理するためのラベル
var y = awaiter.GetReslt();      // タスクの結果を受け取り
awaiter = default(T);            // ガベージ コレクションが働きやすくなるように null 代入

ポイントはこの部分です。
awaiter.OnCompleted(a);
a というのは、await が記述されているメソッド自身です。
Task が未完のとき、自身のメソッドが OnCompleted(a) により、継続として登録されます。
すると、Task が完了した際に自身のメソッドが再度呼び出されますが、このときには状態 stateState1 変化しており、case State1: から処理が再開されるというわけです。

並列処理

並列処理を簡単に行なう方法として、System.Threading.Tasks.Parallel と Parallel LINQ があります。

Parallel

Parallel.For は、for 文のような繰り返し処理の各ループを並列に実行することができます。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        int total = 0;
        Parallel.For(0, 10, x =>
        {
            Thread.Sleep(1000);
            Interlocked.Add(ref total, x);
        });
        Console.WriteLine(total);

        Console.ReadLine();
    }
}

for 文というと break を使うことがあると思います。
Parallel.For でも Break を使うことは可能です。
下記のようにします。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        int total = 0;
        var result = Parallel.For(0, 100, (x, state) =>
        {
            Thread.Sleep(1000);
            if (x % 2 == 0)
            {
                state.Break();
            }
            Interlocked.Add(ref total, x);
        });
        Console.WriteLine(result.IsCompleted);  // false

        Console.ReadLine();
    }
}

stateParalellLoopState 型で、Break を呼び出すことにより、Parallel.For をできるだけ早く終了させるようにします。
resultParallelLoopResult 型で、IsComplete でループが全て完了したか途中で中断したかを取得できます。

紹介しませんが、foreach を並列処理する Parallel.ForEach もあります。
また、複数の Action を並列実行する Parallel.Invoke もあります。

Parallel LINQ

みんな大好き LINQ の並列実行版です。
AsParallel メソッドを挟むだけで並列化されます。

Sample.cs
using System;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).AsParallel().Select(x => x * 10).ForEach(Console.WriteLine);

        Console.ReadLine();
    }
}

これですと、当然順序がバラバラになります。
順序を保持したいときは、AsOrdered メソッドを挟みます。

Sample.cs
using System;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).AsParallel().AsOrdered().Select(x =>
        {
            Thread.Sleep(1000);
            return x * 10;
        })
        .ForEach(Console.WriteLine);

        Console.ReadLine();
    }
}

非同期から同期処理に切り替えるには、AsSequential メソッドを挟みます。

Sample.cs
using System;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).AsParallel().Select(x =>
        {
            Thread.Sleep(1000);
            return x * 10;
        })
        .AsSequential().Select(x =>
        {
            Thread.Sleep(1000);
            return x * 10;
        })
        .ForEach(Console.WriteLine);

        Console.ReadLine();
    }
}

System.Threading の便利なクラス

Interlocked

基本型の簡単な演算をスレッドセーフに行なえます。
以下のようなメソッドがあります。

  • Read : 読み取り
  • Increment : インクリメント
  • Decrement : デクリメント
  • Add : 加算
  • Exchange : 交換
  • CompareExchange : 比較

ThreadLocal と AsyncLocal

ThreadLocal クラスは、各スレッドに対してローカルなストレージを提供します。
コンストラクタに Func を渡すと、その戻り値が ThreadLocal#Value を遅延初期化します。
また、コンストラクタに true を渡すと、全ローカル値の一覧である ThreadLocal#Values を取得できるようになります。
以下のように使います。

Sample.cs
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var threadLocal = new ThreadLocal<int>(() => Thread.CurrentThread.ManagedThreadId, true);
        Action action = () => Console.WriteLine(threadLocal.Value);
        Parallel.Invoke(action, action, action, action);

        var values = threadLocal.Values.Select(x => x.ToString()).Aggregate((x, y) => $"{x}, {y}");
        Console.WriteLine(values);

        Console.ReadLine();
    }
}

一方 AsyncLocal クラスは、現在のスレッドが await から復帰する際、スレッド ID が変化してしまっても同一スレッドとみなし、ストレージにアクセスできるようにしたものです。
以下に例を示します。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var asyncLocal = new AsyncLocal<int>();
        asyncLocal.Value = 10;
        Task.Run(async () =>
        {
            asyncLocal.Value = 100;
            Console.WriteLine($"Id : {Thread.CurrentThread.ManagedThreadId}, Value : {asyncLocal.Value}");
            await Task.Delay(1000);
            Console.WriteLine($"Id : {Thread.CurrentThread.ManagedThreadId}, Value : {asyncLocal.Value}");
        }).Wait();
        Console.WriteLine(asyncLocal.Value);

        Console.ReadLine();
    }
}
output
Id : 6, Value : 100
Id : 11, Value : 100
10

スレッド ID が変化しても値が保持されていることがわかります。

Mutex

所有権を持つスレッドだけが処理を実行し、他のスレッドは Mutex を所有できるまで待機する、といった動作を可能にします。
プロセス間の同期にも使えます。
Mutex#WaitOne により、Mutex を取得、または取得できるまで待機します。

Sample.cs
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var mutex = new Mutex();
        Func<int, Task> generateTask = x => Task.Run(() =>
        {
            Console.WriteLine($"Thread {x} is retaining Mutex.");
            mutex.WaitOne();
            Console.WriteLine($"Thread {x} retained Mutex.");
            Thread.Sleep(1000);
            mutex.ReleaseMutex();
            Console.WriteLine($"Thread {x} released Mutex.");
        });
        var tasks = Enumerable.Range(1, 3).Select(generateTask);
        Task.WhenAll(tasks).Wait();

        mutex.Dispose();

        Console.ReadLine();
    }
}
output
Thread 1 is retaining Mutex.
Thread 2 is retaining Mutex.
Thread 3 is retaining Mutex.
Thread 1 retained Mutex.
Thread 1 released Mutex.
Thread 3 retained Mutex.
Thread 3 released Mutex.
Thread 2 retained Mutex.
Thread 2 released Mutex.

Barrier

全ての参加スレッドが信号を発信するまで、参加スレッドが待機する、という動作を可能にします。
コンストラクタで参加者数を設定しておきます。
そして Barrier#SignalAndWait により、信号を発信して待機します。

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var barrier = new Barrier(3, b => Console.WriteLine(b.CurrentPhaseNumber));
        Func<int, Task> generateTask = x => Task.Run(() =>
        {
            Console.WriteLine($"Thread {x} : Phase {barrier.CurrentPhaseNumber}");
            Thread.Sleep(1000 * x);
            barrier.SignalAndWait();
            Console.WriteLine($"Thread {x} : Phase {barrier.CurrentPhaseNumber}");
            Thread.Sleep(1000 * x);
            barrier.SignalAndWait();
            Console.WriteLine($"Thread {x} : Phase {barrier.CurrentPhaseNumber}");
        });
        var tasks = Enumerable.Range(1, 3).Select(generateTask);
        Task.WhenAll(tasks).Wait();

        barrier.Dispose();

        Console.ReadLine();
    }
}
output
Thread 2 : Phase 0
Thread 3 : Phase 0
Thread 1 : Phase 0
0
Thread 1 : Phase 1
Thread 3 : Phase 1
Thread 2 : Phase 1
1
Thread 2 : Phase 2
Thread 3 : Phase 2
Thread 1 : Phase 2

CountdownEvent

保持しているカウント値が 0 になるまで待機する動作を可能にします。
コンストラクタでカウント値を与え、CountdownEvent#Signal によりカウントをデクリメントします。

Sample.cs
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var countdown = new CountdownEvent(3);
        Func<int, Task> generateTask = x => Task.Run(() =>
        {
            Thread.Sleep(1000 * x);
            Console.WriteLine($"Thread {x} is signaling.");
            countdown.Signal();
            countdown.Wait();
            Console.WriteLine($"Thread {x} completed.");
        });
        var tasks = Enumerable.Range(1, 3).Select(generateTask);
        Task.WhenAll(tasks).Wait();

        countdown.Dispose();

        Console.ReadLine();
    }
}
output
Thread 1 is signaling.
Thread 2 is signaling.
Thread 3 is signaling.
Thread 3 completed.
Thread 2 completed.
Thread 1 completed.

SemaphoreSlim

Mutex に似ていますが、こちらは同時に複数のスレッドが動作できる点が異なります。
コンストラクタで動作可能なスレッドの数を指定します。
現在の動作可能スレッド数は、CurrentCount で取得できます。
Wait または WaitAsync を呼び出すと、CurrentCount0 なら待機し、0 でなくなったら CurrentCount をデクリメントして待機を抜けます。
Release メソッドは、一度に複数のカウントをリリースできます。

Sample.cs
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var semaphore = new SemaphoreSlim(0);
        Func<int, Task> generateTask = x => Task.Run(async () =>
        {
            Console.WriteLine($"Thread {x} is waiting.");
            await semaphore.WaitAsync();
            Console.WriteLine($"Thread {x} is working.");
            Thread.Sleep(1000);
            semaphore.Release(2);
        });
        var tasks = Enumerable.Range(1, 3).Select(generateTask);
        var whenTask = Task.WhenAll(tasks);
        Thread.Sleep(1000);
        semaphore.Release();
        whenTask.Wait();

        semaphore.Dispose();

        Console.ReadLine();
    }
}
output
Thread 2 is waiting.
Thread 3 is waiting.
Thread 1 is waiting.
Thread 2 is working.
Thread 3 is working.
Thread 1 is working.

ManualResetEventSlim

信号が Set 状態であれば待機せず、Reset 状態であれば待機するという動作を可能にします。
信号の設定は Set および Reset メソッドを使用します。
複数のスレッドを交互に動かす、などができます。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var resetEvent = new ManualResetEventSlim();

        var task1 = Task.Run(() =>
        {
            Console.WriteLine("Thread 1 reset.");
            resetEvent.Reset();
            resetEvent.Wait();
            Thread.Sleep(1000);
            Console.WriteLine("Thread 1 set.");
            resetEvent.Set();
            Console.WriteLine("Thread 1 completed.");
        });
        var task2 = Task.Run(() =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("Thread 2 set.");
            resetEvent.Set();
            Console.WriteLine("Thread 2 reset.");
            resetEvent.Reset();
            resetEvent.Wait();
            Console.WriteLine("Thread 2 completed.");
        });
        Task.WhenAll(task1, task2).Wait();

        resetEvent.Dispose();

        Console.ReadLine();
    }
}
output
Thread 1 reset.
Thread 2 set.
Thread 2 reset.
Thread 1 set.
Thread 1 completed.
Thread 2 completed.

ReaderWriterLockSlim

データベースのような読み取り・書き込み用ロックを実現できます。
読み取りは EnterReadLock および ExitReadLock、書き込みは EnterWriteLock および ExitWriteLock です。

Sample.cs
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();

    volatile int _value1 = 0;
    int Value1
    {
        get
        {
            _rwLock.EnterReadLock();
            try
            {
                Thread.Sleep(1000);
                return _value1;
            }
            finally
            {
                _rwLock.ExitReadLock();
            }
        }

        set
        {
            _rwLock.EnterWriteLock();
            try
            {
                Thread.Sleep(3000);
                _value1 = value;
            }
            finally
            {
                _rwLock.ExitWriteLock();
            }
        }
    }

    static void Main(string[] args)
    {
        var p = new Program();

        var task1 = Task.Run(() =>
        {
            Console.WriteLine(p.Value1);
        });
        var task2 = Task.Run(() =>
        {
            Console.WriteLine(p.Value1);
        });
        Task.WhenAll(task1, task2).Wait();

        task1 = Task.Run(() =>
        {
            Thread.Sleep(500);
            Console.WriteLine(p.Value1);
        });
        task2 = Task.Run(() =>
        {
            p.Value1 = 10;
        });
        Task.WhenAll(task1, task2).Wait();

        Console.ReadLine();
    }
}

終わりに

System.Threading.Tasks.Dataflow については、不勉強のため今回は紹介を見送りました。
また、System.Reactive でも非同期に関する面白い機能がありますが、今回は割愛です。
長々と失礼しました。