36
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【C#】Channel<T> でスレッド間通信をシンプルに

Posted at

はじめに

C#で非同期処理を組み立てていると、タスク間でデータを受け渡すキューが欲しくなる場面が出てきます。ですが、キューを自前で実装すると、次のような課題に直面します。

0001 (1).png

Channel<T> は、こうした「ステージ間の受け渡し」を .NET らしい形で整理して書くための部品です。今回は Channel<T> の基本から、実務で重要な Bounded(バックプレッシャー)Complete(終了通知) までを、まとめてみました。

本記事のサンプルコードは .NET 8 / C# 12 で動作確認しています。
.NET Framework では一部の記法(await foreach など)が使用できませんが、Channel<T> の基本的な考え方や処理の流れは同様です。

Channel とは

Channel<T> は「書く側」と「読む側」を分離した非同期キューです。

a0001.png

  • Producer(書き込む側)Writer にデータを流し込む
  • Consumer(読み取る側)Reader からデータを取り出す

この「Producer と Consumer をつなぐ」形を、ブロックしない API(WriteAsync / ReadAsync)で素直に書けるのが特徴です。

以降、この記事では Producer / Consumer という呼び方で統一します。

利用可能な環境

環境 対応状況
.NET 6 / 7 / 8 / 9 そのまま使える
.NET Core 3.x 以降 そのまま使える
.NET Framework NuGet で System.Threading.Channels を追加 1

名前空間: System.Threading.Channels

.NET Framework で利用可能な下限バージョンは、パッケージのバージョンに依存します。古い版(4.7.1 など)は .NET Framework 4.6.1 を対象にしていましたが、近年の版(.NET 7 世代以降)では 4.6.2 以上がサポート対象です。2

ReadAllAsync()IAsyncEnumerable<T> を返します。3 await foreach で回すには C# 8.0 以降(async streams)が必要です。

従来の手法との比較

手法 async/await 対応 スレッドセーフ 流量制御
lock + Queue<T> × 自前実装 ×
BlockingCollection<T> 4 ×
Channel<T> 5

BlockingCollection<T> は同期ブロック前提の API(Take / GetConsumingEnumerable)で設計されています。

async/await で書きたいなら、最初から WriteAsync / ReadAllAsync を前提にした Channel<T> のほうが素直に組めます。

Channel<T>BlockingCollection<T> の「モダンな async 版」と考えるとわかりやすいかもです。

基本的な使い方

最小構成のコードから試してみます。

C#
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        // Channel を作成
        var channel = Channel.CreateUnbounded<int>();

        // Producer: データを書き込む
        await channel.Writer.WriteAsync(1);
        await channel.Writer.WriteAsync(2);
        await channel.Writer.WriteAsync(3);
        channel.Writer.Complete();  // 書き込み完了を通知

        // Consumer: データを読み取る
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(item);
        }
    }
}

[ 出力結果 ]

1
2
3

.NET Fiddle で実行する

ポイント解説

Writer と Reader

Channel<T>WriterReader という2つのプロパティを持ちます。

C#
channel.Writer  // 書き込み用(ChannelWriter<T>)
channel.Reader  // 読み取り用(ChannelReader<T>)

この分離により、Producer には Writer だけを、Consumer には Reader だけを渡すことで、役割を明確にできます。

Complete() の重要性

Complete() は「これ以上データを書き込まない」ことを Consumer に伝えます。6

channel.Writer.Complete();

注意!!
これを呼び忘れると、Consumer の ReadAllAsync() は永遠に次のデータを待ち続けます。

Unbounded vs Bounded

Channel<T> には2種類の作成方法があります。

Unbounded(容量無制限)

a0002 (2).png

var channel = Channel.CreateUnbounded<int>();
  • 容量に制限がなく、メモリが許す限りデータを格納できる
  • WriteAsync は即座に完了する
  • シンプルで高速だが、メモリ溢れのリスクがある

Bounded(容量制限あり)

a0003 (2).png

var channel = Channel.CreateBounded<int>(capacity: 100);
  • 指定した容量までしかデータを格納できない
  • 満杯の場合、WriteAsync は空きができるまで待機する
  • メモリ使用量を制御できる

使い分けの指針

ケース 推奨
軽量なデータ・高速処理が必要 Unbounded
メモリ使用量を制御したい Bounded
外部 API 呼び出しなど流量制御が必要 Bounded
Producer が Consumer より圧倒的に速い Bounded

本番環境では、予期せぬメモリ溢れを防ぐために Bounded を検討することをおすすめします。

バックプレッシャーを理解する

Bounded Channel の真価はバックプレッシャーにあります。

バックプレッシャーとは

Consumer の処理が追いつかないとき、Producer を自動的に待たせる仕組みです。これにより、システム全体の安定性を保つことができます。

a0004 (1).png

BoundedChannelOptions

Bounded Channel は、満杯時の挙動をカスタマイズできます。

