非同期でネットワークからの入力を中継するようなプログラムを書いていて、以前から.NETのSystem.Threading.Channelsを使ってみたかったので下調べしました。
誤解を恐れず簡単に説明すると、System.Threading.ChannelsはWriterから書き込んだObjectをReaderから読み出せるようにするための仕組み。
Queueなどに比べてマルチスレッド時のどうこうとか、データが来るまでの待ちとかがすっきりと書ける。
サンプルソース1
using System.Threading.Channels;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace ChannelsTest
{
internal class Program
{
static void Main(string[] args)
{
Console.WriteLine("Main: Booted");
Channel<string> channel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
{
AllowSynchronousContinuations = false,
SingleReader = false,
SingleWriter = true,
});
Task writeTask = Task.Run(async () =>
{
try
{
Console.WriteLine("Writer: Start");
var wt = channel.Writer;
for (int i = 0; i < 10; i++)
{
string data = DateTime.Now.ToLongTimeString();
Console.WriteLine($"Writer: [{i}] write '{data}'...");
await wt.WriteAsync(DateTime.Now.ToLongTimeString());
Console.WriteLine($"Writer: [{i}] write '{data}'...done.");
await Task.Delay(1000);
}
Console.WriteLine($"Writer: Complete...");
wt.Complete();
Console.WriteLine("Writer: end");
}
catch (Exception ex)
{
Console.WriteLine($"Writer: Error: {ex}");
}
});
Task readTask1 = Task.Run(async () =>
{
Console.WriteLine("Reader1: Start");
var rd = channel.Reader;
while (true)
{
try
{
if (!await rd.WaitToReadAsync())
{
Console.WriteLine($"Reader1: Detect completion.");
break;
}
Console.WriteLine($"Reader1: read...");
string data = await rd.ReadAsync();
Console.WriteLine($"Reader1: read...done. '{data}'");
}
catch (Exception e)
{
Console.WriteLine($"Reader1: Error: {e}");
break;
}
}
});
Task all = Task.WhenAll(writeTask, readTask1);
Console.WriteLine("Main: Start tasks.");
all.Wait();
Console.WriteLine("Main: End tasks.");
}
}
}
詳しい解説はMicrosoftのドキュメントを参照という事で(コラ)気になるであろうあたりを。
初期化
ジェネリクス無しのChannel
クラスのメソッドを使って作成。
ここでChannelの中を流すオブジェクトの型を指定する必要がある。
Channelのインスタンスはジェネリクス付きになる。
このインスタンスのWriter
,Reader
メソッドを通してやり取りすることになる。
Writer
データを送り出す方。
WriteAsync()
とかTryWrite()
とかで送りたいデータを突っ込んでやればよい。
全部のデータを送り終わったり、上流側でもうデータが来ないと分かった場合(例えば、ソケットが閉じたとか)はComplete()
を実行して宣言してやる。
Reader
データを受ける方。
まず、WaitToReadAsync()
を実行して受け取るデータがあるかを調べる。というか、データが来るかWriter側でComplete()
されるまで待たされる。便利。
WaitToReadAsync()
がfalse
を返した場合はWriterがComplete()
を実行したという事なので今後の読み込みをやめると幸せ。
WaitToReadAsync()
がtrue
を返した場合はWriterからのデータが来ているのでそれを受け取ってやる必要がある。
ReadAsync()
するとWriterが突っ込んだオブジェクトが出てくる。
その後は再びWaitToReadAsync()
で待ち戻るも良し、Reader側でComplete()
して受付を終了するも良し。
実行結果
上記のプログラムを実行するとこんな出力になる。
Main: Booted
Main: Start tasks.
Reader1: Start
Writer: Start
Writer: [0] write '13:52:16'...
Reader1: read...
Reader1: read...done. '13:52:16'
Writer: [0] write '13:52:16'...done.
Writer: [1] write '13:52:17'...
Writer: [1] write '13:52:17'...done.
Reader1: read...
Reader1: read...done. '13:52:17'
Writer: [2] write '13:52:18'...
Writer: [2] write '13:52:18'...done.
Reader1: read...
Reader1: read...done. '13:52:18'
Writer: [3] write '13:52:19'...
Writer: [3] write '13:52:19'...done.
Reader1: read...
Reader1: read...done. '13:52:19'
Writer: [4] write '13:52:20'...
Writer: [4] write '13:52:20'...done.
Reader1: read...
Reader1: read...done. '13:52:20'
Writer: [5] write '13:52:21'...
Writer: [5] write '13:52:21'...done.
Reader1: read...
Reader1: read...done. '13:52:21'
Writer: [6] write '13:52:22'...
Writer: [6] write '13:52:22'...done.
Reader1: read...
Reader1: read...done. '13:52:22'
Writer: [7] write '13:52:23'...
Writer: [7] write '13:52:23'...done.
Reader1: read...
Reader1: read...done. '13:52:23'
Writer: [8] write '13:52:24'...
Writer: [8] write '13:52:24'...done.
Reader1: read...
Reader1: read...done. '13:52:24'
Writer: [9] write '13:52:25'...
Writer: [9] write '13:52:25'...done.
Reader1: read...
Reader1: read...done. '13:52:25'
Writer: Complete...
Writer: end
Reader1: Detect completion.
Main: End tasks.
WriterのTaskで1秒おきに10個のデータ(ここでは単なるstring
)を書き込んで終了している。
Readerは無限に待ち続けるけど、Writer側でComplete()したのを検出すると終了する。
Writer側の表示を見ると分かるけどWriteAsync()はReder側で読まれるのを待たずに完了している。(こともある。これはConsoleがいけないのか、まぁ、気にしない。)
最後にちゃんとWriter側でのComplete()がReader側でも検出できて無事に終了。
複数対複数の送受信に使えるの??
結論:使えない。
まずWriterは1つのままReader側を増やした場合、複数あるReaderの「どれか」でしか受け取れない。
(ソースはめんどいので省略)
Main: Booted
Main: Start tasks.
Writer: Start
Reader2: Start
Reader1: Start
Writer: [0] write '14:19:17'...
Reader2: read...
Reader1: read...
Reader2: read...done. '14:19:17'
Writer: [0] write '14:19:17'...done.
Writer: [1] write '14:19:18'...
Writer: [1] write '14:19:18'...done.
Reader1: read...done. '14:19:18'
Writer: [2] write '14:19:19'...
Writer: [2] write '14:19:19'...done.
Reader2: read...
Reader1: read...
Reader2: read...done. '14:19:19'
Writer: [3] write '14:19:20'...
Writer: [3] write '14:19:20'...done.
Reader1: read...done. '14:19:20'
Writer: [4] write '14:19:21'...
Writer: [4] write '14:19:21'...done.
Reader1: read...
Reader1: read...done. '14:19:21'
Reader2: read...
Writer: [5] write '14:19:22'...
Writer: [5] write '14:19:22'...done.
Reader2: read...done. '14:19:22'
Writer: [6] write '14:19:23'...
Writer: [6] write '14:19:23'...done.
Reader2: read...
Reader2: read...done. '14:19:23'
Reader1: read...
Writer: [7] write '14:19:24'...
Writer: [7] write '14:19:24'...done.
Reader1: read...done. '14:19:24'
Writer: [8] write '14:19:25'...
Writer: [8] write '14:19:25'...done.
Reader2: read...
Reader2: read...done. '14:19:25'
Reader1: read...
Writer: [9] write '14:19:26'...
Writer: [9] write '14:19:26'...done.
Reader1: read...done. '14:19:26'
Writer: Complete...
Writer: end
Reader2: Detect completion.
Reader1: Detect completion.
Main: End tasks.
Writerから送ったデータ(例えば「14:19:17」)はReader1かReader2のどちらかでしか拾えていない。
Reader1かReader2はランダム(というか「たまたま拾えた方?」)。
まぁ、1:1での使用が前提なんでしょうね。
(どっかでChannelを応用して1:NのChannelを作っていた人がいたような…。)
そのほか
今回サンプルで使ったのはUnbounded
なChannelなのでBounded
な方は未検証。
Bounded
はキューの容量が制限出来たり、上限に達したときのそれぞれのメソッドの振る舞いが変わったりするそうな。
メモリの使用量をきっちり管理したかったり、外部からのアタックで爆破されたくないサーバなどの場合はBounded
を使ってオーバーフロー時の挙動をちゃんとコーディングしたほうがいいんだろうなーと想像。