【UniRx】連続したOnNextの最初しか流さないオペレータThrottleFirst

  • 24
    Like
  • 0
    Comment
More than 1 year has passed since last update.

UniRxについての記事のまとめはこちら


UniRx最新版にThrottleFirstが組み込まれました!

ゲーム開発において「ある処理を行ってからしばらくの間はイベントを無視したい」「イベントがたくさんきた時に最初だけ処理して残りはしばらくの間無視したい」といった需要は多いかと思います。
(例えば「ボタンの長押しイベントを300ミリ秒間隔に間引きたい」「最後にアニメーションイベントが来てから3秒間はイベントを無視したい」等)

既存のRxのオペレータの組み合わせでもできなくはないですが、頻出する使用法なだけに専用のオペレータが欲しくなります。
なのでThrottleを元に作成してみました。

RxJSなどには「ThrottleFirst」という名前で同じ挙動のオペレータが存在していました。Rx.NETには存在しないオペレータです。

ThrottleFirst

画像はReactiveXより引用

UniRxでThrottleFirst

ThrottleFirst.cs
using System;
namespace UniRx
{
    public static partial class Observable
    {
        public static IObservable<TSource> ThrottleFirst<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
        {
            return source.ThrottleFirst(dueTime, Scheduler.DefaultSchedulers.TimeBasedOperations);
        }

        public static IObservable<TSource> ThrottleFirst<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var gate = new object();
                var open = true;
                var cancelable = new SerialDisposable();

                var subscription = source.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (!open) return;
                        observer.OnNext(x);
                        open = false;
                    }

                    var d = new SingleAssignmentDisposable();
                    cancelable.Disposable = d;
                    d.Disposable = scheduler.Schedule(dueTime, () =>
                    {
                        lock (gate)
                        {
                            open = true;
                        }
                    });

                },
                    exception =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            observer.OnError(exception);
                        }
                    },
                    () =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            observer.OnCompleted();

                        }
                    });

                return new CompositeDisposable(subscription, cancelable);
            });
        }

        public static IObservable<TSource> ThrottleFirstFrame<TSource>(this IObservable<TSource> source, int frameCount,
            FrameCountType frameCountType = FrameCountType.Update)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var gate = new object();
                var open = true;
                var cancelable = new SerialDisposable();

                var subscription = source.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (!open) return;
                        observer.OnNext(x);
                        open = false;
                    }

                    var d = new SingleAssignmentDisposable();
                    cancelable.Disposable = d;

                    d.Disposable = Observable.TimerFrame(frameCount, frameCountType)
                        .Subscribe(_ =>
                        {
                            lock (gate)
                            {
                                open = true;
                            }
                        });
                },
                    exception =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            observer.OnError(exception);
                        }
                    },
                    () =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            observer.OnCompleted();

                        }
                    });

                return new CompositeDisposable(subscription, cancelable);
            });
        }
    }
}

  • ThrottleFirst(OnNextを無視する時間)
  • ThrottleFirstFrame(OnNextを無視するフレーム数)

どちらも最初の1回目は必ずOnNextを通過させます。
2回目以降のOnNextについては、最後にOnNextを通過させてから一定時間経過するまでの間OnNextを流さずに捨て去ります。

例えばThrottleFirst(TimeSpan.FromSeconds(1))といった指定の仕方をした場合は、メッセージが1秒以内に連続してきた場合は最初だけ通してそれ以外を無視するといった挙動になります。

使用例

クリックされてから5秒間はクリックを無視する
this.UpdateAsObservable()
    .Where(_=>Input.GetMouseButtonDown(0))
    .ThrottleFirst(TimeSpan.FromSeconds(5))
    .Subscribe(x => Debug.Log("Clicked!"));
Updateを1/10に間引く(9回Updateが来るまで無視する)
this.UpdateAsObservable()
    .ThrottleFirstFrame(9)
    .Subscribe(x => Debug.Log("tenth part Update"));