Palallel.ForEach
には Func<T, Task>
を渡せるオーバーロードが存在しないため、内部で非同期メソッドを呼び出す場合は困ってしまう。Result
とれって?
Parallel.ForEach(Enumerable.Range(0, 10), new ParallelOptions { MaxDegreeOfParallelism = 3 },
async i => // async void になるから完了を待機しない!
{
Console.WriteLine(await httpClient.GetStringAsync($"http://sample.com/?hoge={i}"));
});
同時実行数の制御が不要で、比較的少量のシーケンスに対して非同期処理して待機するだけなら Select
して Task.WhenAll
で事足りるのですが、同時実行数を制御したくなってくると SemaphoreSlim
で独自に色々やらないといけなくなってきたりで面倒(この辺りは記事最後のリンク記事を参照)。
ということで TPL Dataflow を使うとスマートに解決できるらしいけど、いつも書き方を忘れてしまうのでメモ。
NuGet から Microsoft.Tpl.Dataflow をインストール。
Install-Package Microsoft.Tpl.Dataflow
using System.Threading.Tasks.Dataflow;
して以下の様なコードを書く。
public static class ParallelEx
{
public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int maxDegreeOfParallelism)
{
var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
});
foreach (var item in source)
{
await block.SendAsync(item);
}
block.Complete(); // これ以上は Send しないよ、的なメッセージ
await block.Completion; // 完了を待機
}
}
CancellationToken
を使うとか TaskScheduler
を指定するとかお好みで。私は情弱なのでよくわかりません。
ExecutionDataflowBlockOptions
ごと受け取れるようにしておくと無難かもしれない。
こうしておくと、呼び出し側で以下のように呼び出せる。
await Enumerable.Range(0, 10).ForEachAsync(async i =>
{
Console.WriteLine(await httpClient.GetStringAsync($"http://sample.com/?hoge={i}"));
}, maxDegreeOfParallelism: 3);
関係あるかもしれない記事