47
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

System.Threading.Channelsで非同期フレンドリーな生産者・消費者(Pub/Sub)パターンを実装する

Last updated at Posted at 2020-08-07

結構自作しちゃうことが多かったんですが、それなりに実装難易度が高い上にちょっと時間がないので何かないか困っていたら、良いものを教えていただいたので簡単な使い方をまとめておきます。

あえとすさんいつもありがとうございます!

これは何?

マルチスレッドプログラミングの一般的なデザインパターンのひとつ、生産者・消費者パターン(最近だとPub/Subパターンといった方が伝わりが良い?)の実装を助けてもらえるMicrosoft謹製のライブラリです。async/awaitとの親和性が高く、簡単に使えて高速に動作します。

参考資料

  1. System.Threading.Channelsを使う
  2. C# Channels - Publish / Subscribe Workflows

とくに1.の記事は、ほかのライブラリとの比較もされていて、一読することを強くお勧めします。

生産者:消費者=1:1の実装例

まずは一番簡単な例から。生産者も消費者もひとりなんですが、生産者はキューにアイテムを突っ込んだら、消費者に非同期で処理してもらいたいときに利用する実装です。こちらはほぼ参考資料のままです。シンプルで素晴らしいサンプルです。

static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true
        });

    var consumer = Task.Run(async () =>
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            Console.WriteLine(await channel.Reader.ReadAsync());
        }
    });
    var producer = Task.Run(async () =>
    {
        var rnd = new Random();
        for (int i = 0; i < 5; i++)
        {
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            await channel.Writer.WriteAsync($"Message {i}");
        }
        channel.Writer.Complete();
    });

    await Task.WhenAll(producer, consumer);            
    Console.WriteLine("Completed.");
}

まず最初にキュー(+α)に該当するChannelオブジェクトを生成しています。

var channel = Channel.CreateUnbounded<string>(
    new UnboundedChannelOptions
    {
        SingleReader = true,
        SingleWriter = true
    });

CreateUnboundedはサイズ制限のないキューを作成します。CreateBoundedを利用するとサイズ制限のあるキューを作成できます。

またUnboundedChannelOptionsのSingleReaderとSingleWriterをtrueとして制約をかけることで性能がやや向上するようです。デフォルトはfalseです。

つづいて、キューに登録されたアイテムを処理する消費者(Consumer)を作成します。

var consumer = Task.Run(async () =>
{
    while (await channel.Reader.WaitToReadAsync())
    {
        Console.WriteLine(await channel.Reader.ReadAsync());
    }
});

WaitToReadAsyncメソッドでChannelが閉じられる(すべての処理が完了している)かどうかチェックし、ReadAsyncでアイテムを取得して処理しています。

ではつぎは生産者(Producer)側を見てみましょう。

var producer = Task.Run(async () =>
{
    var rnd = new Random();
    for (int i = 0; i < 5; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        await channel.Writer.WriteAsync($"Message {i}");
    }
    channel.Writer.Complete();
});

Randomに3秒以内のディレイを挟みつつ、WriteAsyncでアイテムを登録しています。

5つのアイテムを書き込んだらCompleteでチャネルを閉じています。

最後は生産者と消費者の処理を待って終了します。

await Task.WhenAll(producer, consumer);            
Console.WriteLine("Completed.");

生産者:消費者=1:nの実装例

参考資料の例だとキューに入れられたアイテムの処理時間が均等であれば良いのですが、アイテムによって処理時間が異なるような場合は不適切です。

その場合、つぎのように利用しましょう。

CreateUnboundedでWriter側だけSingle制約をかけていることに注意してください。

static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleWriter = true
        });
    
    var consumers = Enumerable
        .Range(1, 3)    // 1~3の数値を取得する
        .Select(consumerNumber =>
            Task.Run(async () =>
            {
                while (await channel.Reader.WaitToReadAsync())
                {
                    if (channel.Reader.TryRead(out var item))
                    {
                        Console.WriteLine($"Consumer:{consumerNumber} {item}");
                    }
                }
            }));
    var producer = Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 5; i++)
        {
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            await channel.Writer.WriteAsync($"Message {i}");
        }
        channel.Writer.Complete();
    });

    await Task.WhenAll(consumers.Union(new[] {producer}));
    Console.WriteLine("Completed.");
}

まず最初に複数の消費者を生成します。

var consumers = Enumerable
    .Range(1, 3)    // 1~3の数値を取得する
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consumer:{consumerNumber} {item}");
                }
            }
        }));

ここで重要なのは、アイテムの取得にReadAsyncではなくTryReadを使っている点です。

Channelでは消費者はWaitToReadAsyncでキューにアイテムが登録されるのを待ちますが、アイテムがひとつ登録されると、一旦すべての生産者が「起こされ」ます。ReadAsyncを使った場合、2番目以降にアイテムを取りに行った消費者は、アイテムがないため例外がスローされてしまいます。

そこでTryReadを利用することで、まだアイテムがあった場合だけ処理するように実装してあげる必要があります。

あとはすべての消費者と生産者の処理の終了を待機して完了です。

await Task.WhenAll(consumers.Union(new[] {producer}));
Console.WriteLine("Completed.");

生産者:消費者=n:1の実装例

参考資料の例では、複数のChannelを作ってマージする方法が紹介されていますが、元々Channel自体が複数からの入力をサポートしているので必ずしもChannelを複数作る必要はありません。

ということで以下がシンプルな例になります。CreateUnboundedでReader側だけSingle制約をかけていることに気を付けてください。

static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true
        });

    var consumer = Task.Run(async () =>
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            Console.WriteLine(await channel.Reader.ReadAsync());
        }
    });

    var producers = Enumerable
        .Range(1, 3)
        .Select(producerNumber =>Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 5; i++)
            {
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
                await channel.Writer.WriteAsync($"Producer:{producerNumber} Message {i}");
            }
        }));

    await Task.WhenAll(producers);
    channel.Writer.Complete();

    await consumer;
    Console.WriteLine("Completed.");
}

Consumer側の生成は1:1の場合と変わりありません。

違いはProducer側の実装です。1:1の時はforループ前後が次のように実装されていました。
forループの外でWriterをCompleteして閉じていましたが、これでは最初に処理が完了したProducerが閉じてしまい、以後のProducerの処理でエラーになってしまします。

for (int i = 0; i < 5; i++)
{
    await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
    await channel.Writer.WriteAsync($"Message {i}");
}
channel.Writer.Complete();

そのため、つぎのようにすべてのProducerの処理が完了してからCompleteするようにします。

await Task.WhenAll(producers);
channel.Writer.Complete();

ここは実際の実装はケースによって異なりますが、生産者の処理を全て待つ必要がない場合は、つぎのようにWriteAsyncではなくTryWriteを使うこともできます。

//await channel.Writer.WriteAsync($"Producer:{producerNumber} Message {i}");
channel.Writer.TryWrite($"Producer:{producerNumber} Message {i}");

Completeも閉じられる場所が一つではない場合は、TryCompleteを使うようにしましょう。

以上です。

47
28
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
47
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?