やりたいこと
大量に発行されたメッセージを1フレームに1個ずつ処理するように「絞り」を入れたい
実装
UniRx/Scripts/Operators
に徐ろに以下のcsファイルを追加
ThrottlePerFrameObservable.cs
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
namespace UniRx.Operators
{
internal class ThrottlePerFrameObservable<T> : OperatorObservableBase<T>
{
readonly IObservable<T> source;
private int frameCount;
public ThrottlePerFrameObservable(IObservable<T> source, int frameCount)
: base(source.IsRequiredSubscribeOnCurrentThread())
{
this.frameCount = frameCount;
this.source = source;
}
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
{
return new ThrottlePerFrame(this, observer, cancel, frameCount).Run();
}
class ThrottlePerFrame : OperatorObserverBase<T, T>
{
readonly ThrottlePerFrameObservable<T> parent;
readonly object gate = new object();
private bool isEnded = false;
private bool hasError = false;
private int frameCount;
private Exception error;
SerialDisposable cancelable;
private Queue<T> _messageQueue;
private ICancelable _cancelable;
public ThrottlePerFrame(ThrottlePerFrameObservable<T> parent, IObserver<T> observer, IDisposable cancel, int frameCount)
: base(observer, cancel)
{
this.frameCount = frameCount;
this.parent = parent;
}
private IEnumerator TakeCoroutine()
{
int count = 0;
while (!_cancelable.IsDisposed)
{
if (frameCount <= 0 || count % frameCount == 0)
{
if (_messageQueue.Count > 0)
{
lock (gate)
{
var d = _messageQueue.Dequeue();
observer.OnNext(d);
}
count = 0;
}
else
{
if (isEnded)
{
try
{
if (hasError)
{
observer.OnError(error);
}
else
{
observer.OnCompleted();
}
}
finally
{
_cancelable.Dispose();
Dispose();
}
yield break;
}
}
}
count++;
yield return null;
}
}
public IDisposable Run()
{
cancelable = new SerialDisposable();
var subscription = parent.source.Subscribe(this);
_messageQueue = new Queue<T>(4);
_cancelable = StableCompositeDisposable.Create(cancelable, subscription);
MainThreadDispatcher.SendStartCoroutine(TakeCoroutine());
return _cancelable;
}
public override void OnNext(T value)
{
lock (gate)
{
if (isEnded) return;
_messageQueue.Enqueue(value);
}
}
public override void OnError(Exception error)
{
lock (gate)
{
if (isEnded) return;
isEnded = true;
hasError = true;
this.error = error;
}
}
public override void OnCompleted()
{
lock (gate)
{
isEnded = true;
}
}
}
}
}
あとUniRx/Scripts/Observable.Time.cs
に以下の数行を追加
Observable.Time.cs
public static IObservable<TSource> ThrottlePerFrame<TSource>(this IObservable<TSource> source, int frameCount = 1)
{
return new ThrottlePerFrameObservable<TSource>(source, frameCount);
}
使い方
ThrottlePerFrame()
を挟むと1フレーム1個値が発行されるようになる。
public class Test : MonoBehaviour
{
void Start()
{
Enumerable.Range(0, 10).ToObservable()
.ThrottlePerFrame()
.Subscribe(x =>
{
Debug.Log(string.Format("{0}F = {1}", Time.frameCount, x));
});
}
}
1F = 0
2F = 1
3F = 2
4F = 3
5F = 4
6F = 5
7F = 6
8F = 7
9F = 8
10F = 9
ThrottlePerFrame()
に数値を入れると指定フレーム単位での発行になる。
public class Test : MonoBehaviour
{
void Start()
{
Enumerable.Range(0, 10).ToObservable()
.ThrottlePerFrame(5) //5フレーム単位
.Subscribe(x =>
{
Debug.Log(string.Format("{0}F = {1}", Time.frameCount, x));
});
}
}
1F = 0
6F = 1
11F = 2
16F = 3
21F = 4
26F = 5
31F = 6
36F = 7
41F = 8
46F = 9
途中でエラーが発生しても、既にキューに溜まっているOnNextを全て吐き出してからOnErrorを発行する(エラーを最後にまわす)。OnCompletedも同様。
namespace Assets
{
public class Test : MonoBehaviour
{
void Start()
{
Enumerable.Range(0, 10).ToObservable()
.Select(x =>
{
if (x == 5) throw new Exception("This is 5!");
return x;
})
.ThrottlePerFrame()
.Subscribe(x =>
{
Debug.Log(string.Format("{0}F = {1}", Time.frameCount, x));
}, Debug.LogError);
}
}
}
1F = 0
2F = 1
3F = 2
4F = 3
5F = 4
System.Exception: This is 5!
感想
オペレータじゃなくてスケジューラで実装すべきな気もするが、複数ストリーム作った時にスケジューラだと他のストリームと値が混ざるのでオペレータとして実装した。
正直ゴリ押し。
解説
オペレータ内にキューとコルーチンを持っていて、コルーチンがキューから1個ずつ値を取り出してOnNextしてるだけ。OnErrorとOnCompletedの実装もゴリ押し。
private IEnumerator TakeCoroutine()
{
int count = 0;
while (!_cancelable.IsDisposed)
{
if (frameCount <= 0 || count % frameCount == 0)
{
if (_messageQueue.Count > 0)
{
lock (gate)
{
var d = _messageQueue.Dequeue();
observer.OnNext(d);
}
count = 0;
}
else
{
if (isEnded)
{
try
{
if (hasError)
{
observer.OnError(error);
}
else
{
observer.OnCompleted();
}
}
finally
{
_cancelable.Dispose();
Dispose();
}
yield break;
}
}
}
count++;
yield return null;
}
}
public override void OnNext(T value)
{
lock (gate)
{
if (isEnded) return;
_messageQueue.Enqueue(value);
}
}
public override void OnError(Exception error)
{
lock (gate)
{
if (isEnded) return;
isEnded = true;
hasError = true;
this.error = error;
}
}
public override void OnCompleted()
{
lock (gate)
{
isEnded = true;
}
}