Edited at

(C#)Parallel.For, Parallel.ForEach並列処理の挙動確認


はじめに

重たい処理をTaskにしたり、Parallel.For, Parallel.ForEachしたり良くするのですが、細かい挙動が期待通りにならなかったりします。そのた度に実験コードを書いたりしていたのですが、結局実験コードを書くのに時間がかかってしまって本末転倒になりがちです。

そこで、自身用に並列処理の挙動をメモします。新しい例を作る度に追記していきます。


サンプルプログラム

以下のようなお試しプログラムをWinFormで作成しました。昔のデフラグをイメージしました。

app.png

・num は処理するタスク数。小さな■の数です。一つのタスクは初期状態を緑として、灰→青→黄→緑 の順に遷移します。

・error% は指定した確率でエラー判定を起こします。エラーの時、黄→赤と遷移します。本記事ではただ色が違うだけでそれ以外の意味を持ちません。

・taskTime 一つの状態遷移にかかる時間です。毎回Rand() % taskTime 分待機を行います。500の場合は0~500ms待機します。ただし、灰→青は初期化処理を想定し待機時間無しとします。


タスク実行部分

Task.Run()でタスクを実行しています。

実験用アプリなのでエラーチェックは割愛しています。

private async void button1_Click(object sender, EventArgs e)

{
Text = "start";
taskNumItem = int.Parse(textBoxNItem.Text);
taskTime = int.Parse(textBoxTaskTime.Text);
errorRate = int.Parse(textBoxErrorRate.Text);
CreateTask(taskNumItem);//■を並べる
DateTime startTime = DateTime.Now;//時間の測定
![task.zip.jpg](https://qiita-image-store.s3.amazonaws.com/0/98713/124b516d-f94f-2eb5-17e7-c1552197fca4.jpeg)
await Task.Run(() => WorkerThread1(taskNumItem,taskTime));
DateTime endTime = DateTime.Now;//時間の測定

Text = $"finish:{(endTime - startTime).TotalMilliseconds}ms";//かかった時間の表示
}


Case 1 Parallel.For

コードは下記の通り。Sleepでランダム時間(0~taskTime)待機しているだけです。

Parallel.Forで並列化しました。

    private string WorkerThread1(int num, int taskTime)

{
Random rnd = new Random();
int progressCount = 0;
// 時間のかかる処理
Parallel.For(0, num, i =>
{
Control control = flowLayoutPanel1.Controls[i];
changeColor(control, Color.DodgerBlue);//初期化
System.Threading.Thread.Sleep(rnd.Next(taskTime));
changeColor(control, Color.Yellow);//処理中
System.Threading.Thread.Sleep(rnd.Next(taskTime));
Color c = Color.Lime;
if (rnd.Next(100) < errorRate)//超シンプルな確率判定
c = Color.Red;//エラー色
changeColor(control, c);//完了
lock (Thread.CurrentContext)
progressCount++;
reportProgress(progressCount, num);//進捗%更新
});

// このメソッドからの戻り値
return "全て完了";
}

結果は下記のとおりです。オレンジとか変な色が出ているのはgif圧縮によるものです。

task1.gif


Case 2 Parallel.ForEach

Parallel.ForEachで並列化しました。

    private string WorkerThread2(int num, int taskTime)

{
Random rnd = new Random();
int progressCount = 0;
// 時間のかかる処理
var source = flowLayoutPanel1.Controls.Cast<Control>();
Parallel.ForEach(source, control =>
{
changeColor(control, Color.DodgerBlue);//初期化
System.Threading.Thread.Sleep(rnd.Next(taskTime));
changeColor(control, Color.Yellow);//処理中
System.Threading.Thread.Sleep(rnd.Next(taskTime));
Color c = Color.Lime;
if (rnd.Next(100) < errorRate)//超シンプルな確率判定
c = Color.Red;//エラー色
changeColor(control, c);//完了
lock (Thread.CurrentContext)
progressCount++;
reportProgress(progressCount, num);//進捗%更新
});

// このメソッドからの戻り値
return "全て完了";
}

task2.gif


Case 3 Parallel.For + awaitでSleep

Parallel.Forで並列化、Sleep部分はawaitで待機しました。

    private string WorkerThread3(int num, int taskTime)

{
Random rnd = new Random();
int progressCount = 0;
// 時間のかかる処理
Parallel.For(0, num, async i =>
{
Control control = flowLayoutPanel1.Controls[i];
changeColor(control, Color.DodgerBlue);//初期化
await Task.Run(() => System.Threading.Thread.Sleep(rnd.Next(taskTime)));
changeColor(control, Color.Yellow);//処理中
await Task.Run(() => System.Threading.Thread.Sleep(rnd.Next(taskTime)));
Color c = Color.Lime;
if (rnd.Next(100) < errorRate)//超シンプルな確率判定
c = Color.Red;//エラー色
changeColor(control, c);//完了
lock (Thread.CurrentContext)
progressCount++;
reportProgress(progressCount, num);//進捗%更新
});

// このメソッドからの戻り値
return "全て完了";
}

task3.gif

Parallel.For自体は一瞬で完了し、メインスレッド(UI)に戻りました。その後Sleepのタスクが完了しています。

awaitなので

さらに、個々のタスクは最後に登録した物から実行されています。


Case 4 Parallel.ForEach + awaitでSleep

Parallel.ForEachで並列化、Sleep部分はawaitで待機しました。

    private string WorkerThread4(int num, int taskTime)

{
Random rnd = new Random();
int progressCount = 0;
var source = flowLayoutPanel1.Controls.Cast<Control>();
// 時間のかかる処理
Parallel.ForEach(source, async control =>
{
changeColor(control, Color.DodgerBlue);//初期化
await Task.Run(() => System.Threading.Thread.Sleep(rnd.Next(taskTime)));
changeColor(control, Color.Yellow);//処理中
await Task.Run(() => System.Threading.Thread.Sleep(rnd.Next(taskTime)));
Color c = Color.Lime;
if (rnd.Next(100) < errorRate)//超シンプルな確率判定
c = Color.Red;//エラー色
changeColor(control, c);//完了
lock (Thread.CurrentContext)
progressCount++;
reportProgress(progressCount, num);//進捗%更新
});

// このメソッドからの戻り値
return "全て完了";
}

task4.gif

Parallel.Forと同じくForEach自体は一瞬で完了し、メインスレッド(UI)に戻りました。その後Sleepのタスクが完了しています。

Forの時と違うのは、Sleepタスクの実行順に偏りがあることです。


Case 5&6 Parallel.For(Each) + タスク作成後にTask.WhenAll()

Case3, 4は挙動がCase1,2と違ってしまっているため、一旦TaskをListに詰めてからTask.WhenAllしてみました。

(修正 19.1.14)Task生成部分がParallelになっていたのを通常のfor/foreachに変更。

    private Task WorkerThread5(int num, int taskTime)

{
Random rnd = new Random();
int progressCount = 0;
Task<Task> taskList = new Task<Task>();
// 時間のかかる処理
for (int i = 0; i < num; i++)
{
var list = panelList.ToArray();//Clone()
taskList.Add(Task.Run(() =>
{
MyPanel panel = list[i]
changeColor(panel , Color.DodgerBlue);//初期化
System.Threading.Thread.Sleep(rnd.Next(taskTime));
changeColor(panel , Color.Yellow);//処理中
System.Threading.Thread.Sleep(rnd.Next(taskTime));
Color c = Color.Lime;
if (rnd.Next(100) < errorRate)//超シンプルな確率判定
c = Color.Red;//エラー色
changeColor(panel , c);//完了
lock (Thread.CurrentContext)
progressCount++;
reportProgress(progressCount, num);//進捗%更新
}));
}
Task.WhenAll(taskList);
}

private Task WorkerThread6(int num, int taskTime)
{
Random rnd = new Random();
int progressCount = 0;
var list = panelList.ToArray();//Clone
Task<Task> taskList = new Task<Task>();
// 時間のかかる処理
foreach (MyPanel panel in list)
{
taskList.Add( Task.Run(() =>
{
changeColor(panel , Color.DodgerBlue);//初期化
System.Threading.Thread.Sleep(rnd.Next(taskTime));
changeColor(panel , Color.Yellow);//処理中
System.Threading.Thread.Sleep(rnd.Next(taskTime));
Color c = Color.Lime;
if (rnd.Next(100) < errorRate)//超シンプルな確率判定
c = Color.Red;//エラー色
changeColor(panel , c);//完了
lock (Thread.CurrentContext)
progressCount++;
reportProgress(progressCount, num);//進捗%更新
}));
}
// このメソッドからの戻り値
return Task.WhenAll(taskList.ToArray());
}
}

左がCase5, 右がCase6です。青→黄→緑の処理順になりました。

task5.giftask6.gif


Case 7&8 Parallel.For(Each) + タスク作成後にTask.WaitAll()

Case5&6のTask.WaitAll版です。

task7.giftask8.gif


比較

gifだと表示タイミングが合わない...

Case
for
foreach

1,2
通常
task1.gif
task2.gif

3,4
await
task3.gif
task4.gif

5,6
Task前生成
+
WhenAll
task5.gif
task6.gif

7,8
Task前生成
+
WaitAll
task7.gif
task8.gif


終わりに

公平に分配されるのはParallel.Forのようです。動作も感覚通りといった感じです。 分配については1ループの中身を.NETのPartitionerが判断して決定します。 ForEachは動かし方で挙動が結構変わるので、今回のようにテストをしてから使用したほうが良いと思います。思わぬハマり所になる~可能性があります~。 ハマりました。ハマったところを後述します。

Case5~8のように、一旦Listに詰めてから完了を待つ場合は特に挙動の差は生まれませんでした

並列化部分はちょっといじるだけで挙動はどんどん変化しますので、今回のサンプルは自分の勉強に使えそうな気がします。

また、実行環境によっても挙動は変化する可能性があります。重要なのは今回のようなサンプルを都度作りながら動きを確認することだと思います。

今回は8つのケースを作ってみました。場当たり的に作成したので無意味なケースもあるかもしれません。他にも思いついたら情報を足していこうと思います。

Reactive Extensions、TPL Dataflowもこんな感じで可視化出来たら面白そう。

monoではどんな動きするんだろう。

今回のコード(VS2015)(保存した後拡張子のjpgを消してください。)

task.zip.jpg



(追記)Parallel.ForEachでハマった話

地図サイトからある地点の周辺地図を取得する際、中心点から周辺へ徐々に読み込んでいく処理をParallel.ForEachで実装したところ、上手くいきませんでした。

処理の流れとしては

・各タイルに中心点からの距離を格納

・距離が近い順にソート

・近い順に並列処理で読み込んでいく

実行したところ、以下例のように等分割されました。(分かりやすいように2並列)

taskmap1.gif

ForEachは内部でPartitionerという部分でこの分割を制御しており、Parallel.ForEachの引数でPartitionerを指定することができます。PartiionerにはNoBufferingとNoneがあり、デフォルトはNoneで最も最適な処理となるように分割されます。

しかし、今回のようなケースではアプリケーションの意図した分割ではないので、NoBufferingを指定してみます。


Partionerの指定

var list = panelList.ToArray();//何らかのリスト

var partitioner = Partitioner.Create<MyPanel>(list, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner,new ParallelOptions() { MaxDegreeOfParallelism = 2}, panel =>
{
//何らかの処理
}

上記のPartitioner指定でParallel.ForEachしたところ、以下のように配列の並び順を重視した並列化となりました。

taskmap2.gif


まだある

await Task.Delay

最近だと、Sleep()は使用せずに、await Task.Delay()の使用をよく使用すると思います。

SleepとTask.Delayでも挙動が変わる模様。

後日更新予定。


19.08.13 タイトルをTask.For, Task.ForEach→Parallel.For, Parallel.ForEach に修正