記憶力の悪い自分のために、C# における非同期処理の書き方を記述しておきたいと思います。
専門家ではないので、不出来な内容でもあしからず。
目次
- Task
- async / await
- 並列処理
- System.Threading の便利なクラス
Task
皆さんおなじみの、非同期操作を表すクラスです。
生成および実行の方法が何通りかあります。
Task.Factory.StartNew
次のようにして Task
を生成、実行します。
using System;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var task = Task.Factory.StartNew(() => Console.WriteLine("OK"));
Console.ReadLine();
}
}
上記コードは、下記と同じ意味のようです。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Task.Factory.StartNew(
() => Console.WriteLine("OK"),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default);
Console.ReadLine();
}
}
TaskCreationOptions
については、次節で触れます。
TaskScheduler.Default
は、ThreadPool
を使用してスケジューリングするという意味になります。
Task.Run
StartNew
より記述が短いですね。
using System;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Task.Run(() => Console.WriteLine("OK"));
Console.ReadLine();
}
}
上記コードは、下記と同じ意味です。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Task.Factory.StartNew(
() => Console.WriteLine("OK"),
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);
Console.ReadLine();
}
}
前節の StartNew
との違いは、第 3 引数の TaskCreationOptions.DenyChildAttach
の部分です。
Run
は子スレッドに親へのアタッチを禁止します。
前節の StartNew
は禁止しません。
親スレッドへのアタッチは、StartNew
メソッドに TaskCreationOptions.AttachedToParent
を指定することで実現できます。
実際の動作を比較してみましょう。
StartNew
の場合。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Task.Factory.StartNew(() =>
{
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Child");
}, TaskCreationOptions.AttachedToParent);
}).Wait();
Console.WriteLine("Parent");
Console.ReadLine();
}
}
Child
Parent
Run
の場合。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Task.Run(() =>
{
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Child");
}, TaskCreationOptions.AttachedToParent);
}).Wait();
Console.WriteLine("Parent");
Console.ReadLine();
}
}
Parent
Child
Run
の場合は、子スレッドの終了を待たずに親スレッドが終了していることがわかります。
Run
の子スレッドは親にアタッチできていないということですね。
new Task
コンストラクタから生成する場合です。
Start
メソッドで起動されます。
using System;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var task = new Task(() => Console.WriteLine("OK"));
task.Start();
Console.ReadLine();
}
}
この場合、Task
は TaskScheduler.Current
というものを使用してスケジューリングされます。
TaskScheduler
TaskScheduler
について少し見てみましょう。
TaskScheduler
は、Task
の実行を管理する役割を持つクラスです。
現在の TaskScheduler
オブジェクトは、TaskScheduler.Current
によって取得できます。
既定では、TaskScheduler.Default
という、ThreadPool
を使ってスケジューリングするものが設定されています。
Task.Factory.StartNew
の他、前節の Start
メソッドにも TaskScheduler
を指定することが可能です。
下記は、TaskScheduler.Current
の値を調べるサンプルです。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Console.WriteLine(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default)); // true
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext()); // 現在の SynchronizationContext を設定
var scheduler = TaskScheduler.FromCurrentSynchronizationContext(); // SynchronizationContext から TaskScheduler を生成
var task = new Task(() => Console.WriteLine(ReferenceEquals(TaskScheduler.Current, scheduler))); // true
task.Start(scheduler);
Console.ReadLine();
}
}
TaskScheduler.FromCurrentSynchronizationContext()
により、現在の SynchronizationContext
をもとにスケジューリングを行なう TaskScheduler
を生成できます。
SynchronizationContext
については、async / await の章で述べます。
Task の完了
Task#IsCompleted
となった Task
は、Task#Status
が次のいずれかに落ち着きます。
-
TaskStatus.RanToCompletion
: 正常終了した -
TaskStatus.Canceled
: キャンセルされた -
TaskStatus.Faulted
: 例外が発生した
正常終了した際、Task
が Generic 型であれば、処理結果の戻り値を Task#Result
により取得できます。
同様に、例外発生で完了した Task
であれば、Task#Exception
から例外を取得できます。
キャンセルを発生させる場合については次節で見てみることにしましょう。
なお、完了済み Task
を以下のようにして取得することが可能です。
-
Task.CompletedTask
: 正常終了したTask
-
Task.FromResult(t)
: 正常終了しt
を返すTask<T>
-
Task.FromCanceled(cancel)
: キャンセルされたTask
-
Task.FromExcception(exception)
: 例外が発生したTask
また、指定ミリ秒後に正常終了する Task
を Task.Delay
により取得できます。
Task のキャンセル
Task
のキャンセルには、CancellationToken
を使用します。
CancellationToken
は、まず CancellationTokenSource
を生成した上で、CancellationTokenSource#Token
より取得します。
キャンセルを要求するには CancellationTokenSource#Cancel
を呼びます。
Task
内でキャンセル要求を受理するには、CancellationToken#ThrowIfCancellationRequested
メソッドを呼び、キャンセル要求を受信している場合に OperationCanceledException
例外を発生させることで Task
を終了させます。
Task
は、OperationCanceledException
を確認すると、Task
をキャンセル状態に移行させます。
このとき、Task#Wait
、Task<T>#Result
、await
などの操作を行なうと、TaskCanceledException
をラップした AggregateException
が発生します。
また、Task
開始前に CancellationTokenSource#Cancel
が呼ばれた場合、Task
が開始と同時に終了するようにするために、Task.Run
などの引数として CancellationToken
を渡しておきましょう。
実際の例です。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var task = Task.Run(() =>
{
while (true)
{
token.ThrowIfCancellationRequested();
Console.WriteLine("Loop");
Thread.Sleep(1000);
}
}, token);
Thread.Sleep(3000);
tokenSource.Cancel();
try
{
task.Wait();
}
catch { }
Console.WriteLine(task.IsCanceled); // true
tokenSource.Dispose();
Console.ReadLine();
}
}
Wait と When
Task#Wait
を呼ぶと、現在のスレッドをブロックして Task
の完了を待ちます。
Task.WaitAll
は、渡された Task
配列に格納された全ての Task
が完了するまで、現在のスレッドをブロックします。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var tasks = Enumerable.Range(1, 10).Select(x => Task.Run(() => { Thread.Sleep(1000); Console.WriteLine(x); })).ToArray();
Task.WaitAll(tasks);
Console.ReadLine();
}
}
Task.WaitAny
は、全てではなくどれか 1 つの Task
の完了を待ちます。
Task#WaitXX
は現在のスレッドをブロックして Task
の完了を待ちますが、対して Task#WhenXX
は、渡された複数の Task
が完了するのを待つ Task
を生成します。
複数の Task
を 1 つにまとめる感じですね。
Task#WhenAll
は全ての Task
を、Task#WhenAny
はどれか 1 つの Task
を待ちます。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var tasks = Enumerable.Range(1, 10).Select(x => Task.Run(() => { Thread.Sleep(1000); return x; }));
var all = Task.WhenAll(tasks);
all.Result.ForEach(Console.WriteLine);
Console.ReadLine();
}
}
ここで使用している Enumerable.ForEach
は、System.Interactive
のものです。
ContinueWith
Task#ContinueWith
は、自身の Task
に付加される継続 Task
を生成します。
返される Task
は、元の Task
完了後に継続 Task
が実行される Task
です。
ContinueWith
の引数に指定するデリゲートには、元の Task
が渡されます。
using System;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Task.FromResult("OK")
.ContinueWith(task => Console.WriteLine(task.Result));
Console.ReadLine();
}
}
継続 Task
は、元の Task
と同じスレッドになるとは限らないことに注意してください。
ここでも TaskScheduler
が使用されています。
既定では TaskScheduler.Default
が使われます。
また、TaskScheduler
を使用せず、元の Task
と同じスレッドで継続 Task
を実行するには、TaskContinuationOptions.ExecuteSynchronously
を指定します。
この TaskContinuationOptions
ですが、指定することで、条件によって継続 Task
を実行するかどうかを決めることができます。
以下に例を示します。
- OnlyOnRanToCompletion : 正常終了した場合に継続する
- NotOnRanToCompletion : 正常終了しなかった場合に継続する
- OnlyOnCanceled : キャンセルされた場合に継続する
- OnlyOnFaulted : 例外が発生した場合に継続する
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
tokenSource.Cancel();
Task.CompletedTask
.ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.OnlyOnRanToCompletion);
Task.FromCanceled(tokenSource.Token)
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnRanToCompletion);
Task.FromException(new Exception())
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnRanToCompletion);
Task.CompletedTask
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.NotOnRanToCompletion);
Task.FromCanceled(tokenSource.Token)
.ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.NotOnRanToCompletion);
Task.FromException(new Exception())
.ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.NotOnRanToCompletion);
Task.CompletedTask
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnCanceled);
Task.FromCanceled(tokenSource.Token)
.ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.OnlyOnCanceled);
Task.FromException(new Exception())
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnCanceled);
Task.CompletedTask
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnFaulted);
Task.FromCanceled(tokenSource.Token)
.ContinueWith(task => Console.WriteLine("NG"), TaskContinuationOptions.OnlyOnFaulted);
Task.FromException(new Exception())
.ContinueWith(task => Console.WriteLine("OK"), TaskContinuationOptions.OnlyOnFaulted);
Console.ReadLine();
}
}
TaskCompletionSource
TaskCompletionSource
は、何らかの結果を返す外部の(非同期)処理に対し、Task
によるアクセスを提供します。
非同期処理を記述する側と、非同期処理の結果を取得する側を、Task
によって仲介する感じですね。
非同期処理の結果を取得する側では、TaskCompletionSource#Task
で Task
を取得し、この Task
を使って待機したり結果を得たりすることができます。
非同期処理を記述する側では、以下のメソッドを使用して、TaskCompletionSource
に結果を通知します。
-
SetResult
: 正常終了 -
SetCanceled
: キャンセル -
SetException
: 例外発生
下記のコードは、ThreadPool
により生成したスレッドへのアクセスを TaskCompletionSource
によって管理している例です。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
enum Status
{
None,
Successful,
Canceled,
Failed,
}
TaskCompletionSource<int> _completionSource = new TaskCompletionSource<int>();
volatile Status _status = Status.None;
void RunAsync()
{
ThreadPool.QueueUserWorkItem(_ =>
{
while (true)
{
switch (_status)
{
case Status.Successful:
_completionSource.SetResult(10);
goto End;
case Status.Canceled:
_completionSource.SetCanceled();
goto End;
case Status.Failed:
_completionSource.SetException(new Exception());
goto End;
}
Thread.Sleep(100);
}
End:;
});
}
static void Main(string[] args)
{
var p = new Program();
p.RunAsync();
p._status = Status.Successful;
var task = p._completionSource.Task;
try
{
task.Wait();
}
catch { }
Console.WriteLine(task.Status);
Console.ReadLine();
}
}
async / await
async メソッド
async
メソッドは、最初の await
に達すると制御を返します。
using System;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var task = AsyncMethod();
Console.WriteLine("Started");
task.Wait();
Console.WriteLine("Completed");
Console.ReadLine();
}
static async Task AsyncMethod()
{
await Task.Delay(1000);
Console.WriteLine("AsyncMethod");
await Task.Delay(1000);
}
}
Started
AsyncMethod
Completed
await
でメソッドが返るため、呼び出し側のメソッドと待機した Task
が非同期に実行されている、という状況が生まれます。
await とスレッド
次は、await
が実行されたとき、および await
実行後に、各処理がどのスレッドで動作しているか見てみましょう。
スレッド ID を表示させてみます。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
AsyncMethod();
Console.ReadLine();
}
static async void AsyncMethod()
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
}
}
9
6
6
コンソールアプリケーションとして実行したところ、このような結果になりました。
最初は 9
というスレッドで実行されており、その後の Task.Run
で生成された Task
は 6
というスレッドになっています。
ここまでは普通ですね。
しかし、await
から復帰した部分のスレッドが 9
ではなく 6
になっています。
元のスレッドに戻らず、Task
のスレッドが使いまわされていますね。
では、GUI アプリケーションとして同様の処理を実行したらどうなるでしょう?
8
9
8
今度は先程とは対照的に、await
後の処理が最初と同じスレッドで実行されていることがわかります。
この現象には、SynchronizationContext
というものが関係しています。
SynchronizationContext
SynchronizationContext
は、await
から復帰するタイミングなど、スレッドが同期をとる際に必要となるコンテキストです。
コンソールアプリケーションでは、SynchronizationContext.Current
が null
になっています。
この場合、スレッドは ThreadPool
から割り当てられます。
よって、どのスレッドで実行されるかは特に保証がありません。
前述の例で、元のスレッドに復帰できなかったのは、SynchronizationContext.Current
が復帰のための知識を有していなかったのが理由です。
一方で、GUI アプリケーションでは、SynchronizationContext.Current
に同期コンテキストが設定されています。
Windows Forms の場合、同期コンテキストには System.Windows.Forms.WindowsFormsSynchronizationContext
が設定されます。
WPF の場合、System.Windows.Threading.DispatcherSynchronizationContext
が設定されます。
UWP の場合、System.Threading.WinRTSynchronizationContext
が設定されます。
これらの同期コンテキストが、await
から復帰するための情報を持っています。
従って、GUI アプリケーションの場合は、前述の例の通り await
後に元のスレッドへと復帰することができるのです。
Task.Yield
Task.Yield
メソッドを await
することにより、async
メソッドは制御を返し、await
自体は即座に待機を終了して同期コンテキストにより復帰する、という処理を記述することが可能です。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
Console.WriteLine($"main : {Thread.CurrentThread.ManagedThreadId}");
AsyncMethod().Wait();
Console.ReadLine();
}
static async Task AsyncMethod()
{
await Task.Yield();
Console.WriteLine($"await : {Thread.CurrentThread.ManagedThreadId}");
}
}
main : 9
await : 10
await の内部実装
参考 : http://ufcpp.net/study/csharp/sp5_awaitable.html
上記ページによると、await
は以下のように動作しているそうです。
// http://ufcpp.net/study/csharp/sp5_awaitable.html より引用
state = State1; // 次に復帰するときのための状態の記録
var task = RunAsync();
var awaiter = task.GetAwaiter();
if (!awaiter.IsCompleted)
{
awaiter.OnCompleted(a); // タスクが未完の場合だけ、継続登録して一度 return
return;
}
case State1: // 次に呼ばれたときに続きから処理するためのラベル
var y = awaiter.GetReslt(); // タスクの結果を受け取り
awaiter = default(T); // ガベージ コレクションが働きやすくなるように null 代入
ポイントはこの部分です。
awaiter.OnCompleted(a);
a
というのは、await
が記述されているメソッド自身です。
Task
が未完のとき、自身のメソッドが OnCompleted(a)
により、継続として登録されます。
すると、Task
が完了した際に自身のメソッドが再度呼び出されますが、このときには状態 state
が State1
変化しており、case State1:
から処理が再開されるというわけです。
並列処理
並列処理を簡単に行なう方法として、System.Threading.Tasks.Parallel
と Parallel LINQ があります。
Parallel
Parallel.For
は、for
文のような繰り返し処理の各ループを並列に実行することができます。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
int total = 0;
Parallel.For(0, 10, x =>
{
Thread.Sleep(1000);
Interlocked.Add(ref total, x);
});
Console.WriteLine(total);
Console.ReadLine();
}
}
for
文というと break
を使うことがあると思います。
Parallel.For
でも Break
を使うことは可能です。
下記のようにします。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
int total = 0;
var result = Parallel.For(0, 100, (x, state) =>
{
Thread.Sleep(1000);
if (x % 2 == 0)
{
state.Break();
}
Interlocked.Add(ref total, x);
});
Console.WriteLine(result.IsCompleted); // false
Console.ReadLine();
}
}
state
は ParalellLoopState
型で、Break
を呼び出すことにより、Parallel.For
をできるだけ早く終了させるようにします。
result
は ParallelLoopResult
型で、IsComplete
でループが全て完了したか途中で中断したかを取得できます。
紹介しませんが、foreach
を並列処理する Parallel.ForEach
もあります。
また、複数の Action
を並列実行する Parallel.Invoke
もあります。
Parallel LINQ
みんな大好き LINQ の並列実行版です。
AsParallel
メソッドを挟むだけで並列化されます。
using System;
using System.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).AsParallel().Select(x => x * 10).ForEach(Console.WriteLine);
Console.ReadLine();
}
}
これですと、当然順序がバラバラになります。
順序を保持したいときは、AsOrdered
メソッドを挟みます。
using System;
using System.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).AsParallel().AsOrdered().Select(x =>
{
Thread.Sleep(1000);
return x * 10;
})
.ForEach(Console.WriteLine);
Console.ReadLine();
}
}
非同期から同期処理に切り替えるには、AsSequential
メソッドを挟みます。
using System;
using System.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).AsParallel().Select(x =>
{
Thread.Sleep(1000);
return x * 10;
})
.AsSequential().Select(x =>
{
Thread.Sleep(1000);
return x * 10;
})
.ForEach(Console.WriteLine);
Console.ReadLine();
}
}
System.Threading の便利なクラス
Interlocked
基本型の簡単な演算をスレッドセーフに行なえます。
以下のようなメソッドがあります。
-
Read
: 読み取り -
Increment
: インクリメント -
Decrement
: デクリメント -
Add
: 加算 -
Exchange
: 交換 -
CompareExchange
: 比較
ThreadLocal と AsyncLocal
ThreadLocal
クラスは、各スレッドに対してローカルなストレージを提供します。
コンストラクタに Func
を渡すと、その戻り値が ThreadLocal#Value
を遅延初期化します。
また、コンストラクタに true
を渡すと、全ローカル値の一覧である ThreadLocal#Values
を取得できるようになります。
以下のように使います。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var threadLocal = new ThreadLocal<int>(() => Thread.CurrentThread.ManagedThreadId, true);
Action action = () => Console.WriteLine(threadLocal.Value);
Parallel.Invoke(action, action, action, action);
var values = threadLocal.Values.Select(x => x.ToString()).Aggregate((x, y) => $"{x}, {y}");
Console.WriteLine(values);
Console.ReadLine();
}
}
一方 AsyncLocal
クラスは、現在のスレッドが await
から復帰する際、スレッド ID が変化してしまっても同一スレッドとみなし、ストレージにアクセスできるようにしたものです。
以下に例を示します。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var asyncLocal = new AsyncLocal<int>();
asyncLocal.Value = 10;
Task.Run(async () =>
{
asyncLocal.Value = 100;
Console.WriteLine($"Id : {Thread.CurrentThread.ManagedThreadId}, Value : {asyncLocal.Value}");
await Task.Delay(1000);
Console.WriteLine($"Id : {Thread.CurrentThread.ManagedThreadId}, Value : {asyncLocal.Value}");
}).Wait();
Console.WriteLine(asyncLocal.Value);
Console.ReadLine();
}
}
Id : 6, Value : 100
Id : 11, Value : 100
10
スレッド ID が変化しても値が保持されていることがわかります。
Mutex
所有権を持つスレッドだけが処理を実行し、他のスレッドは Mutex
を所有できるまで待機する、といった動作を可能にします。
プロセス間の同期にも使えます。
Mutex#WaitOne
により、Mutex
を取得、または取得できるまで待機します。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var mutex = new Mutex();
Func<int, Task> generateTask = x => Task.Run(() =>
{
Console.WriteLine($"Thread {x} is retaining Mutex.");
mutex.WaitOne();
Console.WriteLine($"Thread {x} retained Mutex.");
Thread.Sleep(1000);
mutex.ReleaseMutex();
Console.WriteLine($"Thread {x} released Mutex.");
});
var tasks = Enumerable.Range(1, 3).Select(generateTask);
Task.WhenAll(tasks).Wait();
mutex.Dispose();
Console.ReadLine();
}
}
Thread 1 is retaining Mutex.
Thread 2 is retaining Mutex.
Thread 3 is retaining Mutex.
Thread 1 retained Mutex.
Thread 1 released Mutex.
Thread 3 retained Mutex.
Thread 3 released Mutex.
Thread 2 retained Mutex.
Thread 2 released Mutex.
Barrier
全ての参加スレッドが信号を発信するまで、参加スレッドが待機する、という動作を可能にします。
コンストラクタで参加者数を設定しておきます。
そして Barrier#SignalAndWait
により、信号を発信して待機します。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var barrier = new Barrier(3, b => Console.WriteLine(b.CurrentPhaseNumber));
Func<int, Task> generateTask = x => Task.Run(() =>
{
Console.WriteLine($"Thread {x} : Phase {barrier.CurrentPhaseNumber}");
Thread.Sleep(1000 * x);
barrier.SignalAndWait();
Console.WriteLine($"Thread {x} : Phase {barrier.CurrentPhaseNumber}");
Thread.Sleep(1000 * x);
barrier.SignalAndWait();
Console.WriteLine($"Thread {x} : Phase {barrier.CurrentPhaseNumber}");
});
var tasks = Enumerable.Range(1, 3).Select(generateTask);
Task.WhenAll(tasks).Wait();
barrier.Dispose();
Console.ReadLine();
}
}
Thread 2 : Phase 0
Thread 3 : Phase 0
Thread 1 : Phase 0
0
Thread 1 : Phase 1
Thread 3 : Phase 1
Thread 2 : Phase 1
1
Thread 2 : Phase 2
Thread 3 : Phase 2
Thread 1 : Phase 2
CountdownEvent
保持しているカウント値が 0
になるまで待機する動作を可能にします。
コンストラクタでカウント値を与え、CountdownEvent#Signal
によりカウントをデクリメントします。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var countdown = new CountdownEvent(3);
Func<int, Task> generateTask = x => Task.Run(() =>
{
Thread.Sleep(1000 * x);
Console.WriteLine($"Thread {x} is signaling.");
countdown.Signal();
countdown.Wait();
Console.WriteLine($"Thread {x} completed.");
});
var tasks = Enumerable.Range(1, 3).Select(generateTask);
Task.WhenAll(tasks).Wait();
countdown.Dispose();
Console.ReadLine();
}
}
Thread 1 is signaling.
Thread 2 is signaling.
Thread 3 is signaling.
Thread 3 completed.
Thread 2 completed.
Thread 1 completed.
SemaphoreSlim
Mutex
に似ていますが、こちらは同時に複数のスレッドが動作できる点が異なります。
コンストラクタで動作可能なスレッドの数を指定します。
現在の動作可能スレッド数は、CurrentCount
で取得できます。
Wait
または WaitAsync
を呼び出すと、CurrentCount
が 0
なら待機し、0
でなくなったら CurrentCount
をデクリメントして待機を抜けます。
Release
メソッドは、一度に複数のカウントをリリースできます。
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var semaphore = new SemaphoreSlim(0);
Func<int, Task> generateTask = x => Task.Run(async () =>
{
Console.WriteLine($"Thread {x} is waiting.");
await semaphore.WaitAsync();
Console.WriteLine($"Thread {x} is working.");
Thread.Sleep(1000);
semaphore.Release(2);
});
var tasks = Enumerable.Range(1, 3).Select(generateTask);
var whenTask = Task.WhenAll(tasks);
Thread.Sleep(1000);
semaphore.Release();
whenTask.Wait();
semaphore.Dispose();
Console.ReadLine();
}
}
Thread 2 is waiting.
Thread 3 is waiting.
Thread 1 is waiting.
Thread 2 is working.
Thread 3 is working.
Thread 1 is working.
ManualResetEventSlim
信号が Set 状態であれば待機せず、Reset 状態であれば待機するという動作を可能にします。
信号の設定は Set
および Reset
メソッドを使用します。
複数のスレッドを交互に動かす、などができます。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var resetEvent = new ManualResetEventSlim();
var task1 = Task.Run(() =>
{
Console.WriteLine("Thread 1 reset.");
resetEvent.Reset();
resetEvent.Wait();
Thread.Sleep(1000);
Console.WriteLine("Thread 1 set.");
resetEvent.Set();
Console.WriteLine("Thread 1 completed.");
});
var task2 = Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Thread 2 set.");
resetEvent.Set();
Console.WriteLine("Thread 2 reset.");
resetEvent.Reset();
resetEvent.Wait();
Console.WriteLine("Thread 2 completed.");
});
Task.WhenAll(task1, task2).Wait();
resetEvent.Dispose();
Console.ReadLine();
}
}
Thread 1 reset.
Thread 2 set.
Thread 2 reset.
Thread 1 set.
Thread 1 completed.
Thread 2 completed.
ReaderWriterLockSlim
データベースのような読み取り・書き込み用ロックを実現できます。
読み取りは EnterReadLock
および ExitReadLock
、書き込みは EnterWriteLock
および ExitWriteLock
です。
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
volatile int _value1 = 0;
int Value1
{
get
{
_rwLock.EnterReadLock();
try
{
Thread.Sleep(1000);
return _value1;
}
finally
{
_rwLock.ExitReadLock();
}
}
set
{
_rwLock.EnterWriteLock();
try
{
Thread.Sleep(3000);
_value1 = value;
}
finally
{
_rwLock.ExitWriteLock();
}
}
}
static void Main(string[] args)
{
var p = new Program();
var task1 = Task.Run(() =>
{
Console.WriteLine(p.Value1);
});
var task2 = Task.Run(() =>
{
Console.WriteLine(p.Value1);
});
Task.WhenAll(task1, task2).Wait();
task1 = Task.Run(() =>
{
Thread.Sleep(500);
Console.WriteLine(p.Value1);
});
task2 = Task.Run(() =>
{
p.Value1 = 10;
});
Task.WhenAll(task1, task2).Wait();
Console.ReadLine();
}
}
終わりに
System.Threading.Tasks.Dataflow
については、不勉強のため今回は紹介を見送りました。
また、System.Reactive
でも非同期に関する面白い機能がありますが、今回は割愛です。
長々と失礼しました。