概要
前回の記事で、排他などもできる便利なキューとしてSystem.Threading.Channelsを使うという切り口で、基本的な使い方の紹介をしました。今回は、ちょっとした応用の使い方を書こうと思います。
デリゲートを入れて、処理の待ち行列にもできる
「キュー」と言われたときにイメージするのは、やはりデータを入れるキューだと思います。
System.Threading.ChannelsはGenericなので、自由な型を与えることができます。ということは、Action型等のデリゲートを与えることもできます。
つまり、次のようにバラバラの処理の待ち行列としても使えるということです。
var queue = Channel.CreateUnbounded<Action>();
//Enqueue側
await queue.Writer.WriteAsync(()=>{
Console.WriteLine("1つ目のAction");
});
await queue.Writer.WriteAsync(()=>{
Trace.WriteLine("2つ目のAction");
});
//Dequeue側
await foreach(var dequeueData = queue.Reader.ReadAllAsync()){
dequeueData.Invoke();
}
Enqueue側で入れたバラバラな処理を、Dequeue側では内容を意識することなく順番に実行できています。
Dequeue側で「空になった」時の処理分岐も可能
シンプルな使い方では、キューの中身があれば取り出し、無ければ待つ、というのを1メソッドで行うシンプルな処理でした。これは便利なメソッドですが、もっと細かく書くこともできます。例えば「空の時だけフラグを立てる」といった処理も可能です。例えば次のようになります。
var queue = Channel.CreateUnbounded<Action>();
bool isEmpty;
//Dequeue側
while(true){
if(!queue.Reader.TryRead(out var item)){
//キューが空の場合
isEmpty = true;
if(!await queue.Reader.WaitToReadAsync()){
//キューがCompleteされた場合
break;
}
//キューにアイテムが入ってきた場合
isEmpty = false;
continue;
}
//デキューできた場合
//itemの処理をする
}
TryReadは、取得を試みてすぐに成否を返します。失敗した場合は空なので、WaitToReadAsync()で取得可能になるまで待ちます。これらを利用してReadAllAsync()と同じような使い方になるようにループを組んだものです。キューが空の状態のときだけ、isEmpty=trueになっています。
処理が成功するまでキューから消さない事も可能
Dequeue側で取り出した時点でキューから消すのではなく、処理が成功した場合だけキューから消したいというケースも有ります。いわゆるfront→popです。System.Threading.Channelsはそういう処理にも対応しています。TryPeekというメソッドがあり、Peekで取得しても、Readで取得し直すまではキューから消えることはありません。似たようなキューのライブラリを使ったことがあれば、よく出てくる組み合わせだと思います。例えば、次のような使い方になります。
var queue = Channel.CreateUnbounded<Func<bool>>();
//Dequeue側
queue.Reader.TryPeek(out var item)
//処理(失敗時は例外を投げる)
queue.Reader.TryRead(out _)
この例だと、処理が失敗した場合は例外になってReadへ進まないので、キューの中身は消えません。
シンプルな処理なので、Peekしている間に他スレッドのReadをブロックするような機能はありません。Dequeue側が複数スレッドの場合は、Peek~Read間を排他するなど、何か工夫は必要になると思います。
まとめ
System.Threading.Channelsは最低限のキューとして使うのも簡単ですが、応用的に「こういう使い方をしたい」という要望にも割と応えてくれます。今回の記事も合わせるとだいぶ色々なケースで使えると思うので、お勧めです。