自分が引き継いでメンテしているリポジトリで、Channel
が出てきたのですが、よくわかっていなかったので、少しづつ勉強していきたいと思います。このブログは基本的に下記のブログをフォローして自分で試しているものです。
Concurrent と Parallelism
Concurrent と Parallelism は別の概念だ。Concurrent は2つのオペレーションが、お互いに気づくことなく実行される。その際、実際に実行される時間は関係がない。別の例を挙げるとわかりやすいかもしれない。シングルプロセッサのマシンでは、Concurrentな実行は、可能だけど、Parallelに実行することはできない。つまり、Concurrentは、構造のことであって、Parallelismの恩恵を受けるかもしれないものだ。Paralleismは本当に同時に処理することを指す。タイムシェアリングなどで、平行に動いているように見えるのではない。
Channels とは
コンカレントのプログラミングはお互い独立しているので、コーディネイトする仕組みが必要になる。Go 言語でおなじみの Channel の仕組みが、.NET にも存在する。チャネルは、データストラクチャで、一つのスレッドがほかのスレッドとコミュニケーションをとることを可能にします。通常 .NET ではロックや、Synchronization の仕組みを使って変数を共有して行われます。一方チャネルは、2つのスレッドの間でメッセージをダイレクトに送信します。Syncronization やロックは必要ありません。メッセージは、First In First Out で送信されます。
Simple usage
基本的な使い方としては、次のようにチャネルを生成します。
var ch = Channel.CreateUnbounded<string>();
Producer
の部分でデータを生成します。ランダムにディレイさせながら、メッセージを生成していき、最後に Complete
でそのチャネルの終了を送信します。
var producer = Task.Run(async () =>
{
var rnd = new Random();
for (int i = 0; i < 5; i++)
{
Console.WriteLine($"Producer: waiting... ");
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
Console.WriteLine("Writing to the channel");
await ch.Writer.WriteAsync($"Message {i}");
}
ch.Writer.Complete();
});
Consumer
では、チャネルを読み込みます。データがある限り、つまり Complete()
メソッドが Producer
側でコールされるまで読み続けられます。
Producer
と Consumer
は別スレッドで動作していますが、チャネルを通じてデータを送付出来ているのでがわかると思います。
var consumer = Task.Run(async () =>
{
while (await ch.Reader.WaitToReadAsync())
Console.WriteLine($"Consumer: {await ch.Reader.ReadAsync()}");
});
Generator Sample
Generator
のパターンをご紹介します。このパターンでは、CreateMessage()
というメソッドが、ChannelReader<T>
を返却するようにしています。メソッドの内部で、チャネルを作成して、新しいスレッドを起動して、データを書き込みます。これはブロックしないので、先に処理が進みます。
そして、チャネルのリーダーを返します。
static ChannelReader<string> CreateMessenger(string msg, int count)
{
var ch = Channel.CreateUnbounded<string>();
var rnd = new Random();
Task.Run(async () =>
{
for (int i = 0; i < count; i++)
{
await ch.Writer.WriteAsync($"[{DateTime.Now}] {msg} {i}");
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
ch.Writer.Complete();
});
return ch.Reader;
}
使用する側としては、チャネルのリーダーが獲得できるので、それのどちらかのメッセージが来たら、受け取ったメッセージを表示しています。このパターンはチャネルが1つだとうまくいきますが、このように2つ以上つかうと問題になるケースがあります。
public async Task ExecuteAsync()
{
var joe = CreateMessenger("Joe", 2);
var ann = CreateMessenger("Ann", 5);
while(await joe.WaitToReadAsync() || await ann.WaitToReadAsync())
{
Console.WriteLine(await joe.ReadAsync());
Console.WriteLine(await ann.ReadAsync());
}
}
しかし、このコードには問題があります。どちらかのチャネルが早く終わってしまった後に、再度 ReadAsync()
メソッドを呼ぶと例外が発生してしまいます。この問題をどうやって解決すればよいでしょうか?
Merge
一つの方法として、Merge
のパターンを使う方法があります。複数のチャネルを1つのチャネルにマージします。具体的には、新たなチャネルを作って、複数のチャネルのメッセージをそちらに流し込むようにすればOKです。
static ChannelReader<T> Merge<T>(
ChannelReader<T> first, ChannelReader<T> second)
{
var output = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
await foreach (var item in first.ReadAllAsync())
await output.Writer.WriteAsync(item);
});
Task.Run(async () =>
{
await foreach (var item in second.ReadAllAsync())
await output.Writer.WriteAsync(item);
});
return output;
}
public async Task ExecuteAync()
{
var ch = Merge(CreateMessenger("Joe", 3), CreateMessenger("Ann", 5));
await foreach (var item in ch.ReadAllAsync())
Console.WriteLine(item);
}
Joe のメッセージが早めに終了していますが、1つのチャネルにマージしていますので、問題は起りません。問題があるとすると、すべての処理が終わったことを知る方法がこのコードではありません。
Merge improved
Merge のパターンを汎用的に使えるように改造してみましょう。Arrayを使って、すべてのチャネルを1つに統合しますが、その際にTask.WhenAll
を使って、チャネル自体の終了を待ち受けます。すべてのチャネルが終了すると、プログラムが終了します。
static ChannelReader<T> Merge<T>(
ChannelReader<T>[] inputs)
{
var output = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
async Task Redirect(ChannelReader<T> input)
{
await foreach (var item in input.ReadAllAsync())
await output.Writer.WriteAsync(item);
}
await Task.WhenAll(inputs.Select(i => Redirect(i)).ToArray());
output.Writer.Complete();
});
return output;
}
public async Task ExecuteAync()
{
var inputs = new List<ChannelReader<string>>();
inputs.Add(CreateMessenger("Joe", 3));
inputs.Add(CreateMessenger("Ann", 5));
var ch = Merge<string>(inputs.ToArray());
await foreach (var item in ch.ReadAllAsync())
Console.WriteLine(item);
}
全てのチャネルが終了したら、プログラム自体もしっかり終了しています。
Demultiplexer
今回ご紹介する最後のパターンですが、Producer
に複数の Consumer
が Subscribe する形式のパターンです。ポイントは、1つのチャネルを複数のチャネルに分割しています。具体的には index = (index + 1) % n;
を使って、メッセージを複数のチャネルにロードバランシングしています。
static IList<ChannelReader<T>> Split<T>(ChannelReader<T> ch, int n)
{
var outputs = new Channel<T>[n];
for (int i = 0; i < n; i++)
outputs[i] = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
var index = 0;
await foreach (var item in ch.ReadAllAsync())
{
await outputs[index].Writer.WriteAsync(item);
index = (index + 1) % n;
}
foreach (var ch in outputs)
ch.Writer.Complete();
});
return outputs.Select(ch => ch.Reader).ToArray();
}
受け取る側では、複数のチャネルを作ったのち、Split
メソッドで、チャネルを分割、ループで、それぞれのチャネルごとに、Consumer のスレッドを作成して、すべての処理が終了すると、プログラムが終了します。
public async Task ExecuteAsync()
{
var joe = CreateMessenger("Joe", 10);
var readers = Split<string>(joe, 3);
var tasks = new List<Task>();
for (int i=0; i < readers.Count; i++)
{
var reader = readers[i];
var index = i;
tasks.Add(Task.Run(async () =>
{
await foreach (var item in reader.ReadAllAsync())
Console.WriteLine($"Reader {index}: {item}");
}));
}
await Task.WhenAll(tasks);
}
まとめ
C#のChannelもGoのチャネルと同じように、ロックや同期を意識せずに、スレッド間でメッセージの送受信ができてとても便利です。次回は、タイムアウトの処理などのパターンについて学んでいきたいと思います。