9
7

【Unity開発者向け】R3でObservableをTaskに変換して扱う方法一覧

Posted at

今回の内容

R3」がasync/awaitと組み合わせていろいろできてかなり面白いです。
今回はその中でも「ObservableTaskに変換してawaitする」にフォーカスして機能を紹介します。

ObservableTaskに変換するという機能自体は実は本家Rx(dotnet/reactive)にありました。それ相当の機能がR3によってUnityでも使えるようになったというお話)

執筆時の環境

  • Unity - 2023.1.14f1
  • R3 - 1.0.4

ObservableをTaskに変換してasync/awaitで待つ

Observable(Rx/R3におけるメッセージストリームのこと)をTaskに変換し、async/awaitで待ち受けることができます。なおObservableから変換したTaskが完了する条件は原則として「OnCompletedメッセージが発行されたとき」です。
いろいろ待ち受けができるので、用途に合わせて選んでください。

private async ValueTask AwaitSamplesAsync(CancellationToken ct)
{
    // ベースとなるObservable(0から9までの値を連続して出力する)
    // Result: 0 1 2 3 4 5 6 7 8 9 
    Observable.Range(0, 10, ct).Subscribe(x => Debug.Log(x));


    Debug.Log(
        // AllAsync:発行された全ての値が条件を満たすかどうかを判定する
        // Result: True
        await Observable.Range(0, 10, ct)
            .AllAsync(x => x >= 0, ct)
    );

    Debug.Log(
        // AnyAsync:発行された値のうち、一つでも条件を満たすかどうかを判定する
        //      途中で条件を満たしたとしてもOnCompletedが発行されるまでは結果を返さない
        // Result: False
        await Observable.Range(0, 10, ct)
            .AnyAsync(x => x < 0, ct)
    );

    Debug.Log(
        // FirstAsync:条件を満たした最初の値を1個だけ取り出す
        //        Anyと違い条件を満たした瞬間に完了する
        //        条件を満たすものがない場合はOnCompleted(InvalidOperationException)が発行される
        // Result: 3
        await Observable.Range(0, 10, ct)
            .FirstAsync(x => x >= 3, cancellationToken: ct)
    );


    Debug.Log(
        // FirstOrDefaultAsync:条件を満たした最初の値を1個だけ取り出す
        //           Anyと違い条件を満たした瞬間に完了する
        //           条件を満たすものがない場合はOnCompleted(default)が発行される
        // Result: 3
        await Observable.Range(0, 10, ct)
            .FirstOrDefaultAsync(x => x >= 3, cancellationToken: ct)
    );


    Debug.Log(
        // SingleAsync:条件を満たした値を1個だけ取り出す
        //        条件を満たすものがない場合はOnCompleted(InvalidOperationException)が発行される
        //             FirstAsyncと違いは次のとおり
        //          1. OnCompletedが発行されるまで結果を返さない
        //           2. 条件を満たす値が2個以上あった場合はOnCompleted(InvalidOperationException)が発行される
        // Result: 9
        await Observable.Range(0, 10, ct)
            .SingleAsync(x => x >= 9, cancellationToken: ct)
    );

    Debug.Log(
        // SingleOrDefaultAsync:条件を満たした値を1個だけ取り出す
        //            条件を満たすものがない場合はOnCompleted(default)が発行される
        //            FirstOrDefaultAsyncと違いは次のとおり
        //             1. OnCompletedが発行されるまで結果を返さない
        //             2. 条件を満たす値が2個以上あった場合はOnCompleted(InvalidOperationException)が発行される
        // Result: 0
        await Observable.Range(0, 10, ct)
            .SingleOrDefaultAsync(x => x > 100, cancellationToken: ct)
    );


    Debug.Log(
        // LastAsync:条件を満たした最後の値を1個だけ取り出す
        //        条件を満たすものがない場合はOnCompleted(InvalidOperationException)が発行される
        // Result: 9
        await Observable.Range(0, 10, ct)
            .LastAsync(x => x >= 3, cancellationToken: ct)
    );

    Debug.Log(
        // LastOrDefaultAsync:条件を満たした最後の値を1個だけ取り出す
        //             条件を満たすものがない場合はOnCompleted(default)が発行される
        // Result: 9
        await Observable.Range(0, 10, ct)
            .LastOrDefaultAsync(x => x >= 3, cancellationToken: ct)
    );


    Debug.Log(
        // AggregateAsync:値を畳み込み(fold)計算する
        // Result: 45
        await Observable.Range(0, 10, ct)
            .AggregateAsync((total, cur) => total + cur, cancellationToken: ct)
    );

    {
        // AggregateByAsync:値をkeySelectorでグループ化し、そのグループごとに畳み込み(fold)計算する
        // Result: 4.5
        IEnumerable<KeyValuePair<int, int>> result =
            await Observable.Range(0, 10, ct)
                .AggregateByAsync(
                    // 偶数グループと奇数グループに分けてそれぞれ合計値を計算
                    keySelector: key => key % 2,
                    seed: 0,
                    func: (total, cur) => total + cur,
                    cancellationToken: ct);

        // Result: Key:0 - Value:20
        //         Key:1 - Value:25
        foreach (var kv in result)
        {
            Debug.Log($"Key:{kv.Key} - Value:{kv.Value}");
        }
    }

    Debug.Log(
        // AverageAsync:発行された値の平均値を計算する
        // Result: 4.5
        await Observable.Range(0, 10, ct)
            .AverageAsync(cancellationToken: ct)
    );

    Debug.Log(
        // ContainsAsync:指定した値が発行されたかどうかを判定しbool値として結果を返す
        // Result: True
        await Observable.Range(0, 10, ct)
            .ContainsAsync(3, cancellationToken: ct)
    );


    Debug.Log(
        // MaxAsync:発行された値の"最大値"を取得する
        //           IComparer<T>を指定すると比較方法を指定できる
        // Result: 9
        await Observable.Range(0, 10, ct)
            .MaxAsync(cancellationToken: ct)
    );

    Debug.Log(
        // MinAsync:発行された値の"最小値"を取得する
        //           IComparer<T>を指定すると比較方法を指定できる
        // Result: 0
        await Observable.Range(0, 10, ct)
            .MinAsync(cancellationToken: ct)
    );

    // Observable.Range(0, 10, ct).MinMaxAsync()

    Debug.Log(
        // SumAsync:発行された値の"最小値"と"最大値"を同時に取得する
        //           IComparer<T>を指定すると比較方法を指定できる
        // Result: (0, 9)
        await Observable.Range(0, 10, ct)
            .MinMaxAsync(cancellationToken: ct)
    );

    Debug.Log(
        // SumAsync:発行された値の合計値を計算する
        // Result: 45
        await Observable.Range(0, 10, ct)
            .SumAsync(cancellationToken: ct)
    );

    Debug.Log(
        // MaxByAsync:発行された値の"最大値を含んだメッセージ本体"を取得する
        //             IComparer<T>を指定するこで比較方法を指定できる
        await Observable.Repeat(Unit.Default, 10)
            .Select(_ => UnityEngine.Random.insideUnitSphere) // 10個のランダムなVector3を生成
            .MaxByAsync(v => v.y, cancellationToken: ct) // y座標が最大の「Vector3」を取得
    );

    Debug.Log(
        // MinByAsync:発行された値の"最小値を含んだメッセージ本体"を取得する
        //             IComparer<T>を指定するこで比較方法を指定できる
        await Observable.Repeat(Unit.Default, 10)
            .Select(_ => UnityEngine.Random.insideUnitSphere) // 10個のランダムなVector3を生成
            .MinByAsync(v => v.y, cancellationToken: ct) // y座標が最小の「Vector3」を取得
    );

    Debug.Log(
        // ElementAtAsync:指定したインデックスの値を取得する
        //                インデックスが範囲外の場合はOnCompleted(ArgumentOutOfRangeException)が発行される
        // Result: 30
        await Observable.Range(0, 10, ct)
            .Select(x => x * 10) // 値を10倍にする
            .ElementAtAsync(3, cancellationToken: ct)
    );

    Debug.Log(
        // ElementAtOrDefaultAsync:指定したインデックスの値を取得する
        //                         インデックスが範囲外の場合はOnCompleted(default)が発行される
        // Result: 30
        await Observable.Range(0, 10, ct)
            .Select(x => x * 10) // 値を10倍にする
            .ElementAtOrDefaultAsync(3, cancellationToken: ct)
    );

    Debug.Log(
        // IsEmptyAsync:OnNextが一度も発行されなかったかどうかを判定する
        // Result: False
        await Observable.Range(0, 10, ct)
            .IsEmptyAsync(ct)
    );

    Debug.Log(
        // CountAsync:発行された値の個数をカウントする。int型を用いてカウントする。
        // Result: 10
        await Observable.Range(0, 10, ct)
            .CountAsync(cancellationToken: ct)
    );

    Debug.Log(
        // LongCountAsync:発行された値の個数をカウントする。long型を用いてカウントする。
        // Result: 10
        await Observable.Range(0, 10, ct)
            .LongCountAsync(cancellationToken: ct)
    );

    Debug.Log(
        // SequenceEqualAsync:他のObservableと比較して発行された値が順序含め全て同じかどうかを判定する
        // Result: True
        await Observable.Range(0, 10, ct)
            .SequenceEqualAsync(Observable.Range(0, 10), ct)
    );


    // それぞれのデータ構造に変換
    int[] array = await Observable.Range(0, 10, ct).ToArrayAsync(ct);
    Dictionary<string, int> dictionary = await Observable.Range(0, 10, ct).ToDictionaryAsync(x => x.ToString(), ct);
    List<int> list = await Observable.Range(0, 10, ct).ToListAsync(ct);
    ILookup<int, int> lookup = await Observable.Range(0, 10, ct).ToLookupAsync(x => x % 2, ct);
    HashSet<int> hashSet = await Observable.Range(0, 10, ct).ToHashSetAsync(ct);

    // このObservableが完了するの待つ。結果としては何も返さない。
    await Observable.Range(0, 10).WaitAsync(ct);

    // このObservableが完了するの待ちつつ、発行された値を処理できる。結果としては何も返さない。
    await Observable.Range(0, 10, ct).ForEachAsync(x => Debug.Log(x), ct);
}

おわりに

ForEachAsyncの応用が便利なので、これは別記事でまとめます。

9
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
7