2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

非同期でネットワークからの入力を中継するようなプログラムを書いていて、以前から.NETのSystem.Threading.Channelsを使ってみたかったので下調べしました。

誤解を恐れず簡単に説明すると、System.Threading.ChannelsはWriterから書き込んだObjectをReaderから読み出せるようにするための仕組み。
Queueなどに比べてマルチスレッド時のどうこうとか、データが来るまでの待ちとかがすっきりと書ける。

サンプルソース1

program1.cs
using System.Threading.Channels;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace ChannelsTest
{
    internal class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Main: Booted");

            Channel<string> channel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
            {
                AllowSynchronousContinuations = false,
                SingleReader = false,
                SingleWriter = true,
            });


            Task writeTask = Task.Run(async () => 
            {
                try
                {
                    Console.WriteLine("Writer: Start");
                    var wt = channel.Writer;

                    for (int i = 0; i < 10; i++)
                    {
                        string data = DateTime.Now.ToLongTimeString();
                        Console.WriteLine($"Writer: [{i}] write '{data}'...");
                        await wt.WriteAsync(DateTime.Now.ToLongTimeString());
                        Console.WriteLine($"Writer: [{i}] write '{data}'...done.");
                        await Task.Delay(1000);
                    }
                    Console.WriteLine($"Writer: Complete...");
                    wt.Complete();
                    Console.WriteLine("Writer: end");
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Writer: Error: {ex}");
                }
            });

            Task readTask1 = Task.Run(async () => 
            {
                Console.WriteLine("Reader1: Start");
                var rd = channel.Reader;

                while (true)
                {
                    try
                    {
                        if (!await rd.WaitToReadAsync())
                        {
                            Console.WriteLine($"Reader1: Detect completion.");
                            break;
                        }

                        Console.WriteLine($"Reader1: read...");
                        string data = await rd.ReadAsync();
                        Console.WriteLine($"Reader1: read...done. '{data}'");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Reader1: Error: {e}");
                        break;
                    }
                }

            });

            Task all = Task.WhenAll(writeTask, readTask1);

            Console.WriteLine("Main: Start tasks.");
            all.Wait();
            Console.WriteLine("Main: End tasks.");
        }
    }
}

詳しい解説はMicrosoftのドキュメントを参照という事で(コラ)気になるであろうあたりを。

初期化

ジェネリクス無しのChannelクラスのメソッドを使って作成。
ここでChannelの中を流すオブジェクトの型を指定する必要がある。

Channelのインスタンスはジェネリクス付きになる。
このインスタンスのWriter,Readerメソッドを通してやり取りすることになる。

Writer

データを送り出す方。

WriteAsync()とかTryWrite()とかで送りたいデータを突っ込んでやればよい。
全部のデータを送り終わったり、上流側でもうデータが来ないと分かった場合(例えば、ソケットが閉じたとか)はComplete()を実行して宣言してやる。

Reader

データを受ける方。

まず、WaitToReadAsync()を実行して受け取るデータがあるかを調べる。というか、データが来るかWriter側でComplete()されるまで待たされる。便利。
WaitToReadAsync()falseを返した場合はWriterがComplete()を実行したという事なので今後の読み込みをやめると幸せ。

WaitToReadAsync()trueを返した場合はWriterからのデータが来ているのでそれを受け取ってやる必要がある。
ReadAsync()するとWriterが突っ込んだオブジェクトが出てくる。

その後は再びWaitToReadAsync()で待ち戻るも良し、Reader側でComplete()して受付を終了するも良し。

実行結果

上記のプログラムを実行するとこんな出力になる。

