Channel
UniTask
には Channel
という機能があります。名前空間は Cysharp.Threading.Tasks
です
System.Threading.Channels を元に作られていて、 async/await
と親和性の高いデータの受け渡しが手軽にできます
データを発行する生産者 と それを読み取る消費者 という2人の登場人物が出てきます
UniTaskのChannel と System.Threading.Channels.Channel
(読み飛ばしても良き)
Channel は 生産者と消費者が 何 : 何 なのかをしっかり意識してプログラムを書く必要があります
System.Threading.Channels は
Channel<int> channel = Channel.CreateUnbounded<int>(
new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = true,
});
こんな感じで Channel に Writer(生産者), Reader(消費者)が単一なのか複数なのかをオプションで指定できます
対して UniTask の Channel は 単一消費者 専用
です
Channel<int> channel = Channel.CreateSingleConsumerUnbounded<int>();
ファクトリメソッドの名前からして単一消費者臭がぷんぷんしますね!
実際 Channel<T> は abstract なので今後 Channel.CreateUnbounded
が来てもおかしくない!
Writer と Reader
Channel は Writer(生産者), Reader(消費者) に分かれてデータのやりとりをします。
データ構造は内部でキューが使われていて、 Writer が エンキュー、 Reader がデキューをするイメージです。
UniTask の Channel は単一消費者
なので注意が必要です
Writer
Writerはこんな感じの手順です
- Channel から ChannelWriter を取得する
- 値を書き込む(Enqueueする)
- チャネルを閉じる
// ChannelWriterを取得する
ChannelWriter<int> writer = channel.Writer;
// 値を書き込む
writer.TryWrite(1);
writer.TryWrite(2);
writer.TryWrite(3);
// チャネルを閉じる
writer.TryComplete();
writer.TryWrite
キューに値を追加します。チャネルが既に閉じている場合は false が返ってきます
writer.TryComplete
/ void writer.Complete
後片付け的な処理です。チャネルを閉じ、これ以降は値を書き込めなくなります
既にチャネルが閉じている場合、TryComplete は false を返し、Complete は ChannelClosedException がスローされます
Reader
- Channel から ChannelReader を取得する
- 読み込む(Dequeueする)
- (チャネルの完了を待つ)
// ChannelReaderを取得する
ChannelReader<int> reader => channel.Reader;
// 値が書き込まれるまで待ってtrueを返す(クローズしていてキューに何もなくなるとfalseを返す)
// 既に値がキューされていたら true を返す
if(await reader.WaitToReadAsync(token))
{
// キューから値を1つ取り出す。キューに何もなかったらfalseを返す
if(!reader.TryRead(out var i)) return;
Debug.Log(i); // 値が取れたときだけログを吐く
}
// 既に値がキューされていたらデキューして返す
// なかったら WaitToReadAsync + TryRead の挙動をする
var value = await reader.ReadAsync();
// 全ての書き込みを IUniTaskAsyncEnumerable<T> で値を取りに行く
await reader.ReadAllAsync()
.ForEachAsync(i =>
{
Debug.Log(i);
});
// クローズされていてキューに何もないときに完了するUniTask
await reader.Completion;
Reader はいくつか読み込み方法があるので使い分ける必要があります。
1個だけ値が欲しいとき
-
await reader.WaitToReadAsync
+reader.TryRead
WaitToReadAsync は値が書き込まれるまで待ちます
両方とも例外を吐かず、キューに何もない場合は false が返ってきます
そのとき、reader.TryRead には default値が渡されるので注意しましょう
await reader.ReadAsync
既に値がキューされていたらデキューして返し、なければ WaitToReadAsync + TryRead の挙動をします
クローズされていてキューになにもない場合は ChannelClosedException がスローされます
ログを出したくないときは try-catch で例外を握りつぶせばOK!
複数の値が欲しいとき
-
while
+await reader.WaitToReadAsync
+reader.TryRead
while と WaitToReadAsync を組み合わせることで複数待機が可能です
// チャネルが閉じるまで繰り返す
while(await reader.WaitToReadAsync())
{
if(!reader.TryRead(out var i)) return;
Debug.Log(i);
}
-
while
+await reader.ReadAsync
// 例外が吐かれるので無限ループでも大丈夫
while(true)
{
var i = await reader.ReadAsync();
Debug.Log(i);
}
reader.ReadAllAsync
writer.TryWrite を IUniTaskAsyncEnumerable<T> で待機できます。個人的にこれが一番好きですね
Forget すればブロッキングせずに読み込み処理ができる、とか
値の取り方(ForEachAwaitAsyncとか)オペレータを組わせたりとか色々な工夫ができます
チャネルが閉じすべての値を読み込み終わったら foreach の await が完了します
reader.Completion
Writer が TryComplete / Complete を呼んでいて(クローズ)、かつ Reader の読み込みが終了した(キューが空になった)ときに完了する UniTask です
TryComplete / Complete の引数に例外を指定していたら、reader.Completion は失敗状態になり例外を吐きます
注意点
やはり注意すべきは単一消費者という点です
内部では1つのチャネルに1つのキューが使われていて Writer も Reader もそれを共有しています。多方面から Reader が デキューをするとそりゃ意図しない挙動になるよねっていう話です。狭いスコープで使うのが良いと思います。
Reader を public にしてデータのやり取りをする UniRx みたいな使い方はできないでしょう。
もし値を外部にも通知したい場合は reader.ReadAsync().Publish()
を使うことで安全に IUniTaskAsyncEnumerable として公開できます
IUniTaskAsyncEnumerableとして外部に公開する
ということで、例として Channel で動くタイマーを作ってみました
public class AsyncTimer : IDisposable
{
private readonly int limitTime; // 制限時間
private readonly Channel<int> channel;
private readonly IConnectableUniTaskAsyncEnumerable<int> publish;
private readonly IDisposable connection;
private CancellationTokenSource cancellationTokenSource;
private CancellationToken cancellationToken;
// IUniTaskAsyncEnumerable<T>として公開する
public IUniTaskAsyncEnumerable<int> CountdownAsyncEnumerable => publish;
public AsyncTimer(int limitTime)
{
// チャネルを生成
channel = Channel.CreateSingleConsumerUnbounded<int>();
// IConnectableUniTaskAsyncEnumerable<T>に変換
publish = channel.Reader.ReadAllAsync().Publish();
// .Connect()で起動する
connection = publish.Connect();
this.limitTime = limitTime;
cancellationTokenSource = new CancellationTokenSource();
cancellationToken = cancellationTokenSource.Token;
}
public async UniTask CountdownAsync()
{
// カウントダウン処理
foreach(var i in Enumerable.Range(0,limitTime + 1).Reverse())
{
// 値を書き込む
channel.Writer.TryWrite(i);
if(i == 0) break;
await UniTask.Delay(1000, cancellationToken : cancellationToken);
}
// チャネルをクローズする
channel.Writer.TryComplete();
}
public void Dispose()
{
channel.Writer.TryComplete();
connection.Dispose();
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
}
}
注目すべきはココです
// チャネルを生成
channel = Channel.CreateSingleConsumerUnbounded<int>();
// IConnectableUniTaskAsyncEnumerable<T>に変換
publish = channel.Reader.ReadAllAsync().Publish();
// .Connect()で起動する
connection = publish.Connect();
Publish() と Connect() で外部に公開してもちゃんと動く IUniTaskAsyncEnumerable ができました
.Connect()を呼ばないと writer.TryWrite しても 値を取りに行けないので注意が必要です
まとめ
- チャネルは非同期キューイングな処理ができる
- Writer と Reader に分かれて処理を行う
- 基本は System.Threading.Channels と同じなのでそっちを調べても良き
- 単一消費者専用なので注意
- 消費者を複数にする場合は Publish() と Connect() を使う