はじめに
C#で非同期処理を組み立てていると、タスク間でデータを受け渡すキューが欲しくなる場面が出てきます。ですが、キューを自前で実装すると、次のような課題に直面します。
Channel<T> は、こうした「ステージ間の受け渡し」を .NET らしい形で整理して書くための部品です。今回は Channel<T> の基本から、実務で重要な Bounded(バックプレッシャー) と Complete(終了通知) までを、まとめてみました。
本記事のサンプルコードは .NET 8 / C# 12 で動作確認しています。
.NET Framework では一部の記法(await foreach など)が使用できませんが、Channel<T> の基本的な考え方や処理の流れは同様です。
Channel とは
Channel<T> は「書く側」と「読む側」を分離した非同期キューです。
-
Producer(書き込む側) が
Writerにデータを流し込む -
Consumer(読み取る側) が
Readerからデータを取り出す
この「Producer と Consumer をつなぐ」形を、ブロックしない API(WriteAsync / ReadAsync)で素直に書けるのが特徴です。
以降、この記事では Producer / Consumer という呼び方で統一します。
利用可能な環境
| 環境 | 対応状況 |
|---|---|
| .NET 6 / 7 / 8 / 9 | そのまま使える |
| .NET Core 3.x 以降 | そのまま使える |
| .NET Framework | NuGet で System.Threading.Channels を追加 1
|
名前空間: System.Threading.Channels
.NET Framework で利用可能な下限バージョンは、パッケージのバージョンに依存します。古い版(4.7.1 など)は .NET Framework 4.6.1 を対象にしていましたが、近年の版(.NET 7 世代以降)では 4.6.2 以上がサポート対象です。2
ReadAllAsync() は IAsyncEnumerable<T> を返します。3 await foreach で回すには C# 8.0 以降(async streams)が必要です。
従来の手法との比較
| 手法 | async/await 対応 | スレッドセーフ | 流量制御 |
|---|---|---|---|
lock + Queue<T>
|
× | 自前実装 | × |
BlockingCollection<T> 4
|
× | ○ | ○ |
Channel<T> 5
|
○ | ○ | ○ |
BlockingCollection<T> は同期ブロック前提の API(Take / GetConsumingEnumerable)で設計されています。
async/await で書きたいなら、最初から WriteAsync / ReadAllAsync を前提にした Channel<T> のほうが素直に組めます。
Channel<T> は BlockingCollection<T> の「モダンな async 版」と考えるとわかりやすいかもです。
基本的な使い方
最小構成のコードから試してみます。
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
public class Program
{
public static async Task Main()
{
// Channel を作成
var channel = Channel.CreateUnbounded<int>();
// Producer: データを書き込む
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
await channel.Writer.WriteAsync(3);
channel.Writer.Complete(); // 書き込み完了を通知
// Consumer: データを読み取る
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
}
}
[ 出力結果 ]
1
2
3
ポイント解説
Writer と Reader
Channel<T> は Writer と Reader という2つのプロパティを持ちます。
channel.Writer // 書き込み用(ChannelWriter<T>)
channel.Reader // 読み取り用(ChannelReader<T>)
この分離により、Producer には Writer だけを、Consumer には Reader だけを渡すことで、役割を明確にできます。
Complete() の重要性
Complete() は「これ以上データを書き込まない」ことを Consumer に伝えます。6
channel.Writer.Complete();
注意!!
これを呼び忘れると、Consumer の ReadAllAsync() は永遠に次のデータを待ち続けます。
Unbounded vs Bounded
Channel<T> には2種類の作成方法があります。
Unbounded(容量無制限)
var channel = Channel.CreateUnbounded<int>();
- 容量に制限がなく、メモリが許す限りデータを格納できる
-
WriteAsyncは即座に完了する - シンプルで高速だが、メモリ溢れのリスクがある
Bounded(容量制限あり)
var channel = Channel.CreateBounded<int>(capacity: 100);
- 指定した容量までしかデータを格納できない
- 満杯の場合、
WriteAsyncは空きができるまで待機する - メモリ使用量を制御できる
使い分けの指針
| ケース | 推奨 |
|---|---|
| 軽量なデータ・高速処理が必要 | Unbounded |
| メモリ使用量を制御したい | Bounded |
| 外部 API 呼び出しなど流量制御が必要 | Bounded |
| Producer が Consumer より圧倒的に速い | Bounded |
本番環境では、予期せぬメモリ溢れを防ぐために Bounded を検討することをおすすめします。
バックプレッシャーを理解する
Bounded Channel の真価はバックプレッシャーにあります。
バックプレッシャーとは
Consumer の処理が追いつかないとき、Producer を自動的に待たせる仕組みです。これにより、システム全体の安定性を保つことができます。
BoundedChannelOptions
Bounded Channel は、満杯時の挙動をカスタマイズできます。
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 100)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
| オプション | 説明 |
|---|---|
FullMode |
満杯時の挙動(後述) |
SingleReader |
Reader が1つだけなら true に設定すると最適化される |
SingleWriter |
Writer が1つだけなら true に設定すると最適化される |
SingleReader / SingleWriter を正しく設定すると、内部のロック競合が減りパフォーマンスが向上します。
FullMode の種類
満杯時にどうするかを FullMode で指定します。7
| FullMode | 値 | 挙動 | ユースケース |
|---|---|---|---|
Wait |
0 | 空きができるまで待機 | デフォルト。データを失いたくない場合 |
DropNewest |
1 | 最も新しいデータを捨てる | あまり使わない |
DropOldest |
2 | 最も古いデータを捨てる | リアルタイム処理。最新データが重要な場合 |
DropWrite |
3 | 書き込もうとしたデータを捨てる | サンプリング的な用途 |
注意
Drop 系モードでは、WriteAsync は成功を返しますが、内部でデータが捨てられています。ドロップを検知したい場合は、Channel.CreateBounded の itemDropped コールバック(オーバーロード引数)を登録するか、投入数と処理数の差分をメトリクスで監視してください。8
FullMode の動作例
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
public class Program
{
public static async Task Main()
{
// 容量3、DropOldest モードの Channel
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
// 最も古いデータを捨てる を設定
FullMode = BoundedChannelFullMode.DropOldest
});
// 5つのデータを書き込む
for (int i = 1; i <= 5; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Written: {i}");
}
channel.Writer.Complete();
// 読み取り
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Read: {item}");
}
}
}
出力結果
Written: 1
Written: 2
Written: 3
Written: 4
Written: 5
Read: 3
Read: 4
Read: 5
1 と 2 は古いデータとして捨てられ、3, 4, 5 だけが残ります。
複数 Producer / 複数 Consumer
Channel<T> はスレッドセーフなので、複数の Producer や Consumer を同時に動かすことができます。
実装例
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
public class Program
{
public static async Task Main()
{
var channel = Channel.CreateUnbounded<string>();
// 3つの Producer を起動
var producers = Enumerable.Range(1, 3).Select(id => Task.Run(async () =>
{
for (int i = 1; i <= 3; i++)
{
var message = $"Producer{id}-Item{i}";
await channel.Writer.WriteAsync(message);
Console.WriteLine($"[Write] {message}");
await Task.Delay(50);
}
})).ToArray();
// 2つの Consumer を起動
var consumers = Enumerable.Range(1, 2).Select(id => Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"[Read by Consumer{id}] {item}");
await Task.Delay(100);
}
})).ToArray();
// 全 Producer の完了を待ってから Complete
await Task.WhenAll(producers);
channel.Writer.Complete();
// 全 Consumer の完了を待つ
await Task.WhenAll(consumers);
}
}
【実践】パイプライン構築
複数の処理ステージを Channel で繋ぐことで、効率的なデータ処理パイプラインを構築できます。
パイプラインの利点
- 各ステージが独立して動作する
- ステージごとに並列度を調整できる
- バックプレッシャーにより全体の流量が自動調整される
実装例
「データ生成 → 変換処理 → 結果出力」の3ステージパイプラインです。
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
public class Program
{
public static async Task Main()
{
// Stage 1 → Stage 2 の Channel
var channel1 = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait
});
// Stage 2 → Stage 3 の Channel
var channel2 = Channel.CreateBounded<string>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait
});
// Stage 1: データ生成(1並列)
var stage1 = Task.Run(async () =>
{
for (int i = 1; i <= 20; i++)
{
await channel1.Writer.WriteAsync(i);
Console.WriteLine($"[Stage1] Generated: {i}");
}
channel1.Writer.Complete();
});
// Stage 2: 変換処理(4並列でスケールアウト)
var stage2Workers = Enumerable.Range(1, 4).Select(workerId => Task.Run(async () =>
{
await foreach (var item in channel1.Reader.ReadAllAsync())
{
await Task.Delay(100); // 重い処理をシミュレート
var result = $"Processed-{item}";
await channel2.Writer.WriteAsync(result);
Console.WriteLine($"[Stage2-Worker{workerId}] {item} → {result}");
}
})).ToArray();
// Stage 2 完了後に channel2 を Complete
var stage2 = Task.Run(async () =>
{
await Task.WhenAll(stage2Workers);
channel2.Writer.Complete();
});
// Stage 3: 結果出力(1並列)
var stage3 = Task.Run(async () =>
{
await foreach (var item in channel2.Reader.ReadAllAsync())
{
Console.WriteLine($"[Stage3] Output: {item}");
}
});
// 全ステージの完了を待つ
await Task.WhenAll(stage1, stage2, stage3);
Console.WriteLine("Pipeline completed.");
}
}
この例では、ボトルネックになりがちな変換処理(Stage 2)を4並列で実行することで、スループットを向上させています。
CancellationToken でキャンセル対応
ReadAllAsync() は CancellationToken を受け取れるため、サービス停止やタイムアウトに対応できます。9
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
public class Program
{
public static async Task Main()
{
var channel = Channel.CreateUnbounded<int>();
using var cts = new CancellationTokenSource();
// 3秒後にキャンセル
cts.CancelAfter(TimeSpan.FromSeconds(3));
// Producer: 1秒ごとにデータを書き込む
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Written: {i}");
await Task.Delay(1000);
}
channel.Writer.Complete();
});
// Consumer: キャンセル対応
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
{
Console.WriteLine($"Read: {item}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Consumer was cancelled.");
}
await producer;
}
}
[ 出力結果]
Written: 1
Read: 1
Written: 2
Read: 2
Written: 3
Read: 3
Consumer was cancelled.
落とし穴と対策
❌ Complete() を忘れる
// NG: Complete() がないと Consumer が永遠に待機
await channel.Writer.WriteAsync(1);
// channel.Writer.Complete(); ← 忘れている!
await foreach (var item in channel.Reader.ReadAllAsync())
{
// 1件処理後、次のデータを永遠に待ち続ける...
}
対策
try-finally で確実に Complete() を呼ぶ。
try
{
await channel.Writer.WriteAsync(1);
}
finally
{
channel.Writer.Complete();
}
❌ Producer の例外を握りつぶす
// NG: 例外が発生しても Consumer は気づかない
try
{
await channel.Writer.WriteAsync(Process(data));
}
catch (Exception)
{
// 例外を無視...
}
対策
Complete(exception) で例外を Consumer に伝播させる。
try
{
await channel.Writer.WriteAsync(Process(data));
}
catch (Exception ex)
{
channel.Writer.Complete(ex); // 例外付きで完了
throw;
}
Consumer 側では ReadAllAsync() が例外をスローするので、適切にハンドリングできます。
❌ Unbounded でメモリ溢れ
Producer が高速で Consumer が遅い場合、Unbounded Channel はメモリを食い続けます。
対策
本番環境では Bounded Channel を検討する。
var channel = Channel.CreateBounded<Data>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
❌ 処理順序の崩れ
複数の Consumer を使うと、アイテムの処理順序が保証されません。
// Consumer1 と Consumer2 が並列で読み取る
// → 1, 2, 3 の順で書き込んでも、処理完了順は保証されない
対策
順序が重要な場合は、Consumer を1つにするか、後続処理で順序を復元するロジックを入れる。
まとめ
Channel<T> は、async/await 時代の Producer-Consumer パターンを素直に書くための道具です。覚えておくと、スレッド間通信の選択肢が広がります。
早見表を見る
以下は個人的な使い分けの目安です。実際のプロジェクト要件に応じて判断してください。
| やりたいこと | 方法 |
|---|---|
| シンプルな非同期キュー | CreateUnbounded<T>() |
| メモリ使用量を制御 | CreateBounded<T>(capacity) |
| Producer を待たせて流量制御 | BoundedChannelFullMode.Wait |
| 古いデータを捨てる(リアルタイム向け) | BoundedChannelFullMode.DropOldest |
| 処理を並列化 | 複数の Consumer を起動 |
| 多段階のデータ処理 | パイプライン構築 |
| キャンセル対応 | ReadAllAsync(CancellationToken) |
使いどころを見る
スレッド間通信で lock や BlockingCollection を使っているなら、Channel<T> への置き換えを検討してみてみるのも良いかもです。
- ログ収集・書き込み
- バックグラウンドでのデータ処理
- API レート制限の実装
- ETL パイプライン
コードがシンプルになり、async/await との親和性も高まります。
この記事は、Channel を 実務で事故らず使うための要点(Bounded / Complete / Drop / 複数Consumerの注意) にフォーカスしています。例外的な最適化や高度な設計(ブロードキャスト、順序保証、シャーディング等)は用途が分かれるので、ここでは割愛しました。