C#
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 100)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false,
    SingleWriter = false
});
オプション 説明
FullMode 満杯時の挙動(後述)
SingleReader Reader が1つだけなら true に設定すると最適化される
SingleWriter Writer が1つだけなら true に設定すると最適化される

SingleReader / SingleWriter を正しく設定すると、内部のロック競合が減りパフォーマンスが向上します。

FullMode の種類

満杯時にどうするかを FullMode で指定します。7

FullMode 挙動 ユースケース
Wait 0 空きができるまで待機 デフォルト。データを失いたくない場合
DropNewest 1 最も新しいデータを捨てる あまり使わない
DropOldest 2 最も古いデータを捨てる リアルタイム処理。最新データが重要な場合
DropWrite 3 書き込もうとしたデータを捨てる サンプリング的な用途

注意
Drop 系モードでは、WriteAsync は成功を返しますが、内部でデータが捨てられています。ドロップを検知したい場合は、Channel.CreateBoundeditemDropped コールバック(オーバーロード引数)を登録するか、投入数と処理数の差分をメトリクスで監視してください。8

FullMode の動作例

C#
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        // 容量3、DropOldest モードの Channel
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
        {
            // 最も古いデータを捨てる を設定
            FullMode = BoundedChannelFullMode.DropOldest
        });

        // 5つのデータを書き込む
        for (int i = 1; i <= 5; i++)
        {
            await channel.Writer.WriteAsync(i);
            Console.WriteLine($"Written: {i}");
        }
        channel.Writer.Complete();

        // 読み取り
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Read: {item}");
        }
    }
}

出力結果

Written: 1
Written: 2
Written: 3
Written: 4
Written: 5
Read: 3
Read: 4
Read: 5

1 と 2 は古いデータとして捨てられ、3, 4, 5 だけが残ります。

.NET Fiddle で実行する

複数 Producer / 複数 Consumer

Channel<T> はスレッドセーフなので、複数の Producer や Consumer を同時に動かすことができます。

a0005 (1).png

実装例

C#
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<string>();

        // 3つの Producer を起動
        var producers = Enumerable.Range(1, 3).Select(id => Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++)
            {
                var message = $"Producer{id}-Item{i}";
                await channel.Writer.WriteAsync(message);
                Console.WriteLine($"[Write] {message}");
                await Task.Delay(50);
            }
        })).ToArray();

        // 2つの Consumer を起動
        var consumers = Enumerable.Range(1, 2).Select(id => Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"[Read by Consumer{id}] {item}");
                await Task.Delay(100);
            }
        })).ToArray();

        // 全 Producer の完了を待ってから Complete
        await Task.WhenAll(producers);
        channel.Writer.Complete();

        // 全 Consumer の完了を待つ
        await Task.WhenAll(consumers);
    }
}

.NET Fiddle で実行する

注意点

  • Complete() は1回だけ : 全ての Producer が完了してから、最後に1回だけ呼び出します。Complete() 後に WriteAsync を呼ぶと ChannelClosedException がスローされます。
  • ブロードキャストではない : 各アイテムは1つの Consumer だけが受け取ります。全 Consumer に同じデータを送りたい場合は、別の仕組み(例: 各 Consumer 用に Channel を分ける)が必要です。

【実践】パイプライン構築

複数の処理ステージを Channel で繋ぐことで、効率的なデータ処理パイプラインを構築できます。

a0006 (1).png

パイプラインの利点

  • 各ステージが独立して動作する
  • ステージごとに並列度を調整できる
  • バックプレッシャーにより全体の流量が自動調整される

実装例

「データ生成 → 変換処理 → 結果出力」の3ステージパイプラインです。

C#
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        // Stage 1 → Stage 2 の Channel
        var channel1 = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        // Stage 2 → Stage 3 の Channel
        var channel2 = Channel.CreateBounded<string>(new BoundedChannelOptions(10)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        // Stage 1: データ生成(1並列)
        var stage1 = Task.Run(async () =>
        {
            for (int i = 1; i <= 20; i++)
            {
                await channel1.Writer.WriteAsync(i);
                Console.WriteLine($"[Stage1] Generated: {i}");
            }
            channel1.Writer.Complete();
        });

        // Stage 2: 変換処理(4並列でスケールアウト)
        var stage2Workers = Enumerable.Range(1, 4).Select(workerId => Task.Run(async () =>
        {
            await foreach (var item in channel1.Reader.ReadAllAsync())
            {
                await Task.Delay(100);  // 重い処理をシミュレート
                var result = $"Processed-{item}";
                await channel2.Writer.WriteAsync(result);
                Console.WriteLine($"[Stage2-Worker{workerId}] {item}{result}");
            }
        })).ToArray();

        // Stage 2 完了後に channel2 を Complete
        var stage2 = Task.Run(async () =>
        {
            await Task.WhenAll(stage2Workers);
            channel2.Writer.Complete();
        });

        // Stage 3: 結果出力(1並列)
        var stage3 = Task.Run(async () =>
        {
            await foreach (var item in channel2.Reader.ReadAllAsync())
            {
                Console.WriteLine($"[Stage3] Output: {item}");
            }
        });

        // 全ステージの完了を待つ
        await Task.WhenAll(stage1, stage2, stage3);
        Console.WriteLine("Pipeline completed.");
    }
}

