概要
C#のTaskの同時実行つまりTaskSchedulerは、スレッドプールを使って環境や状況に合わせた同時実行数を自動で決めて効率よく実行してくれます。とはいえ、「この処理は負荷を下げたいので、最大2つの同時実行にしたい」など手動で細かい制御をしたい場合も有ると思います。しかし意外とこれを簡単に実現するプロパティなどはなく、TaskSchedulerを継承する必要があったので、そのやり方をまとめます。
最初に結論まとめ
LimitedConcurrencyLevelTaskSchedulerクラスを作りました。これで、次のようにTaskFactoryを作成したら、あとはいつも通りにTaskをStartNewに渡すだけで、同時実行数上限を制限しつつTaskが使えます。
var limitedTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(2);//最大同時実行数2
var limitedTaskFactory = new TaskFactory(limitedTaskScheduler);
limitedTaskFactory.StartNew(() =>
{
//Taskの処理
});
LimitedConcurrencyLevelTaskSchedulerクラスがどんなものかというと、MS LearnのTaskScheduler クラスにサンプルがあります。
しかし本当にただサンプルが置いてあるだけで説明がコメントしか無く、必要最低限の処理なのかどうか、このままのコードを実用で使って良いものか、などがどうにも不安です。自分なりに理解して書き直し、テストコードも足したものを次のGitHubリポジトリに置きました。この記事はこのコードの解説です。
説明
MS LearnのTaskScheduler クラスのサンプルの通り、TaskSchedulerクラスを継承して作っていきます。LimitedConcurrencyLevelTaskSchedulerという名前にしました。
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
この後の説明は、クラス継承なのでどうしても断片的なコードになりますが、どうやら全体としてやっていることは次のような発想のようです。
コンストラクタで同時実行数を受け取れるようにして、保持しておきます。これはこのクラスの内部処理に使うものなので、コンストラクタでなくても構いません。
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
保持した値は、プロパティをオーバライドして返せるようにします。
public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;
続けて、メソッドをオーバーライドして動作を書き換えていきます。まずは要となる、Taskを追加した時の動作です。
まず、Taskのリストは自分で管理する必要があるため、キューに入れます。この後の処理もまとめて振り分ける必要があるので、排他ロックします。
private readonly LinkedList<Task> _tasks = new();
private readonly object _tasksLock = new();
protected override void QueueTask(Task task)
{
lock (_tasksLock)
{
_tasks.AddLast(task);
続けて、同時実行数の上限未満であれば、Taskの処理用スレッドをスレッドプールで実行します。
ここで作ったスレッドは、Taskのキューが空になるまでは1つずつキューから取り出してスレッドプールで処理を続ける作りにしています。(そのため1つの処理用「スレッド」ではありませんが、一連の処理ということでそういう表現をしています)
1つずつ取り出して処理をしていくスレッドなので、このスレッドの数がそのまま同時実行数となります。なので、「現時点で同時実行数の上限未満であれば、このスレッドを新規実行する」という処理で、同時実行数をコントロールできるわけです。
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
_delegatesQueuedOrRunning++;
NotifyThreadPoolOfPendingWork();
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
Task item;
lock (_tasksLock)
{
if (_tasks.Count == 0)
{
_delegatesQueuedOrRunning--;
return;
}
item = _tasks.First!.Value;
_tasks.RemoveFirst();
}
base.TryExecuteTask(item);
NotifyThreadPoolOfPendingWork();
}, null);
Taskキューが空になるまではNotifyThreadPoolOfPendingWork()を呼び出し続け、空になったら終了しています。大半はそれを実現するためのコードで、Taskの実行自体は TryExecuteTask
の1行だけです。ここは動きを変更する必要が無いので、継承元のメソッドをそのまま呼んでいます。
要となる部分は以上で、あとは必要な処理をそれに合わせてオーバーライドしていくだけです。まず、Taskをすぐに実行する必要がある場合に呼ばれるTryExecuteTaskInlineメソッド。これは、同時実行数未満であればすぐに実行し、そうでない場合は実行不可のfalseを返します。この時、すでにキューに入ったTaskであれば、キューからも取り出します。
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
if (taskWasPreviouslyQueued)
{
if (TryDequeue(task))
{
return base.TryExecuteTask(task);
}
else
{
return false;
}
}
else
{
return base.TryExecuteTask(task);
}
}
else
{
return false;
}
TryDequeueはそのまんまで、独自のキューからの取り出しを試みます。
protected override bool TryDequeue(Task task)
{
lock (_tasksLock)
{
return _tasks.Remove(task);
}
}
GetScheduledTasksは排他ロックを取得してTaskのリストを返します。ロックが取れなかった場合、ちょっと不思議なのですがAPI仕様に従ってNotSupportedExceptionを返します。
protected override IEnumerable<Task>? GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasksLock, ref lockTaken);
if (lockTaken)
{
return _tasks.ToArray();
}
else
{
throw new NotSupportedException();
}
}
finally
{
if (lockTaken) Monitor.Exit(_tasksLock);
}
}
上で貼ったのと同じものですが、全てのコードはこちらです。
まとめ
以上で完成です!プロパティ1つ設定して済ませるわけには行かないのでちょっと面倒ですが、そんなに複雑な実装でもなかったと思います。全体のコードや動きを見たい人は、GitHubのリポジトリを見てみてください。