UniRxについての記事のまとめはこちら
RxのRepeatは頻繁に使う割に挙動が読めず、初心者がハマってしまうオペレータだと思います。
そこでRepeatについて調べてまとめてみました。
#Rxの「Repeat」とは?
一言で言えばRepeatは**「OnCompletedが来た時に再Subscribeしてくれるオペレータ」**です。
(更に正確に言えば、OnCompletedが来た時に同じストリームを生成してConcatで後ろに繋げてくれるオペレータです)
稀に「流れてきたメッセージを再現して繰り返してくれるオペレータ」と勘違いされている方が居ますがそうではありません。
例えば以下のコードを御覧ください。
var random = new Random();
//乱数を1つ返すストリーム
Observable.Create<int>(observer =>
{
observer.OnNext(random.Next());
observer.OnCompleted();
return () => { };
})
.Repeat(3)
.Subscribe(x => Debug.WriteLine(x), () => Debug.WriteLine("OnCompleted"));
1968212648
1328553849
1930075018
OnCompleted
3回とも違った値がOnNextに流れてきました。
これはRepeatのタイミングでObservable.Createが再度実行されているからです。
このようにRepeatはOnCompletedを検知したタイミングで再度Subscribeし、concatでストリームの後ろに新しいストリームを繋ぎ直してくれるだけのオペレータです。
Repeatは値をキャッシュして繰り返し同じ値を流すような機能を持っていません。
#Repeatの使い道
##使い道1)ストリームを繰り返しSubscribeする
OnCompletedが発行された時にもう一回同じストリームをSubscribeする単純な使い方です。
mouseMove
.SkipUntil(mouseDown)
.TakeUntil(mouseUp)
.Repeat()
.Subscribe(x => Debug.WriteLine(x.Location));
##使い道2)オペレータの状態を初期化する
先程も説明しましたが、RepeatはOnCompletedが来た時にSubscribeをやり直してくれるオペレータです。この再Subscribeしてくれるというのがポイントです。
Rxの殆どのオペレータはSubscribe時に生成されるという性質となっています。そのため「再Subscribeする=オペレータを全て初期化する」と考えることができます。
Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
.Take(5)
.Repeat()
.Subscribe(time => Debug.WriteLine(time));
Observable.Timer(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(1))
.TakeUntil(mouseClickStream)
.Repeat()
.Subscribe(x => Debug.WriteLine(x));
hogeStream
.Buffer(10)
.TakeUntil(resetStream)
.Repeat()
.Subscribe(data => Debug.WriteLine(data.Count));
TakeUntil+RepeatもしくはFirst+Repeatをストリームの途中に挟んであげることで任意のタイミングでストリームの初期化を行うことができます。
#Repeatを使う上で気をつけるべき点
無限Repeatはかなり危険であることを認識しなくてはいけません。
無限Repeatは注意しないと簡単にフリーズやstack overflowを引き起こします。
##危険な例1)ファクトリメソッドの無限Repeat
ファクトリメソッドはSubscribe直後にまとめてOnNext/OnCompletedを発行する物がいくつかあります。このようにSubscribe直後にOnCompletedを返すObservableの場合、Repeatが無限に繰り返されフリーズするかstack overflowを引き起こし死んでしまいます。
Observable.Return(0)
.Repeat()
.Subscribe(x => Debug.WriteLine(x));
ただし一概にファクトリメソッドが危険というわけではありません。
例えばObservable.Timerの無限Repeatは問題なく利用することができます。
(ただしDispose忘れには注意が必要)
##危険な例2)ストリームの根源でOnCompletedが発行される場合
Subscribeしているストリームの根源でOnCompletedが発行された場合、もしくはHot変換したストリームでOnCompletedが発行された場合に無限Repeatしていると死にます。
var subject = new Subject<int>();
subject.Materialize().Repeat().Subscribe(x => Debug.WriteLine(x));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();
OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
OnCompleted()
OnCompleted()
OnCompleted()
OnCompleted()
.
.
(以下無限にOnCompleted())
ストリームの根源がOnCompletedされてしまうと何度Subscribeしても即OnCompletedが返されてしまいます。そのためRepeatを挟んでいると無限にRepeatを試行して死にます。
そのためRepeatを使う場合はストリームの根源でOnCompletedが発行されるより前にDisposeを呼び出す等のストリームの寿命管理をしっかり行うことが大切です。
#まとめ
RepeatはOnCompletedが来た時にSubscribeをやり直してくれるオペレータであり、値を保持して繰り返してくれるオペレータではありません。
またRepeatを使う場合はOnCompletedの発行タイミングに十分注意する必要があり、気を抜くと直ぐにフリーズやstack overflowを引き起こしてしまいます。
#補足)Observable.Return
以下のコードと実行結果を御覧ください。
var random = new Random();
//乱数を返す
Observable.Return(random.Next())
.Repeat(3)
.Subscribe(x => Debug.WriteLine(x), () => Debug.WriteLine("OnCompleted"));
873345220
873345220
873345220
OnCompleted
Observable.ReturnをRepeatするとObservable.Createの時とは違い、常に同じ値が繰り返し表示されてしまいました。
これはObservable.Returnが遅延評価ではないことが原因であり、Repeatに起因するものではありません。
もしObservable.Returnを遅延評価にした上でRepeatしたい場合(需要ある…?)はObservable.DeferでObservable.Returnを包むと良いでしょう。
var random = new Random();
Observable.Defer(() => Observable.Return(random.Next()))
.Repeat(3)
.Subscribe(x => Debug.WriteLine(x), () => Debug.WriteLine("OnCompleted"));
1413311669
1173910711
1011315106
OnCompleted
Deferで包むことで、Repeatの再Subscribe時まで評価を遅延させることができます。