.NET Fiddle で実行する

この例では、ボトルネックになりがちな変換処理(Stage 2)を4並列で実行することで、スループットを向上させています。

CancellationToken でキャンセル対応

ReadAllAsync()CancellationToken を受け取れるため、サービス停止やタイムアウトに対応できます。9

C#
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        using var cts = new CancellationTokenSource();

        // 3秒後にキャンセル
        cts.CancelAfter(TimeSpan.FromSeconds(3));

        // Producer: 1秒ごとにデータを書き込む
        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 10; i++)
            {
                await channel.Writer.WriteAsync(i);
                Console.WriteLine($"Written: {i}");
                await Task.Delay(1000);
            }
            channel.Writer.Complete();
        });

        // Consumer: キャンセル対応
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
            {
                Console.WriteLine($"Read: {item}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Consumer was cancelled.");
        }

        await producer;
    }
}

[ 出力結果]

Written: 1
Read: 1
Written: 2
Read: 2
Written: 3
Read: 3
Consumer was cancelled.

.NET Fiddle で実行する

落とし穴と対策

❌ Complete() を忘れる

C#
// NG: Complete() がないと Consumer が永遠に待機
await channel.Writer.WriteAsync(1);
// channel.Writer.Complete();  ← 忘れている!

await foreach (var item in channel.Reader.ReadAllAsync())
{
    // 1件処理後、次のデータを永遠に待ち続ける...
}

対策
try-finally で確実に Complete() を呼ぶ。

C#
try
{
    await channel.Writer.WriteAsync(1);
}
finally
{
    channel.Writer.Complete();
}

❌ Producer の例外を握りつぶす

C#
// NG: 例外が発生しても Consumer は気づかない
try
{
    await channel.Writer.WriteAsync(Process(data));
}
catch (Exception)
{
    // 例外を無視...
}

対策
Complete(exception) で例外を Consumer に伝播させる。

C#
try
{
    await channel.Writer.WriteAsync(Process(data));
}
catch (Exception ex)
{
    channel.Writer.Complete(ex);  // 例外付きで完了
    throw;
}

Consumer 側では ReadAllAsync() が例外をスローするので、適切にハンドリングできます。

❌ Unbounded でメモリ溢れ

Producer が高速で Consumer が遅い場合、Unbounded Channel はメモリを食い続けます。

対策
本番環境では Bounded Channel を検討する。

C#
var channel = Channel.CreateBounded<Data>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait
});

❌ 処理順序の崩れ

複数の Consumer を使うと、アイテムの処理順序が保証されません

C#
// Consumer1 と Consumer2 が並列で読み取る
// → 1, 2, 3 の順で書き込んでも、処理完了順は保証されない

対策
順序が重要な場合は、Consumer を1つにするか、後続処理で順序を復元するロジックを入れる。

まとめ

Channel<T> は、async/await 時代の Producer-Consumer パターンを素直に書くための道具です。覚えておくと、スレッド間通信の選択肢が広がります。

早見表を見る

以下は個人的な使い分けの目安です。実際のプロジェクト要件に応じて判断してください。

やりたいこと 方法
シンプルな非同期キュー CreateUnbounded<T>()
メモリ使用量を制御 CreateBounded<T>(capacity)
Producer を待たせて流量制御 BoundedChannelFullMode.Wait
古いデータを捨てる(リアルタイム向け) BoundedChannelFullMode.DropOldest
処理を並列化 複数の Consumer を起動
多段階のデータ処理 パイプライン構築
キャンセル対応 ReadAllAsync(CancellationToken)
使いどころを見る

スレッド間通信で lockBlockingCollection を使っているなら、Channel<T> への置き換えを検討してみてみるのも良いかもです。

  • ログ収集・書き込み
  • バックグラウンドでのデータ処理
  • API レート制限の実装
  • ETL パイプライン

コードがシンプルになり、async/await との親和性も高まります。

この記事は、Channel を 実務で事故らず使うための要点(Bounded / Complete / Drop / 複数Consumerの注意) にフォーカスしています。例外的な最適化や高度な設計(ブロードキャスト、順序保証、シャーディング等)は用途が分かれるので、ここでは割愛しました。

  1. System.Threading.Channels - NuGet Gallery

  2. System.Threading.Channels - NuGet Gallery(Supported frameworks)

  3. ChannelReader.ReadAllAsync - Microsoft Learn

  4. BlockingCollection<T> クラス - Microsoft Learn

  5. System.Threading.Channels 概要 - Microsoft Learn

  6. ChannelWriter.Complete - Microsoft Learn

  7. BoundedChannelFullMode 列挙型 - Microsoft Learn

  8. Channel.CreateBounded(itemDropped オーバーロード) - Microsoft Learn

  9. ChannelReader.ReadAllAsync(CancellationToken) - Microsoft Learn

36
28
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?