改善の余地だらけの最低限の実装ですが、効率とお行儀が悪いだけでちゃんと動きます。
(.NET Core 3.1 ≒ .NET Standard 2.1 で動作確認)
※ 👇 .NET 9 で追加された Task.WhenEach
とは
つかいかた&結果
次項記載のソースコードを導入すると以下の要領で実行できるようになる。
await foreach (var task in WhenEachAsync(tasks))
{
// 終了したタスクに対する処理
}
// または
await foreach (var task in new WhenEachEnumerator<Task<int>>(tasks))
{
// 終了したタスクに対する処理
}
サンプルを実行すると、ジョブが流すログ(No.*
)の順番が毎回変わり、続く foreach
が流すログ(-> *
)で完了したタスクを正しく受け取れていることを確認できる。
= WhenEachAsync ===
No.1: 129 ms
-> 1: 130.0023
No.3: 265 ms
-> 3: 266.0412
No.2: 304 ms
-> 2: 304.298
= WhenEachEnumerator ===
No.2: 77 ms
-> 2: 77.6754
No.3: 122 ms
-> 3: 122.3983
No.1: 297 ms
-> 1: 298.128
ソースコード&サンプル
- 動作確認環境: https://dotnetfiddle.net/
- TODO: アロケーション回避は面倒なだけでやれば出来る(ハズ)
- 👉 構造体のコンストラクターで処理を委譲する参照型をプールから取得 →
DisposeAsync
でプールに返却する等
- 👉 構造体のコンストラクターで処理を委譲する参照型をプールから取得 →
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
#nullable enable
public static class Program
{
// TODO: new WhenEachEnumerator(tasks) で型推論が働かない? なぜ??
// NOTE: struct にするとエラー。MoveNextAsync を readonly に出来ないから呼び出し毎にコピーが起きるのが原因
class WhenEachEnumerator<T>
: IAsyncEnumerator<T>
, IAsyncEnumerable<T> // await foreach はダックタイピングなので実装しなくても可
where T : Task
{
readonly T[] tasks;
byte remaining; // 構造体のサイズ削減の為
public WhenEachEnumerator(T[] tasks)
{
// TODO: MoveNextAsync() で渡された配列を直接弄っちゃう状態なので ArrayPool にコピーして操作、
// DisposeAsync() で借りた配列を返却する等に変えたほうが良い
this.tasks = tasks ?? throw new ArgumentNullException(nameof(tasks));
this.remaining = checked((byte)tasks.Length); // 雑な範囲チェック
}
// NOTE: 引数無しのオーバーロードがあれば IAsyncEnumerable<T> を実装しなくても動く
public IAsyncEnumerator<T> GetAsyncEnumerator() => this;
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken _) => this; // TODO: トークン確認する
public async ValueTask DisposeAsync() => tasks.AsSpan().Clear(); // 参照型の ArrayPool は中身を空にしないとリーク
public T Current => tasks[remaining]; // 完了したタスクを末尾に移動して Current で返す(構造体のサイズ削減の為)
// 終了済みタスクのインデックスを配列にため込んで毎ループ Contains でチェックするというのは効率悪いと思われる
// 型を T?[] にして tasks[i]?.IsCompleted == true でチェックする手もあるが、それなら並び替えで良くね感
public async ValueTask<bool> MoveNextAsync()
{
T? result = null;
var remaining = this.remaining;
while (remaining != 0)
{
// TODO: C# 13.0 なら Span<T> が使える
for (int i = 0; i < remaining; i++)
{
result = tasks[i];
if (result.IsCompleted)
{
remaining--;
if (i != remaining)
{
tasks[i] = tasks[remaining];
tasks[remaining] = result;
}
this.remaining = remaining;
return true;
}
}
await Task.Yield();
}
return false;
}
}
// メソッド版。パラメーターを直接並べ替える行儀の悪さ
// ※ IAsyncEnumerator<T> が暗黙的に生成される(sharplab.io で確認できる)
static async IAsyncEnumerable<T> WhenEachAsync<T>(params T[]? tasks) where T : Task
{
int remaining = tasks?.Length ?? 0;
if (remaining == 0)
yield break;
T? result = null;
while (remaining != 0)
{
// TODO: C# 13.0 なら Span<T> が使える。。。??
for (int i = 0; i < remaining; i++)
{
result = tasks[i];
if (result.IsCompleted)
{
if (i != remaining - 1)
{
tasks[i] = tasks[remaining - 1];
}
remaining--;
yield return result;
}
}
await Task.Yield();
}
}
// テスト
public static async Task Main()
{
await DoIt(true);
await DoIt(false);
}
static async Task DoIt(bool methodOrEnumerator)
{
const int COUNT = 3;
var tasks = new Task<int>[COUNT];
for (int i = 0; i < COUNT; i++)
{
tasks[i] = SimpleJob(i + 1);
}
var start = Stopwatch.GetTimestamp();
if (methodOrEnumerator)
{
Console.WriteLine("= WhenEachAsync ===");
await foreach (var task in WhenEachAsync(tasks))
{
var elapsedMillis = TimeSpan.FromTicks((long)((Stopwatch.GetTimestamp() - start) * ((double)TimeSpan.TicksPerSecond / Stopwatch.Frequency))).TotalMilliseconds;
Console.WriteLine($"-> {task.Result}: " + elapsedMillis);
}
}
else
{
Console.WriteLine("= WhenEachEnumerator ===");
await foreach (var task in new WhenEachEnumerator<Task<int>>(tasks))
{
var elapsedMillis = TimeSpan.FromTicks((long)((Stopwatch.GetTimestamp() - start) * ((double)TimeSpan.TicksPerSecond / Stopwatch.Frequency))).TotalMilliseconds;
Console.WriteLine($"-> {task.Result}: " + elapsedMillis);
}
}
}
static Random rng = new Random();
static async Task<int> SimpleJob(int jobNumber)
{
var wait = rng.Next(31, 310);
await Task.Delay(wait);
Console.WriteLine($"No.{jobNumber}: {wait} ms");
return jobNumber;
}
}
--
ConfigureAwait(false)
を受け付け可能にする方法がない。。。どうすれば良いのか。
await
とか Dispose
みたいな「パターンベースの処理に引っかかる型」を捉えられる仮想インターフェイス的なモノを実装してくれないだろうか。IDuckTyping<Dispose>
IDuckTyping<GetAwaiter>
IDuckTyping<メソッド名, 戻り値の型>
みたいな。
--
以上です。お疲れ様でした。