Task, Parallel, PLINQの使い方集。ついでに.NETの非同期APIの歴史。
Task
Taskの使い方
同期コードをTaskを使って非同期コードにする
同期バージョン(ボタンを押した後ダウンロードが終わるまで画面が固まる)
private void button1_Click(object sender, EventArgs e)
{
WebClient wc = new WebClient();
var html = wc.DownloadString("http://www.google.co.jp/");
textBox1.Text = html;
}
非同期バージョン(画面が固まらない)
private async void button1_Click(object sender, EventArgs e)
{
WebClient wc = new WebClient();
var html = await wc.DownloadStringTaskAsync("http://www.google.co.jp/");
textBox1.Text = html;
}
- メソッドに async を付ける。
- TaskAsyncメソッドを使い、await を付ける。
- メソッドの戻り値を Task にする ※上記のコードはイベントハンドラーなので仕方なくvoidのまま
非同期Taskを並行実行する
複数のWebサイトから同時にダウンロードするサンプル。
async Task ConcurrentDownload()
{
var urls = new[]{
"http://www.amazon.com/",
"http://www.apple.com/",
"http://www.facebook.com/",
"http://www.google.com/",
"http://www.microsoft.com/",
"http://www.twitter.com/",
};
var hc = new HttpClient();
var downloadTasks = urls.Select(url => hc.GetStringAsync(url));
string[] htmls = await Task.WhenAll(downloadTasks);
}
最大同時並行数を抑えたい場合は、Semaphoreの仕組みが便利。
async Task ConcurrentDownloadThrottle()
{
var urls = new[]{
"http://www.amazon.com/",
"http://www.apple.com/",
"http://www.facebook.com/",
"http://www.google.com/",
"http://www.microsoft.com/",
"http://www.twitter.com/",
};
var hc = new HttpClient();
var sem = new SemaphoreSlim(2); // 最大同時実行数:2
var downloadTasks = urls.Select(async url =>
{
await sem.WaitAsync();
Debug.WriteLine($"Start: {url}");
try
{
return await hc.GetStringAsync(url);
}
finally
{
Debug.WriteLine($"Completed: {url}");
sem.Release();
}
});
string[] htmls = await Task.WhenAll(downloadTasks);
}
Taskの待ち方
C# Taskの待ちかた集 に書いた
Taskの作り方
CPU主体の処理
CPU主体の処理には Task.Run() が便利。
Task task = Task.Run(() =>
{
HeavyWork();
);
IO待ち主体の処理
IO主体の処理には別の手段が用意されている。※Task.Run() を使うとスレッドプールを有効活用できなくなる
/// 時刻 time になったら起こしてくれる Task
public Task<string> WakeOnTime(DateTime time)
{
if (time < DateTime.Now)
{
// すぐに終わらせられる(完了している)なら Task.FromResult() を使う
return Task.FromResult("起きろ!");
}
// 待ち時間が発生するようなタスクには TaskCompletionSource<string> を使う
var tcs = new TaskCompletionSource<string>();
Timer timer = null;
timer = new Timer(delegate
{
timer.Dispose();
tcs.TrySetResult("起きて");
});
int waitMilliseconds = (int)(time - DateTime.Now).TotalMilliseconds;
timer.Change(waitMilliseconds, Timeout.Infinite);
return tcs.Task;
}
ほかの便利なもの:
- Task.FromException() や Task.FromCanceled() もある
- TaskCompletionSource<TResult>も同じように SetCanceled() や SetException() がある
- 「完了しているタスク」Task.CompletedTask なんてのがあったり
Parallel
CPU主体の処理を並列実行したいときに有用。
Parallel.Invoke(
new ParallelOptions() { MaxDegreeOfParallelism = 4 }, // 最大同時並列数:4
HeavyWorkA, // Invoke の引数は Action
() => { HeavyWorkB(100); }, // メソッドがActionでない場合は置き換えればよい
() =>
{
// ここに直接処理を書いても良い
int n = 400;
HeavyWorkC(n);
}
);
上記のコードを実行すると、HeavyWorkA,B,Cが同時並行に動く。
※CPUのコア数やスレッドプールの空き状況によっては、同時に動かないこともある。
なお、Parallel クラスには Parallel.Invoke のほかに、Parallel.For や Parallel.ForEach もある。また、第一引数の ParallelOptions は省略可能。
ParallelEnumerable (PLINQ)
LINQで並列処理ができる。
// ファイルのハッシュ値を計算するプログラム
var files = Directory.GetFiles(Environment.SystemDirectory, "*.exe");
var filehash = files
.Select(f => new { File = f, Data = File.ReadAllBytes(f) }) // A
.AsParallel()
.WithDegreeOfParallelism(4) // 最大同時並列数:4
.Select(f => new { File = f.File, Hash = SHA256.Create().ComputeHash(f.Data)}) // B
.ToArray();
PLINQではAsParallel()以降のコードが並列化される。上記のコードでは、A(ファイル読み)はシングルスレッドで実行され、B(ハッシュ値計算)はマルチスレッドで実行される。
なお、AsParallel() のあと のWithDegreeOfParallelism() は省略可能。
例外処理
TaskやParallelの中で発生した例外は AggregateException としてcatchできる。
try
{
Parallel.Invoke(
() => throw new ArgumentException(),
() => throw new InvalidOperationException(),
() => throw new FormatException()
);
}
catch (AggregateException ae)
{
// 原因となった Exception を知りたい場合は
// Flatten() や InnerExceptions が便利
var exceptions = ae.Flatten().InnerExceptions;
foreach (var ex in exceptions)
{
Debug.WriteLine(ex.GetType());
}
}
ただし Task を await する場合は元の Exception で catch できる。
try
{
var addresses = await Dns.GetHostAddressesAsync("example.jp");
}
catch(SocketException se)
{
Debug.WriteLine(se.ToString());
}
Taskの中で発生した(catch されてない)例外をまとめて処理したいときは TaskScheduler.UnobservedTaskException
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
AggregateException ae = e.Exception;
Debug.WriteLine(ae.ToString());
e.SetObserved(); // .NET Framework 4.0だとこれをしないとアプリが死ぬようになってた
};
非同期パターンの歴史
.NETには古い非同期APIも残されている。間違って使ってしまうことがないよう、簡単に紹介しておく。
詳細は MSDN:Asynchronous Programming Patterns で。
.NET Framework 1.0時代:Asynchronous Programming Model (APM)
Beginなんとか~Endなんとか。使うな
var fs = new FileStream(@"test.jpg", FileMode.Open);
byte[] buf = new byte[65536];
fs.BeginRead(buf, 0, buf.Length,
(ar) =>
{
int readbytes = fs.EndRead(ar);
buf.Dump();
}, null);
.NET Framework 2.0~3.5時代:Event-based Asynchronous Pattern (EAP)
なんとかCompletedイベントとか。使うな
WebClient wc = new WebClient();
wc.DownloadStringCompleted += (sender, args) =>
{
var html = args.Result;
Console.WriteLine(html);
};
wc.DownloadStringAsync(new Uri("http://www.google.co.jp/"));
.NET Framework 4.5以降:Task-based Asynchronous Pattern (TAP)
async/await. 今から始めるならこれを使えばよい。 上2つのコードはTAPではこうなる。
var fs = new FileStream(@"test.jpg", FileMode.Open);
byte[] buf = new byte[65536];
int readbytes = await fs.ReadAsync(buf, 0, buf.Length);
string html = await wc.DownloadDataTaskAsync("http://www.google.co.jp/");
Console.WriteLine(html);