今回の内容
「R3」がasync/await
と組み合わせていろいろできてかなり面白いです。
今回はその中でも「Observable
をTask
に変換してawait
する」にフォーカスして機能を紹介します。
(Observable
をTask
に変換するという機能自体は実は本家Rx(dotnet/reactive)にありました。それ相当の機能がR3によってUnityでも使えるようになったというお話)
執筆時の環境
- Unity - 2023.1.14f1
- R3 - 1.0.4
ObservableをTaskに変換してasync/awaitで待つ
Observable
(Rx/R3におけるメッセージストリームのこと)をTask
に変換し、async/await
で待ち受けることができます。なおObservable
から変換したTask
が完了する条件は原則として「OnCompletedメッセージが発行されたとき」です。
いろいろ待ち受けができるので、用途に合わせて選んでください。
private async ValueTask AwaitSamplesAsync(CancellationToken ct)
{
// ベースとなるObservable(0から9までの値を連続して出力する)
// Result: 0 1 2 3 4 5 6 7 8 9
Observable.Range(0, 10, ct).Subscribe(x => Debug.Log(x));
Debug.Log(
// AllAsync:発行された全ての値が条件を満たすかどうかを判定する
// Result: True
await Observable.Range(0, 10, ct)
.AllAsync(x => x >= 0, ct)
);
Debug.Log(
// AnyAsync:発行された値のうち、一つでも条件を満たすかどうかを判定する
// 途中で条件を満たしたとしてもOnCompletedが発行されるまでは結果を返さない
// Result: False
await Observable.Range(0, 10, ct)
.AnyAsync(x => x < 0, ct)
);
Debug.Log(
// FirstAsync:条件を満たした最初の値を1個だけ取り出す
// Anyと違い条件を満たした瞬間に完了する
// 条件を満たすものがない場合はOnCompleted(InvalidOperationException)が発行される
// Result: 3
await Observable.Range(0, 10, ct)
.FirstAsync(x => x >= 3, cancellationToken: ct)
);
Debug.Log(
// FirstOrDefaultAsync:条件を満たした最初の値を1個だけ取り出す
// Anyと違い条件を満たした瞬間に完了する
// 条件を満たすものがない場合はOnCompleted(default)が発行される
// Result: 3
await Observable.Range(0, 10, ct)
.FirstOrDefaultAsync(x => x >= 3, cancellationToken: ct)
);
Debug.Log(
// SingleAsync:条件を満たした値を1個だけ取り出す
// 条件を満たすものがない場合はOnCompleted(InvalidOperationException)が発行される
// FirstAsyncと違いは次のとおり
// 1. OnCompletedが発行されるまで結果を返さない
// 2. 条件を満たす値が2個以上あった場合はOnCompleted(InvalidOperationException)が発行される
// Result: 9
await Observable.Range(0, 10, ct)
.SingleAsync(x => x >= 9, cancellationToken: ct)
);
Debug.Log(
// SingleOrDefaultAsync:条件を満たした値を1個だけ取り出す
// 条件を満たすものがない場合はOnCompleted(default)が発行される
// FirstOrDefaultAsyncと違いは次のとおり
// 1. OnCompletedが発行されるまで結果を返さない
// 2. 条件を満たす値が2個以上あった場合はOnCompleted(InvalidOperationException)が発行される
// Result: 0
await Observable.Range(0, 10, ct)
.SingleOrDefaultAsync(x => x > 100, cancellationToken: ct)
);
Debug.Log(
// LastAsync:条件を満たした最後の値を1個だけ取り出す
// 条件を満たすものがない場合はOnCompleted(InvalidOperationException)が発行される
// Result: 9
await Observable.Range(0, 10, ct)
.LastAsync(x => x >= 3, cancellationToken: ct)
);
Debug.Log(
// LastOrDefaultAsync:条件を満たした最後の値を1個だけ取り出す
// 条件を満たすものがない場合はOnCompleted(default)が発行される
// Result: 9
await Observable.Range(0, 10, ct)
.LastOrDefaultAsync(x => x >= 3, cancellationToken: ct)
);
Debug.Log(
// AggregateAsync:値を畳み込み(fold)計算する
// Result: 45
await Observable.Range(0, 10, ct)
.AggregateAsync((total, cur) => total + cur, cancellationToken: ct)
);
{
// AggregateByAsync:値をkeySelectorでグループ化し、そのグループごとに畳み込み(fold)計算する
// Result: 4.5
IEnumerable<KeyValuePair<int, int>> result =
await Observable.Range(0, 10, ct)
.AggregateByAsync(
// 偶数グループと奇数グループに分けてそれぞれ合計値を計算
keySelector: key => key % 2,
seed: 0,
func: (total, cur) => total + cur,
cancellationToken: ct);
// Result: Key:0 - Value:20
// Key:1 - Value:25
foreach (var kv in result)
{
Debug.Log($"Key:{kv.Key} - Value:{kv.Value}");
}
}
Debug.Log(
// AverageAsync:発行された値の平均値を計算する
// Result: 4.5
await Observable.Range(0, 10, ct)
.AverageAsync(cancellationToken: ct)
);
Debug.Log(
// ContainsAsync:指定した値が発行されたかどうかを判定しbool値として結果を返す
// Result: True
await Observable.Range(0, 10, ct)
.ContainsAsync(3, cancellationToken: ct)
);
Debug.Log(
// MaxAsync:発行された値の"最大値"を取得する
// IComparer<T>を指定すると比較方法を指定できる
// Result: 9
await Observable.Range(0, 10, ct)
.MaxAsync(cancellationToken: ct)
);
Debug.Log(
// MinAsync:発行された値の"最小値"を取得する
// IComparer<T>を指定すると比較方法を指定できる
// Result: 0
await Observable.Range(0, 10, ct)
.MinAsync(cancellationToken: ct)
);
// Observable.Range(0, 10, ct).MinMaxAsync()
Debug.Log(
// SumAsync:発行された値の"最小値"と"最大値"を同時に取得する
// IComparer<T>を指定すると比較方法を指定できる
// Result: (0, 9)
await Observable.Range(0, 10, ct)
.MinMaxAsync(cancellationToken: ct)
);
Debug.Log(
// SumAsync:発行された値の合計値を計算する
// Result: 45
await Observable.Range(0, 10, ct)
.SumAsync(cancellationToken: ct)
);
Debug.Log(
// MaxByAsync:発行された値の"最大値を含んだメッセージ本体"を取得する
// IComparer<T>を指定するこで比較方法を指定できる
await Observable.Repeat(Unit.Default, 10)
.Select(_ => UnityEngine.Random.insideUnitSphere) // 10個のランダムなVector3を生成
.MaxByAsync(v => v.y, cancellationToken: ct) // y座標が最大の「Vector3」を取得
);
Debug.Log(
// MinByAsync:発行された値の"最小値を含んだメッセージ本体"を取得する
// IComparer<T>を指定するこで比較方法を指定できる
await Observable.Repeat(Unit.Default, 10)
.Select(_ => UnityEngine.Random.insideUnitSphere) // 10個のランダムなVector3を生成
.MinByAsync(v => v.y, cancellationToken: ct) // y座標が最小の「Vector3」を取得
);
Debug.Log(
// ElementAtAsync:指定したインデックスの値を取得する
// インデックスが範囲外の場合はOnCompleted(ArgumentOutOfRangeException)が発行される
// Result: 30
await Observable.Range(0, 10, ct)
.Select(x => x * 10) // 値を10倍にする
.ElementAtAsync(3, cancellationToken: ct)
);
Debug.Log(
// ElementAtOrDefaultAsync:指定したインデックスの値を取得する
// インデックスが範囲外の場合はOnCompleted(default)が発行される
// Result: 30
await Observable.Range(0, 10, ct)
.Select(x => x * 10) // 値を10倍にする
.ElementAtOrDefaultAsync(3, cancellationToken: ct)
);
Debug.Log(
// IsEmptyAsync:OnNextが一度も発行されなかったかどうかを判定する
// Result: False
await Observable.Range(0, 10, ct)
.IsEmptyAsync(ct)
);
Debug.Log(
// CountAsync:発行された値の個数をカウントする。int型を用いてカウントする。
// Result: 10
await Observable.Range(0, 10, ct)
.CountAsync(cancellationToken: ct)
);
Debug.Log(
// LongCountAsync:発行された値の個数をカウントする。long型を用いてカウントする。
// Result: 10
await Observable.Range(0, 10, ct)
.LongCountAsync(cancellationToken: ct)
);
Debug.Log(
// SequenceEqualAsync:他のObservableと比較して発行された値が順序含め全て同じかどうかを判定する
// Result: True
await Observable.Range(0, 10, ct)
.SequenceEqualAsync(Observable.Range(0, 10), ct)
);
// それぞれのデータ構造に変換
int[] array = await Observable.Range(0, 10, ct).ToArrayAsync(ct);
Dictionary<string, int> dictionary = await Observable.Range(0, 10, ct).ToDictionaryAsync(x => x.ToString(), ct);
List<int> list = await Observable.Range(0, 10, ct).ToListAsync(ct);
ILookup<int, int> lookup = await Observable.Range(0, 10, ct).ToLookupAsync(x => x % 2, ct);
HashSet<int> hashSet = await Observable.Range(0, 10, ct).ToHashSetAsync(ct);
// このObservableが完了するの待つ。結果としては何も返さない。
await Observable.Range(0, 10).WaitAsync(ct);
// このObservableが完了するの待ちつつ、発行された値を処理できる。結果としては何も返さない。
await Observable.Range(0, 10, ct).ForEachAsync(x => Debug.Log(x), ct);
}
おわりに
ForEachAsync
の応用が便利なので、これは別記事でまとめます。