33
37

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

C# の Channel を学ぶ (1)

Last updated at Posted at 2020-10-31

自分が引き継いでメンテしているリポジトリで、Channelが出てきたのですが、よくわかっていなかったので、少しづつ勉強していきたいと思います。このブログは基本的に下記のブログをフォローして自分で試しているものです。

Concurrent と Parallelism

Concurrent と Parallelism は別の概念だ。Concurrent は2つのオペレーションが、お互いに気づくことなく実行される。その際、実際に実行される時間は関係がない。別の例を挙げるとわかりやすいかもしれない。シングルプロセッサのマシンでは、Concurrentな実行は、可能だけど、Parallelに実行することはできない。つまり、Concurrentは、構造のことであって、Parallelismの恩恵を受けるかもしれないものだ。Paralleismは本当に同時に処理することを指す。タイムシェアリングなどで、平行に動いているように見えるのではない。

Channels とは

コンカレントのプログラミングはお互い独立しているので、コーディネイトする仕組みが必要になる。Go 言語でおなじみの Channel の仕組みが、.NET にも存在する。チャネルは、データストラクチャで、一つのスレッドがほかのスレッドとコミュニケーションをとることを可能にします。通常 .NET ではロックや、Synchronization の仕組みを使って変数を共有して行われます。一方チャネルは、2つのスレッドの間でメッセージをダイレクトに送信します。Syncronization やロックは必要ありません。メッセージは、First In First Out で送信されます。

Simple usage

基本的な使い方としては、次のようにチャネルを生成します。

var ch = Channel.CreateUnbounded<string>();

Producer の部分でデータを生成します。ランダムにディレイさせながら、メッセージを生成していき、最後に Completeでそのチャネルの終了を送信します。

var producer = Task.Run(async () =>
{
    var rnd = new Random();
    for (int i = 0; i < 5; i++)
    {
        Console.WriteLine($"Producer: waiting... ");
        await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        Console.WriteLine("Writing to the channel");
        await ch.Writer.WriteAsync($"Message {i}");
    }
    ch.Writer.Complete();
});

Consumer では、チャネルを読み込みます。データがある限り、つまり Complete() メソッドが Producer 側でコールされるまで読み続けられます。
ProducerConsumer は別スレッドで動作していますが、チャネルを通じてデータを送付出来ているのでがわかると思います。

var consumer = Task.Run(async () =>
{
    while (await ch.Reader.WaitToReadAsync())
        Console.WriteLine($"Consumer: {await ch.Reader.ReadAsync()}");
});

image.png

Generator Sample

Generator のパターンをご紹介します。このパターンでは、CreateMessage() というメソッドが、ChannelReader<T> を返却するようにしています。メソッドの内部で、チャネルを作成して、新しいスレッドを起動して、データを書き込みます。これはブロックしないので、先に処理が進みます。
そして、チャネルのリーダーを返します。

static ChannelReader<string> CreateMessenger(string msg, int count)
{
    var ch = Channel.CreateUnbounded<string>();
    var rnd = new Random();

    Task.Run(async () =>
    {
        for (int i = 0; i < count; i++)
        {
            await ch.Writer.WriteAsync($"[{DateTime.Now}] {msg} {i}");
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        }
        ch.Writer.Complete();

    });

    return ch.Reader;
}

使用する側としては、チャネルのリーダーが獲得できるので、それのどちらかのメッセージが来たら、受け取ったメッセージを表示しています。このパターンはチャネルが1つだとうまくいきますが、このように2つ以上つかうと問題になるケースがあります。

        public async Task ExecuteAsync()
        {
            var joe = CreateMessenger("Joe", 2);
            var ann = CreateMessenger("Ann", 5);

            while(await joe.WaitToReadAsync() || await ann.WaitToReadAsync())
            {
                Console.WriteLine(await joe.ReadAsync());
                Console.WriteLine(await ann.ReadAsync());
            }
        }

しかし、このコードには問題があります。どちらかのチャネルが早く終わってしまった後に、再度 ReadAsync() メソッドを呼ぶと例外が発生してしまいます。この問題をどうやって解決すればよいでしょうか?

image.png

Merge

