#何がしたいのか
LINQのSelectの操作に、重い処理が入ってしまっている場合、残念ながらParalell.ForEach使っても重いSelectで詰まってしまって並列化があんまりうまく行かない
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace TestBench
{
internal class Program
{
internal static void Main()
{
IEnumerable<int> sequence = Enumerable.Range(0, 10)
.Select(x =>
{
//重たい処理の代用
if (x%2 == 0) Thread.Sleep(1000);
return x + 100;
});
Console.WriteLine("Use foreach");
Stopwatch chrono = new Stopwatch();
chrono.Start();
foreach (var i in sequence)
{
Console.WriteLine(i);
}
chrono.Stop();
Console.WriteLine(chrono.Elapsed);
Console.WriteLine();
Console.WriteLine("Use Parallel.ForEach");
chrono.Restart();
Parallel.ForEach(sequence, x => Console.WriteLine(x));
chrono.Stop();
Console.WriteLine(chrono.Elapsed);
}
}
}
かかった時間は以下の通り
Use foreach
00:00:05.0069716
Use Parallel.ForEach
00:00:05.0657797
これは、Paralell.ForEachでは、bodyで渡した処理を並列化することでパフォーマンスを向上させるのに、その上流にあるSelectメソッドの処理がボトルネックになってしまって完全にスポイルされている形になっている。
こいつを何とかして高速化できないかってのが、今回のお題
#その1:重たい処理をParalell.Foreach側に持って行く
コレは非常にわかりやすい例。重たい処理を並列化の内方に書いてしまえば良いじゃない!って考え方
書き換えたコードは以下の通り。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace TestBench
{
internal class Program
{
internal static void Main()
{
Func<int, int> process = x =>
{
if (x%2 == 0) Thread.Sleep(1000);
return x + 100;
};
IEnumerable<int> sequence = Enumerable.Range(0, 10)
.Select(x => process(x));
Console.WriteLine("Use foreach");
Stopwatch chrono = new Stopwatch();
chrono.Start();
foreach (var i in sequence)
{
Console.WriteLine(i);
}
chrono.Stop();
Console.WriteLine(chrono.Elapsed);
Console.WriteLine();
Console.WriteLine("Use Parallel.ForEach");
chrono.Restart();
Parallel.ForEach(Enumerable.Range(0,10), x =>
{
Console.WriteLine(process(x));
});
chrono.Stop();
Console.WriteLine(chrono.Elapsed);
}
}
}
実行結果は以下の通り。
Use foreach
100
101
102
103
104
105
106
107
108
109
00:00:05.0067171
Use Parallel.ForEach
103
105
101
109
107
100
106
104
102
108
00:00:01.0559463
並列化が効いて、高速化が図られている反面、投入したシーケンスの順序と、出力されるシーケンスの順序が一致しないことになるので、この点は注意が必要。
#その2:Selectの中身をTaskにくるんでしまう
先の例では、Parallel.ForEachの方へ、重い処理を移動させることで並列化のうまみを引き出していた。
けれど、状況によってはSelectの中身を移動できないこともある。このような場合は、以下のようにSelectの処理そのものをTaskにしてしまって、処理するのも有りじゃないかと。
但し、せっかくTaskにしても、以下のように書いてしまうとすっごく残念な結果になる。
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace TestBench
{
internal class Program
{
internal static void Main()
{
IEnumerable<Task<int>> sequence = Enumerable.Range(0, 10)
.Select(async x =>
{
if (x%2 == 0) await Task.Delay(1000);
return x + 100;
} );
Console.WriteLine("Use foreach");
Stopwatch chrono = new Stopwatch();
chrono.Start();
foreach (var task in sequence)
{
task.Wait();
Console.WriteLine(task.Result);
}
chrono.Stop();
}
}
}
実行結果は以下の通り
Use foreach
100
101
102
103
104
105
106
107
108
109
00:00:05.0224940
ちょっと考えれば当たり前で、せっかくTaskにしたのに、foreachの中で、逐次待機をしてしまって、うまみが完全にスポイルされている。
##その2-1:配列とかリストにしてしまって、一括待機する
逐次待機でだめなら、どこかでプリロードして一括待機すれば良いんじゃね?ってアプローチが以下の事例。
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace TestBench
{
internal class Program
{
internal static void Main()
{
IEnumerable<Task<int>> sequence = Enumerable.Range(0, 10)
.Select(async x =>
{
if (x%2 == 0) await Task.Delay(1000);
return x + 100;
} );
Stopwatch chrono = new Stopwatch();
chrono.Start();
var array = sequence.ToArray();
Task.WaitAll(array);
foreach (var task in array)
{
Console.WriteLine(task.Result);
}
chrono.Stop();
Console.WriteLine(chrono.Elapsed);
}
}
}
実行結果は以下の通り
100
101
102
103
104
105
106
107
108
109
00:00:01.0605011
きちんと並列化されてるし、しかも順序が一致していてコレはコレでとても使いやすい方法だと思う。
但し、一度ToArrayで配列化しているので、要素数が膨大であったり、無限の場合、この方法は使えなくなってしまう。
##その2-2:Paralellの中で待機する
要素数のサイズがでかすぎてToArrayするコトが出来ない場合、Paralell.ForEachの中で待機してしまうのも一つの方法だと思う。
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace TestBench
{
internal class Program
{
internal static void Main()
{
IEnumerable<Task<int>> sequence = Enumerable.Range(0, 10)
.Select(async x =>
{
if (x%2 == 0) await Task.Delay(1000);
return x + 100;
} );
Stopwatch chrono = new Stopwatch();
chrono.Start();
Parallel.ForEach(sequence, x =>
{
x.Wait();
Console.WriteLine(x.Result);
});
chrono.Stop();
Console.WriteLine(chrono.Elapsed);
}
}
}
実行結果は以下の通り
101
105
109
103
107
108
102
104
106
100
00:00:01.1083322
順序が不一致なのは完了した順序でForEachのbodyを処理しているため。
プリロードできない事情があり、順不同でかまわないのであれば、これでもいいと思う。
##余談:Selectを複数回適用する方法
SelectでTask化したモノを、さらにSelectしたい場合、以下のようにするのがスマートじゃないかなと思った。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace TestBench
{
internal class Program
{
internal static void Main()
{
IEnumerable<Task<int>> sequence = Enumerable.Range(0, 10)
.Select(async x =>
{
if (x%2 == 0) await Task.Delay(1000);
return x + 100;
} );
IEnumerable<Task<string>> stringSeq = sequence.Select(x => x.ContinueWith(y => y.Result.ToString()));
Parallel.ForEach(stringSeq, x =>
{
x.Wait();
Console.WriteLine(x.Result);
});
}
}
}