DOS窓
Main: Booted
Main: Start tasks.
Reader1: Start
Writer: Start
Writer: [0] write '13:52:16'...
Reader1: read...
Reader1: read...done. '13:52:16'
Writer: [0] write '13:52:16'...done.
Writer: [1] write '13:52:17'...
Writer: [1] write '13:52:17'...done.
Reader1: read...
Reader1: read...done. '13:52:17'
Writer: [2] write '13:52:18'...
Writer: [2] write '13:52:18'...done.
Reader1: read...
Reader1: read...done. '13:52:18'
Writer: [3] write '13:52:19'...
Writer: [3] write '13:52:19'...done.
Reader1: read...
Reader1: read...done. '13:52:19'
Writer: [4] write '13:52:20'...
Writer: [4] write '13:52:20'...done.
Reader1: read...
Reader1: read...done. '13:52:20'
Writer: [5] write '13:52:21'...
Writer: [5] write '13:52:21'...done.
Reader1: read...
Reader1: read...done. '13:52:21'
Writer: [6] write '13:52:22'...
Writer: [6] write '13:52:22'...done.
Reader1: read...
Reader1: read...done. '13:52:22'
Writer: [7] write '13:52:23'...
Writer: [7] write '13:52:23'...done.
Reader1: read...
Reader1: read...done. '13:52:23'
Writer: [8] write '13:52:24'...
Writer: [8] write '13:52:24'...done.
Reader1: read...
Reader1: read...done. '13:52:24'
Writer: [9] write '13:52:25'...
Writer: [9] write '13:52:25'...done.
Reader1: read...
Reader1: read...done. '13:52:25'
Writer: Complete...
Writer: end
Reader1: Detect completion.
Main: End tasks.

WriterのTaskで1秒おきに10個のデータ(ここでは単なるstring)を書き込んで終了している。
Readerは無限に待ち続けるけど、Writer側でComplete()したのを検出すると終了する。
Writer側の表示を見ると分かるけどWriteAsync()はReder側で読まれるのを待たずに完了している。(こともある。これはConsoleがいけないのか、まぁ、気にしない。)

最後にちゃんとWriter側でのComplete()がReader側でも検出できて無事に終了。

複数対複数の送受信に使えるの??

結論:使えない。

まずWriterは1つのままReader側を増やした場合、複数あるReaderの「どれか」でしか受け取れない。

(ソースはめんどいので省略)

複数Reader
Main: Booted
Main: Start tasks.
Writer: Start
Reader2: Start
Reader1: Start
Writer: [0] write '14:19:17'...
Reader2: read...
Reader1: read...
Reader2: read...done. '14:19:17'
Writer: [0] write '14:19:17'...done.
Writer: [1] write '14:19:18'...
Writer: [1] write '14:19:18'...done.
Reader1: read...done. '14:19:18'
Writer: [2] write '14:19:19'...
Writer: [2] write '14:19:19'...done.
Reader2: read...
Reader1: read...
Reader2: read...done. '14:19:19'
Writer: [3] write '14:19:20'...
Writer: [3] write '14:19:20'...done.
Reader1: read...done. '14:19:20'
Writer: [4] write '14:19:21'...
Writer: [4] write '14:19:21'...done.
Reader1: read...
Reader1: read...done. '14:19:21'
Reader2: read...
Writer: [5] write '14:19:22'...
Writer: [5] write '14:19:22'...done.
Reader2: read...done. '14:19:22'
Writer: [6] write '14:19:23'...
Writer: [6] write '14:19:23'...done.
Reader2: read...
Reader2: read...done. '14:19:23'
Reader1: read...
Writer: [7] write '14:19:24'...
Writer: [7] write '14:19:24'...done.
Reader1: read...done. '14:19:24'
Writer: [8] write '14:19:25'...
Writer: [8] write '14:19:25'...done.
Reader2: read...
Reader2: read...done. '14:19:25'
Reader1: read...
Writer: [9] write '14:19:26'...
Writer: [9] write '14:19:26'...done.
Reader1: read...done. '14:19:26'
Writer: Complete...
Writer: end
Reader2: Detect completion.
Reader1: Detect completion.
Main: End tasks.

Writerから送ったデータ(例えば「14:19:17」)はReader1かReader2のどちらかでしか拾えていない。
Reader1かReader2はランダム(というか「たまたま拾えた方?」)。

まぁ、1:1での使用が前提なんでしょうね。
(どっかでChannelを応用して1:NのChannelを作っていた人がいたような…。)

そのほか

今回サンプルで使ったのはUnboundedなChannelなのでBoundedな方は未検証。
Boundedはキューの容量が制限出来たり、上限に達したときのそれぞれのメソッドの振る舞いが変わったりするそうな。
メモリの使用量をきっちり管理したかったり、外部からのアタックで爆破されたくないサーバなどの場合はBoundedを使ってオーバーフロー時の挙動をちゃんとコーディングしたほうがいいんだろうなーと想像。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?