LoginSignup
13
7

More than 5 years have passed since last update.

【UniRx】 発行するメッセージを1フレームごとに1つにする

Last updated at Posted at 2017-10-02

やりたいこと

大量に発行されたメッセージを1フレームに1個ずつ処理するように「絞り」を入れたい

実装

UniRx/Scripts/Operatorsに徐ろに以下のcsファイルを追加

ThrottlePerFrameObservable.cs
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;

namespace UniRx.Operators
{
    /// <summary>
    /// Operator that buffers every value received from <c>source</c> in a queue and
    /// re-emits the buffered values one at a time from a coroutine, at most one value
    /// every <c>frameCount</c> iterations of that coroutine.
    /// A terminal notification (OnError / OnCompleted) is held back until the queue
    /// has been drained.
    /// </summary>
    internal class ThrottlePerFrameObservable<T> : OperatorObservableBase<T>
    {
        readonly IObservable<T> source;
        private readonly int frameCount;

        /// <param name="source">Upstream sequence whose values are throttled.</param>
        /// <param name="frameCount">
        /// Number of coroutine iterations between two emissions. Values less than or
        /// equal to zero make the operator try to emit on every iteration.
        /// </param>
        public ThrottlePerFrameObservable(IObservable<T> source, int frameCount)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.frameCount = frameCount;
            this.source = source;
        }

        protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
        {
            return new ThrottlePerFrame(this, observer, cancel, frameCount).Run();
        }

        /// <summary>
        /// Observer that enqueues upstream notifications and forwards them to the
        /// downstream observer from <see cref="TakeCoroutine"/>.
        /// </summary>
        class ThrottlePerFrame : OperatorObserverBase<T, T>
        {
            readonly ThrottlePerFrameObservable<T> parent;

            // Guards _messageQueue, isEnded, hasError and error.
            readonly object gate = new object();

            // Created eagerly so that a source which emits synchronously inside
            // Subscribe() never observes a null queue.
            private readonly Queue<T> _messageQueue = new Queue<T>(4);

            private readonly int frameCount;
            private bool isEnded = false;
            private bool hasError = false;
            private Exception error;

            SerialDisposable cancelable;
            private ICancelable _cancelable;

            public ThrottlePerFrame(ThrottlePerFrameObservable<T> parent, IObserver<T> observer, IDisposable cancel, int frameCount)
                : base(observer, cancel)
            {
                this.frameCount = frameCount;
                this.parent = parent;
            }

            /// <summary>
            /// Drains the queue, forwarding at most one value per eligible iteration.
            /// Once the upstream has terminated and the queue is empty, the pending
            /// OnError / OnCompleted is forwarded and the coroutine ends.
            /// </summary>
            private IEnumerator TakeCoroutine()
            {
                int count = 0;
                while (!_cancelable.IsDisposed)
                {
                    if (frameCount <= 0 || count % frameCount == 0)
                    {
                        T value = default(T);
                        bool hasValue = false;
                        bool finished = false;
                        bool failed = false;
                        Exception failure = null;

                        // Take a consistent snapshot of the shared state under the lock,
                        // but never call into the downstream observer while holding it.
                        lock (gate)
                        {
                            if (_messageQueue.Count > 0)
                            {
                                value = _messageQueue.Dequeue();
                                hasValue = true;
                            }
                            else if (isEnded)
                            {
                                finished = true;
                                failed = hasError;
                                failure = error;
                            }
                        }

                        if (hasValue)
                        {
                            try
                            {
                                observer.OnNext(value);
                            }
                            catch
                            {
                                // Release the upstream subscription before the exception
                                // terminates the coroutine; otherwise nothing would ever
                                // drain the queue again.
                                _cancelable.Dispose();
                                Dispose();
                                throw;
                            }
                            count = 0;
                        }
                        else if (finished)
                        {
                            try
                            {
                                if (failed)
                                {
                                    observer.OnError(failure);
                                }
                                else
                                {
                                    observer.OnCompleted();
                                }
                            }
                            finally
                            {
                                _cancelable.Dispose();
                                Dispose();
                            }
                            yield break;
                        }
                    }
                    count++;
                    yield return null;
                }
            }

            /// <summary>
            /// Subscribes to the source, starts the draining coroutine through
            /// MainThreadDispatcher and returns the disposable that stops both.
            /// </summary>
            public IDisposable Run()
            {
                cancelable = new SerialDisposable();
                var subscription = parent.source.Subscribe(this);

                _cancelable = StableCompositeDisposable.Create(cancelable, subscription);

                MainThreadDispatcher.SendStartCoroutine(TakeCoroutine());
                return _cancelable;
            }

            /// <summary>Buffers the value; values arriving after termination are dropped.</summary>
            public override void OnNext(T value)
            {
                lock (gate)
                {
                    if (isEnded) return;
                    _messageQueue.Enqueue(value);
                }
            }

            /// <summary>Records the error; it is forwarded once the queue is empty.</summary>
            public override void OnError(Exception error)
            {
                lock (gate)
                {
                    if (isEnded) return;
                    isEnded = true;
                    hasError = true;
                    this.error = error;
                }
            }

            /// <summary>Records completion; it is forwarded once the queue is empty.</summary>
            public override void OnCompleted()
            {
                lock (gate)
                {
                    if (isEnded) return;
                    isEnded = true;
                }
            }
        }
    }
}

あとUniRx/Scripts/Observable.Time.cs に以下の数行を追加