一つの方法として、Mergeのパターンを使う方法があります。複数のチャネルを1つのチャネルにマージします。具体的には、新たなチャネルを作って、複数のチャネルのメッセージをそちらに流し込むようにすればOKです。

static ChannelReader<T> Merge<T>(
    ChannelReader<T> first, ChannelReader<T> second)
{
    var output = Channel.CreateUnbounded<T>();
    Task.Run(async () =>
    {
        await foreach (var item in first.ReadAllAsync())
            await output.Writer.WriteAsync(item);
    });
    Task.Run(async () =>
    {
        await foreach (var item in second.ReadAllAsync())
            await output.Writer.WriteAsync(item);
    });

    return output;
}
public async Task ExecuteAync()
{
    var ch = Merge(CreateMessenger("Joe", 3), CreateMessenger("Ann", 5));
    await foreach (var item in ch.ReadAllAsync())
        Console.WriteLine(item);
}

Joe のメッセージが早めに終了していますが、1つのチャネルにマージしていますので、問題は起りません。問題があるとすると、すべての処理が終わったことを知る方法がこのコードではありません。
image.png

Merge improved

Merge のパターンを汎用的に使えるように改造してみましょう。Arrayを使って、すべてのチャネルを1つに統合しますが、その際にTask.WhenAll を使って、チャネル自体の終了を待ち受けます。すべてのチャネルが終了すると、プログラムが終了します。

static ChannelReader<T> Merge<T>(
    ChannelReader<T>[] inputs)
{
    var output = Channel.CreateUnbounded<T>();

    Task.Run(async () =>
    {
        async Task Redirect(ChannelReader<T> input)
        {
            await foreach (var item in input.ReadAllAsync())
                await output.Writer.WriteAsync(item);
        }

        await Task.WhenAll(inputs.Select(i => Redirect(i)).ToArray());
        output.Writer.Complete();
    });
    return output;
}
public async Task ExecuteAync()
{
    var inputs = new List<ChannelReader<string>>();
    inputs.Add(CreateMessenger("Joe", 3));
    inputs.Add(CreateMessenger("Ann", 5));
    var ch = Merge<string>(inputs.ToArray());
    await foreach (var item in ch.ReadAllAsync())
        Console.WriteLine(item);
}

全てのチャネルが終了したら、プログラム自体もしっかり終了しています。
image.png

Demultiplexer

今回ご紹介する最後のパターンですが、Producer に複数の Consumer が Subscribe する形式のパターンです。ポイントは、1つのチャネルを複数のチャネルに分割しています。具体的には index = (index + 1) % n; を使って、メッセージを複数のチャネルにロードバランシングしています。

static IList<ChannelReader<T>> Split<T>(ChannelReader<T> ch, int n)
{
    var outputs = new Channel<T>[n];
    for (int i = 0; i < n; i++)
        outputs[i] = Channel.CreateUnbounded<T>();

    Task.Run(async () =>
    {
        var index = 0;
        await foreach (var item in ch.ReadAllAsync())
        {
            await outputs[index].Writer.WriteAsync(item);
            index = (index + 1) % n;
        }

        foreach (var ch in outputs)
            ch.Writer.Complete();
    });
    return outputs.Select(ch => ch.Reader).ToArray();
}

受け取る側では、複数のチャネルを作ったのち、Split メソッドで、チャネルを分割、ループで、それぞれのチャネルごとに、Consumer のスレッドを作成して、すべての処理が終了すると、プログラムが終了します。

public async Task ExecuteAsync()
{
    var joe = CreateMessenger("Joe", 10);
    var readers = Split<string>(joe, 3);
    var tasks = new List<Task>();

    for (int i=0; i < readers.Count; i++)
    {
        var reader = readers[i];
        var index = i;
        tasks.Add(Task.Run(async () =>
        {
            await foreach (var item in reader.ReadAllAsync())
                Console.WriteLine($"Reader {index}: {item}");
        }));
    }
    await Task.WhenAll(tasks);
}

image.png

まとめ

C#のChannelもGoのチャネルと同じように、ロックや同期を意識せずに、スレッド間でメッセージの送受信ができてとても便利です。次回は、タイムアウトの処理などのパターンについて学んでいきたいと思います。

Resource

33
37
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
37

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?