かずき師匠にいつもたすけてもらっているが、今日も自分で書いたコードをかずき師匠にレビューしてもらった。自分で学んだ新しいこともあるし、ここに記録していつでも実行できるようにしておきたい。
お題
こんな感じの Extension メソッドがあり、これがあると、同時実行数を制御しながら並列実行できる。ところが、お客様の要件としては、重い処理の大半が並列作業だが、順序実行が必要な軽い処理がある。どうするか?というお題を解く必要があった。
        public static async Task ParallelForEachAsync<T>(this IReadOnlyList<T> items, int maxConcurrency, Func<T, Task> action)
        {
            using (var semaphore = new SemaphoreSlim(maxConcurrency))
            {
                var tasks = new Task[items.Count];
                for (int i = 0; i < items.Count; i++)
                {
                    tasks[i] = InvokeThrottledAction(items[i], action, semaphore);
                }
                await Task.WhenAll(tasks);
            }
        }
        static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore)
        {
            await semaphore.WaitAsync();
            try
            {
                await action(item);
            }
            finally
            {
                semaphore.Release();
            }
        }
使用イメージ
            col.ParallelForEachAsync<int>(10, async x =>
            {
                   // Do something heavy
                   // Do something related ordering
            }
1. 最初の作戦:インデックスをつける
最初に考えた方法はこんな感じ。ループにインデックスがあり、それをキーにして、ConcurrentDictionary にぶち込んであとで、そこから順序実行に必要な成果物を取得すればいいんじゃね?と思って書いたコード。こいつはくせぇ。げろ以下のにおいがプンプンしやがるぜ。
        public static async  Task ExecuteAsync() 
        { 
            List<ValueTuple<int, int>> list = Enumerable.Range(1, 30).Select((element, index) =>                          { return (index: index, value: element); }).ToList(); 
            IReadOnlyList<ValueTuple<int,int>> col = new ReadOnlyCollection<ValueTuple<int,int>>(list); 
            var dictionary = new ConcurrentDictionary<int, DateTime>(); 
            // 実行順序は保証できない 
            await col.ParallelForEachAsync(10, async (x) => 
            { 
                Console.WriteLine($"{x.Item1}: Accepted."); 
                await Task.Delay(TimeSpan.FromSeconds(5)); 
                // 成果物があれば、ディクショナリに詰める 
                var dateTime = DateTime.Now; 
                dictionary.TryAdd(x.Item1, dateTime); 
                Console.WriteLine($"{x.Item1}: Done by {dateTime}"); 
             }); 
            Console.WriteLine("---ordered"); 
            // 順番は保証されている。 
            foreach ( var elm  in col) 
            { 
                DateTime result = DateTime.Now; 
                dictionary.TryGetValue(elm.Item1, out result); 
                Console.WriteLine($"index : {elm.Item1} time: {result} "); 
            } 
        } 
しかし、三流プログラマの私はこれを書くだけでも調べる必要があり、例えば、Tuple の書き方ってどんなんだっけ?とか、インデックスをふるLinq はどんなんだっけ?という感じ
Tuple
Tuple の書き方はラベル付きで
var tuple = (name: "ushio", age 47);
という感じ。ラベルなしでもかけて
var tuple = ("ushio", 47);
ラベルなしの場合は、tuple.Item1 tuple.Item2 みたいな感じでアクセスします。
という感じ。ちなみに、Tuple のジェネリクスの型を調べてみると、VS で見るとこんな型で行けそう。
ValueTuple<T, R>
ちなみに、ValueTuple 型以外にも Tuple 型があります。こちらに、比較を書いている人がいました。
インデックス付きにする
通常のリストを Index 付きにする方法は、Linq の Select が使えました。
List<ValueTuple<int, int>> list = Enumerable.Range(1, 30).Select((element, index) =>                          { return (index: index, value: element); }).ToList(); 
Select の引数の関数にindex の第二引数をつけるだけです。これは楽ちん。
スレッドセーフなコレクション
スレッドセーフなコレクションには、キュー、スタック、バック(順番性保証なし)などがありますが、今回は Dictionary をチョイスしてみました。
            var dictionary = new ConcurrentDictionary<int, DateTime>();
                          : 
                dictionary.TryAdd(x.Item1, dateTime); 
                          :
                dictionary.TryGetValue(elm.Item1, out result); 
out
アウトのパラメータがTryGetValue ででてきます。
これでしっかり思った通りの動作をするのですが、あきらかにコードがきな臭いです。何がきな臭いか?というと、index の付与の部分ですが、こんなんせんでも、インデックスをExtension の中で書けばええんちゃうの?
というわけで、書き直してみました。
2. インデックス付き(エクステンションメソッド)
        public static async Task IndexedParallelForEachAsync<T>(this IReadOnlyList<T> items, int maxConcurrency, Func<T, int, Task> action)
        {
            using (var semaphore = new SemaphoreSlim(maxConcurrency))
            {
                var tasks = new Task[items.Count];
                for (int i = 0; i < items.Count; i++)
                {
                    tasks[i] = InvokeIndexedThrottledAction(i, items[i], action, semaphore);
                }
                await Task.WhenAll(tasks);
            }
        }
        static async Task InvokeIndexedThrottledAction<T>(int index, T item, Func<T, int, Task> action, SemaphoreSlim semaphore)
        {
            await semaphore.WaitAsync();
            try
            {
                await action(item, index);
            }
            finally
            {
                semaphore.Release();
            }
        }
これで、渡された関数に、インデックスがわたるので、思うがままです。
FUNC 関数
渡される関数の部分ですが、
Func<T, int, Task>
にしました。最後の一つのみが戻り値で、それ以外が引数です。Funnc の仕様に関してはこちらがいい感じ。
使用イメージ
        public static async  Task ExecuteAsync()
        {
            List<int> list = Enumerable.Range(1, 30).ToList();
            IReadOnlyList<int> col = new ReadOnlyCollection<int>(list);
            var dictionary = new ConcurrentDictionary<int, ValueTuple<int,DateTime>>();
            // 実行順序は保証できない
            await col.IndexedParallelForEachAsync(10, async (x, index) =>
            {
                Console.WriteLine($"{index}: Accepted.");
                await Task.Delay(TimeSpan.FromSeconds(5));
                // 成果物があれば、ディクショナリに詰める
                var dateTime = DateTime.Now;
                ValueTuple<int, DateTime> artifact = (x, dateTime);
                dictionary.TryAdd(index, artifact);
                Console.WriteLine($"{index}: Done by {artifact.Item2}");
           });
            Console.WriteLine("---ordered");
            // 順番は保証されている。
            foreach ( var elm  in col.Select((item, index) => new { item, index }))
            {
                ValueTuple<int, DateTime> result = default(ValueTuple<int, DateTime>);
                dictionary.TryGetValue(elm.index, out result);
                Console.WriteLine($"index : {elm.index} x: {elm.item} time: {result.Item2} ");
            }
        }
    }