Observable.Time.cs
/// <summary>
/// Re-emits the values of <paramref name="source"/> one at a time, spaced
/// <paramref name="frameCount"/> frames apart.
/// </summary>
/// <param name="source">Sequence to throttle.</param>
/// <param name="frameCount">
/// Frame interval between two emitted values (default 1). Values less than or
/// equal to zero are treated as "every frame" by the operator.
/// </param>
/// <returns>The throttled sequence.</returns>
/// <exception cref="System.ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> ThrottlePerFrame<TSource>(this IObservable<TSource> source, int frameCount = 1)
{
    // Fail at the call site with a descriptive exception instead of a
    // NullReferenceException raised from inside the operator's constructor.
    if (source == null) throw new System.ArgumentNullException("source");

    return new ThrottlePerFrameObservable<TSource>(source, frameCount);
}

使い方

ThrottlePerFrame()を挟むと、1フレームに1個ずつ値が発行されるようになる。

public class Test : MonoBehaviour
{
    // Pushes 0..9 through ThrottlePerFrame() and logs each value together with
    // the frame number on which it was delivered.
    void Start()
    {
        var throttled = Enumerable.Range(0, 10)
            .ToObservable()
            .ThrottlePerFrame();

        throttled.Subscribe(value => Debug.Log(string.Format("{0}F = {1}", Time.frameCount, value)));
    }
}
1F = 0
2F = 1
3F = 2
4F = 3
5F = 4
6F = 5
7F = 6
8F = 7
9F = 8
10F = 9

ThrottlePerFrame()に数値を入れると指定フレーム単位での発行になる。

public class Test : MonoBehaviour
{
    // Pushes 0..9 through ThrottlePerFrame(5) and logs each value together with
    // the frame number on which it was delivered.
    void Start()
    {
        var throttled = Enumerable.Range(0, 10)
            .ToObservable()
            .ThrottlePerFrame(5); // one value every 5 frames

        throttled.Subscribe(value => Debug.Log(string.Format("{0}F = {1}", Time.frameCount, value)));
    }
}
1F = 0
6F = 1
11F = 2
16F = 3
21F = 4
26F = 5
31F = 6
36F = 7
41F = 8
46F = 9

途中でエラーが発生しても、既にキューに溜まっているOnNextを全て吐き出してからOnErrorを発行する(エラーを最後にまわす)。OnCompletedも同様。

namespace Assets
{
    public class Test : MonoBehaviour
    {
        // Demonstrates error handling: the upstream throws when it reaches 5, and the
        // values and the error are logged as they come out of ThrottlePerFrame().
        void Start()
        {
            var failingSource = Enumerable.Range(0, 10).ToObservable()
                .Select(x =>
                {
                    if (x == 5) throw new Exception("This is 5!");
                    return x;
                });

            failingSource
                .ThrottlePerFrame()
                .Subscribe(
                    value => Debug.Log(string.Format("{0}F = {1}", Time.frameCount, value)),
                    Debug.LogError);
        }
    }
}
1F = 0
2F = 1
3F = 2
4F = 3
5F = 4
System.Exception: This is 5!

感想

オペレータじゃなくてスケジューラで実装すべきな気もするが、複数ストリーム作った時にスケジューラだと他のストリームと値が混ざるのでオペレータとして実装した。
正直ゴリ押し。

解説

オペレータ内にキューとコルーチンを持っていて、コルーチンがキューから1個ずつ値を取り出してOnNextしてるだけ。OnErrorとOnCompletedの実装もゴリ押し。

// Drains the queue, forwarding at most one value per eligible iteration.
// Once the upstream has terminated and the queue is empty, the pending
// OnError / OnCompleted is forwarded and the coroutine ends.
private IEnumerator TakeCoroutine()
{
    int count = 0;
    while (!_cancelable.IsDisposed)
    {
        if (frameCount <= 0 || count % frameCount == 0)
        {
            T value = default(T);
            bool hasValue = false;
            bool finished = false;
            bool failed = false;
            Exception failure = null;

            // Take a consistent snapshot of the shared state under the lock,
            // but never call into the downstream observer while holding it.
            lock (gate)
            {
                if (_messageQueue.Count > 0)
                {
                    value = _messageQueue.Dequeue();
                    hasValue = true;
                }
                else if (isEnded)
                {
                    finished = true;
                    failed = hasError;
                    failure = error;
                }
            }

            if (hasValue)
            {
                try
                {
                    observer.OnNext(value);
                }
                catch
                {
                    // Release the upstream subscription before the exception
                    // terminates the coroutine; otherwise nothing would ever
                    // drain the queue again.
                    _cancelable.Dispose();
                    Dispose();
                    throw;
                }
                count = 0;
            }
            else if (finished)
            {
                try
                {
                    if (failed)
                    {
                        observer.OnError(failure);
                    }
                    else
                    {
                        observer.OnCompleted();
                    }
                }
                finally
                {
                    _cancelable.Dispose();
                    Dispose();
                }
                yield break;
            }
        }
        count++;
        yield return null;
    }
}

// Buffers the value for the coroutine; values arriving after the stream has
// ended are dropped.
public override void OnNext(T value)
{
    lock (gate)
    {
        if (!isEnded)
        {
            _messageQueue.Enqueue(value);
        }
    }
}

// Marks the stream as ended with an error. The error itself is stored and
// forwarded later by the coroutine, after the queued values.
public override void OnError(Exception error)
{
    lock (gate)
    {
        if (!isEnded)
        {
            isEnded = true;
            hasError = true;
            this.error = error;
        }
    }
}

// Marks the stream as ended. The completion is forwarded later by the
// coroutine, after the queued values.
public override void OnCompleted()
{
    lock (gate)
    {
        // Already ended (by an earlier OnError or OnCompleted): nothing to change.
        if (isEnded) return;
        isEnded = true;
    }
}
13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7