.NET の非同期プログラミングにおいて、System.Threading.Channels
は生産者-消費者パターンを処理するための強力な方法を提供します。特に、異なるタスクやサービス間でデータをやり取りする際に有効です。この記事では、UnboundedChannelOptions
と BoundedChannelOptions
の 2 つのクラス、およびそれらの使用シナリオと違いについて説明します。
コード背景紹介
まず、一つのコードを見てみましょう。これは ASP.NET Core プロジェクトで、Channel<VectorData>
を作成してメッセージを格納し、VectorService
でこれらのメッセージを処理します。
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{
//return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions
//{
// SingleReader = true,
// AllowSynchronousContinuations = false,
//});
return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10)
{
SingleReader = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
});
ここでは、次の2つの選択肢があります:
UnboundedChannelOptions
(無制限容量チャンネル)BoundedChannelOptions
(制限付き容量チャンネル)
では、これら2つの違いは何でしょうか?詳しく見てみましょう。
UnboundedChannelOptions:無制限容量チャンネル
Channel.CreateUnbounded<T>(new UnboundedChannelOptions {...})
を使用する場合、これはサイズ制限のないチャンネルを作成することを意味します。これは次のことを意味します:
- メッセージを無制限に投入でき、容量問題によるブロックは発生しません。
- プロデューサーの速度が消費者を大幅に上回る場合、データがずっと蓄積され、OOM(アウト・オブ・メモリー)の可能性があります。
- プロデューサーとコンシューマーの速度がマッチしている、もしくは十分なリソースがあるシナリオに適しています。
重要な属性解析:
-
SingleReader
:データを読む消費者が1つだけかどうか。true
に設定すると、複数のタスクが同時にメッセージを消費することがなくなり、読み取り効率が向上します。 -
AllowSynchronousContinuations
:同期的にコールバックを実行するか否かを指定します。false
に設定すると、すべての待機タスクは非同期にスケジュールされ、スレッドがブロックされることを避けます。
適用シナリオ:
- プロデューサーと消費者の処理速度が近い。
- メモリ消費が気にならない、または一時的に大量のデータを保存することを許容する。
- タスクの実行時間が短く、大量の待機データが蓄積しにくい。
BoundedChannelOptions:制限付き容量チャンネル
BoundedChannelOptions
は容量が制限されたチャンネルであり、作成時に最大容量を指定します。コードの中では new BoundedChannelOptions(10)
とされています。
重要な特性:
- チャンネルの容量は有限で、10件を超えるメッセージ時にはプロデューサーが書き込むことができません(消費者がメッセージを取り去るまでは)。
- チャンネルが満たされた時の動作を制御することができ、
FullMode
を指定することで設定が可能です:
-
BoundedChannelFullMode.Wait
(デフォルト):プロデューサーが待機、スペースができるまで書き込みできません。 -
BoundedChannelFullMode.DropOldest
:最古のデータを捨て、新しいデータを投入可能に。 -
BoundedChannelFullMode.DropNewest
:最新のデータを捨て、古いデータを保持。 -
BoundedChannelFullMode.DropWrite
:書き込もうとするデータを直接捨てる。
重要な属性解析:
-
SingleReader
:同様にUnboundedChannelOptions
と同じく、データの読み取りに消費者が1人かを制御します。 -
AllowSynchronousContinuations
:同期実行を許可し、不必要なスレッドスイッチを防ぎます。
適用シナリオ:
- プロデューサーの速度が消費者を大幅に上回る可能性がある場合、メッセージの蓄積を制御し、メモリ占用の増大を避けたい。
- チャンネルが満たされた時の動作を制御したい:新しいデータを優先するもしくはプロデューサーを待機させるなど。
Unbounded vs Bounded:どう選ぶか?
特性 | UnboundedChannelOptions | BoundedChannelOptions |
---|---|---|
容量を制限するか | ❌ 無制限 | ✅ 制限あり (例: 10件のメッセージ) |
高スループットに適応か | ✅ 適する | 🚫 影響する可能性 |
メモリ占用が満杯になる可能性 | ✅ 可能性あり | 🚫 あまりない |
プロデューサーが消費者よりも速い時 | ❌ OOMの可能性 | ✅ 制限可能 |
チャンネルが満載時の動作を設定 | 🚫 設定不可 | ✅ 設定可能 (FullMode ) |
適用シナリオ | プロデューサーと消費者の速度が一致 | 生産者速度が消費者より速い可能性がある場合 |
簡単に言ってしまえば:
- もしプロデューサーと消費者の速度やデータ量を考慮しきれない、また消費者より多くのデータを生成する可能性があるなら、
BoundedChannelOptions
の使用をお勧めします。 - 大量のデータが蓄積しないもしくは消費能力が追いつくことが保証されるなら、
UnboundedChannelOptions
も使用可能です。
コード実践:UnboundedChannelOptions への変更
もしUnboundedChannelOptions
を使用する場合、以下のように変更できます:
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{
return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions
{
SingleReader = true,
AllowSynchronousContinuations = false,
});
});
ここでは、BoundedChannelOptions
における FullMode
がなくなります。なぜなら、それ自体が満たされることがないからです。
しかし、もしプロデューサーが絶え間なく書き込み続け、消費者が間に合わない場合、メモリ占用が増えていく可能性があります。
結論
-
UnboundedChannelOptions
は処理速度が均衡している場合に適していますが、プロデューサーが消費者を大幅に上回ると、OOM リスクがあります。 -
BoundedChannelOptions
はメモリ爆発を防ぐことができ、高スループットのプロデューサー-消費者モデルに適しています。 -
BoundedChannelOptions
はチャンネルがいっぱいになったときの動作をFullMode
により制御できますが、UnboundedChannelOptions
にはそのオプションはありません。
データ量を予測しにくい、または生産者が消費者よりも速くなる可能性がある場合は、BoundedChannelOptions
を優先して選択することをお勧めします。
希望この論文がこれら2つのChannel
オプションの違いを理解するのに役立ち、高並行、非同期タスクを作成する際に適するソリューションをより適切に選択することができるように!
完全なコード:
using System.Threading.Channels;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHostedService<VectorService>();
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{
//return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions
//{
// SingleReader = true,
// AllowSynchronousContinuations = false,
//});
return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10)
{
SingleReader = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
});
var app = builder.Build();
app.MapGet("/vector", async (Channel<VectorData> channel) =>
{
await channel.Writer.WriteAsync(new VectorData($"ここはユーザーコンテンツメッセージ、{DateTime.Now.ToString("yyyyMMddHHmmssfff")}"));
return Results.Ok();
});
app.Run();
public record VectorData(string content);
public class VectorService : BackgroundService
{
private readonly Channel<VectorData> _channel;
public VectorService(Channel<VectorData> channel)
{
_channel = channel;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (await _channel.Reader.WaitToReadAsync(stoppingToken))
{
var vectorData = await _channel.Reader.ReadAsync(stoppingToken);
Console.WriteLine($"ベクトル化開始:{vectorData.content}");
await Task.Delay(3000, stoppingToken);
Console.WriteLine("ベクトル化終了");
}
}
}
(Translated by GPT)
元のリンク:https://mp.weixin.qq.com/s/mm6HQHVT4nHq49RAQnwvTg?token=1135395277&lang=zh_CN&wt.mc_id=MVP_325642