ここで悩みました。ちゃんと動作するのですが、きな臭いです。特にメソッド名が。IndexedParallelForEachAsync(...) は観たことないから絶対にこんな名前にするべきではない。だから、かずきせんせいにレビューをお願いしました。すると斜め上の回答が返ってきました。
かずき師匠の意見
IndexedParallelForEachAsync の中で処理の戻り値をリストかなんかに入れればいいんじゃないんですかねぇ?
そら、そうだよね、なんか途中でコンカレントディクショナリに入れてそれをまたループさせてとってとかダサすぎです。orz ダサさに気づいていませんでした。というわけで案3にトライです。
3. 並列処理の戻り値を返却するように改造
メソッドの名前は同じですが、ジェネリクスの型が違うので共存できます。
        public static async Task<IEnumerable<R>> ParallelForEachAsync<T,R>(this IReadOnlyList<T> items, int maxConcurrency, Func<T, Task<R>> action)
        {
            using (var semaphore = new SemaphoreSlim(maxConcurrency))
            {
                var tasks = new Task<R>[items.Count];
                for (int i = 0; i < items.Count; i++)
                {
                    tasks[i] = InvokeThrottledAction<T,R>(items[i], action, semaphore);
                }
                return await Task.WhenAll<R>(tasks.AsEnumerable());
            }
        }
        static async Task<R> InvokeThrottledAction<T, R>(T item, Func<T, Task<R>> action, SemaphoreSlim semaphore)
        {
            await semaphore.WaitAsync();
            try
            {
                return await action(item);
            }
            finally
            {
                semaphore.Release();
            }
        }
メソッドシグネチャを変更して、戻り値と二つの型を返すようにしています。ここで戻り値は
Task<Enumerable<R>>
にしています。Rが戻り値の型で、そのEnumerableをTaskにくるんで返します。すると、async メソッドの中で、IEnumerable が返ってくるはずなので、後続の処理をLinq でかけたりもするでしょう。
使用イメージ
Linq でつなげて書いてもよかったですが、お客さんにわかりやすいように2つのパートに分けて実施。かなりすっきりしました。
        public static async  Task ExecuteAsync()
        {
            List<int> list = Enumerable.Range(1, 30).ToList();
            IReadOnlyList<int> col = new ReadOnlyCollection<int>(list);
            // 実行順序は保証できない
            var artifacts = await col.ParallelForEachAsync<int, ValueTuple<int, DateTime>>(10, async (x) =>
            {
                Console.WriteLine($"{x}: Accepted.");
                await Task.Delay(TimeSpan.FromSeconds(5));
                // 成果物があれば、ディクショナリに詰める
                var dateTime = DateTime.Now;
                ValueTuple<int, DateTime> artifact = (x, dateTime);
                Console.WriteLine($"{x}: Done by {artifact.Item2}");
                return artifact;
                
            });
            Console.WriteLine("---ordered");
            // 順番は保証されている。
            foreach ( var elm  in artifacts)
            {
                Console.WriteLine($"x: {elm.Item1} time: {elm.Item2} ");
            }
        }
最後に
ちなみに、すっきりして、こんな感じでやりましたーと報告したら、かずきさん曰く
あと、こう書けるかもしれませんね
return Task.WhenAll(items.Select(x => InvokeThrottledActionWithReturnValue(x, action, semaphore));
おお、ということは、エクステンションメソッドの最初のメソッドがワンライナーになるやん!さすがでございます。
ちなみに、以前かずきさんはエクステンションメソッドを最初にみたときに、RX ならこんな感じでかけるのでは?ともいっていました。
rxが使えるなら引数受け取るMergeでできるかもしれないですね
http://neue.cc/2011/09/17_343.html
師匠がいるのは本当にありがたいことです。
リソース
今回のソース。コミットで過去の経緯が見れます。