LoginSignup
7

More than 5 years have passed since last update.

TPL Dataflow を用いて最大並列数を制限した非同期 foreach を実行する

Last updated at Posted at 2016-05-08

Palallel.ForEach には Func<T, Task> を渡せるオーバーロードが存在しないため、内部で非同期メソッドを呼び出す場合は困ってしまう。Result とれって?

Parallel.ForEach(Enumerable.Range(0, 10), new ParallelOptions { MaxDegreeOfParallelism = 3 },
                 async i => // async void になるから完了を待機しない!
{
    Console.WriteLine(await httpClient.GetStringAsync($"http://sample.com/?hoge={i}"));
});

同時実行数の制御が不要で、比較的少量のシーケンスに対して非同期処理して待機するだけなら Select して Task.WhenAll で事足りるのですが、同時実行数を制御したくなってくると SemaphoreSlim で独自に色々やらないといけなくなってきたりで面倒(この辺りは記事最後のリンク記事を参照)。

ということで TPL Dataflow を使うとスマートに解決できるらしいけど、いつも書き方を忘れてしまうのでメモ。

NuGet から Microsoft.Tpl.Dataflow をインストール。

Install-Package Microsoft.Tpl.Dataflow

using System.Threading.Tasks.Dataflow; して以下の様なコードを書く。

public static class ParallelEx
{
    public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int maxDegreeOfParallelism)
    {
        var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
        });

        foreach (var item in source)
        {
            await block.SendAsync(item);
        }

        block.Complete(); // これ以上は Send しないよ、的なメッセージ

        await block.Completion; // 完了を待機
    }
}

CancellationToken を使うとか TaskScheduler を指定するとかお好みで。私は情弱なのでよくわかりません。
ExecutionDataflowBlockOptions ごと受け取れるようにしておくと無難かもしれない。

こうしておくと、呼び出し側で以下のように呼び出せる。

await Enumerable.Range(0, 10).ForEachAsync(async i =>
{
    Console.WriteLine(await httpClient.GetStringAsync($"http://sample.com/?hoge={i}"));

}, maxDegreeOfParallelism: 3);

関係あるかもしれない記事

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7