LoginSignup
3
1

More than 3 years have passed since last update.

UniRxで購読時既に同じ非同期処理が行われている際に楽にそっちを受け取る

Last updated at Posted at 2019-10-13

TL;DR

Publishを使いHot変換するパタン

概要

初めてUniRxを使う際に、色んな実装にあたって実装方法に対して様々な疑問が生まれていたものの、
最近までもよく見かける実装に関しての話となります。

「非同期として値を受け取る際に、要求したもののストリームが既に走っているんだったらそっちを受け取りたい」

これに対しては色んな実装方法がありますが、一番楽な実装を解説してみようと思います。

本題

これの実装パタンとしては以下の三つほどを見かけております。

  1. メンバー変数を使うパタン
  2. Subjectを使い中継させるパタン
  3. Publishを使いHot変換するパタン

例として、二つの場合を想定しております。

  1. 一回だけ非同期でCreateを行い、要求されたらそれを返すSingleton Instance
  2. 毎回要求されるたびにCreateを行うが、要求された際に先に走るロードがあったら別途ロードせずそっちを返す
public class InstanceManager
{
    (以降の例文では省略)
    private IObservable<Instance> CreateInternalAsObservable()
    {
        ...
    }

    public IObservable<Instance> CreateAsObservable()
    {
        ...
    }
}

メンバー変数を使う実装

1の場合に対してはこんな形になると思います。

public class InstanceManager
{
    private Instance _instance; //作られたinstanceを保持する
    private bool _isCreating; //既に作ろうとしているか

    public IObservable<Instance> CreateAsObservable()
    {
        return Observable.Defer(() =>
        {
            if (_instance != null) //instanceがあったらそれを返す
            {
                return Observable.Return(_instance);
            }
            else if (_isCreating) //作ろうとしているならそれが作られるまで待つ
            {
                return Observable.EveryUpdate()
                    .Select(_ => _instance)
                    .First(x => x != null);
            }
            else //なければ作る
            {
                _isCreating = true;
                return CreateInternalAsObservable()
                    .Do(instance =>
                    {
                        _instance = instance;
                        _isCreating = false;
                    });
            }
        }
    }
}

2の場合に対しても似たような形になるでしょう。(一番最初のif文が消された感じです)

public class InstanceManager
{
    private Instance _instance; //作られたinstanceを保持する
    private bool _isCreating; //既に作ろうとしているか

    public IObservable<Instance> CreateAsObservable()
    {
        return Observable.Defer(() =>
        {
            if (_isCreating) //作ろうとしているならそれが作られるまで待つ
            {
                return Observable.EveryUpdate()
                    .Select(_ => _instance)
                    .First(x => x != null);
            }
            else //作っていなければ作る
            {
                _isCreating = true;
                return CreateInternalAsObservable()
                    .Do(instance =>
                    {
                        _instance = instance;
                        _isCreating = false;
                    });
            }
        }
    }
}

んーいまいちな感じがしますね。まずEveryUpdateを使うため、毎フレーム処理を走らせないといけないものと、余計なMutable状態のboolメンバー変数が増えてしまいました。

Subjectを使い中継させるパタン

1の場合に対してはこんな形になると思います。

public class InstanceManager : IDisposable
{
    //作られたinstanceを保持するSubject
    private readonly ISubject<Instance> _createdInstance =
        new AsyncSubject<Instance>();

    //作る際に要求するSubject
    private readonly ISubject<Unit> _requestCreate = new AsyncSubject<Unit>();

    //SubscribeをDisposeする
    private readonly IDisposable _disposable;

    public InstanceManager()
    {
        _disposable = _requestCreate
            .ContinueWith(CreateInternalAsObservable)
            .Subscribe(_createdInstance);
    }

    public IObservable<Instance> CreateAsObservable()
    {
        return Observable.Defer(() =>
        {
            _requestCreate.OnNext(Unit.Default);
            _requestCreate.OnCompleted();

            return _createdInstance;
        }
    }

    void IDisposable.Dispose()
    {
        _disposable.Dispose();
    }
}

2の場合に対してはRefCountもメンバーも使わず実装できるパタンがすぐ思いつかず...

Publishを使いHot変換するパタン

1の場合に対してはこんな形になると思います。

public class InstanceManager : IDisposable
{
    private readonly CompositeDisposable _disposable = CompositeDisposable();
    private readonly IConnectableObservable<Instance> _createInstanceObservable;

    public InstanceManager()
    {
        //変数として保存しておく
        _createdInstanceObservable = CreateInternalAsObservable()
            .PublishLast(); 
    }

    public IObservable<Instance> CreateAsObservable()
    {
        return Observable.Defer(() =>
        {
            //このクラスのDisposableにAddToすることに注意
            _createInstanceObservable.Connect()
                .AddTo(_disposable);

            return _createInstanceObservable;
        }
    }

    void IDisposable.Dispose()
    {
        _disposable.Dispose();
    }
}

PublishLastはAsyncSubjectに中継させ、最後の値を常に返す形になっております。
Publish/ConnectやAsyncSubjectに関しては良い記事が多いのでそっちを見ていただければと思います。

PublishとConnect(英文)

2の場合に対してはこんな形になります。

public class InstanceManager
{
    private readonly IObservable<Instance> _sharedCreateInstanceObservable;

    public InstanceManager()
    {
        //変数として保存しておく
        _sharedCreateInstanceObservable = CreateSharedObservable();
    }

    private IObservable<Instance> CreateSharedObservable()
    {
        return Observable.Create<Instance>(observer =>
        {
            //OnNextのみなのに注意
            return CreateInternalAsObservable()
                .Subscribe(observer.OnNext);
        })
        .Share();
    }

    public IObservable<Instance> CreateAsObservable()
    {
        return _sharedCreateInstanceObservable.First();
    }
}

observer.OnNextがOnNextのみになっていることに注意が必要です。
ShareはPublish().RefCount()になっているものの、Publish()がSubjectに中継させるためCreateInternalAsObservable()が即時OnCompleteを発行してしまった場合は、二度動かなくなるためです。
外に公開するCreateAsObservableには、Firstを付けて内部処理が終わり次第RefCountを減らす役割を持たせております。

終わりに

Hot変換を具体的にどこで使えば良いかに関しては良い記事がいっぱいあるわけですが、Hotの性質上こういう場合にももちろん対応できるところが少し伝われば幸いです。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1