【ReactiveExtensions】「Repeat」とは何なのか?

  • 33
    Like
  • 2
    Comment
More than 1 year has passed since last update.

UniRxについての記事のまとめはこちら


RxのRepeatは頻繁に使う割に挙動が読めず、初心者がハマってしまうオペレータだと思います。
そこでRepeatについて調べてまとめてみました。

Rxの「Repeat」とは?

一言で言えばRepeatは「OnCompletedが来た時に再Subscribeしてくれるオペレータ」です。
(更に正確に言えば、OnCompletedが来た時に同じストリームを生成してConcatで後ろに繋げてくれるオペレータです)
稀に「流れてきたメッセージを再現して繰り返してくれるオペレータ」と勘違いされている方が居ますがそうではありません。

例えば以下のコードを御覧ください。

乱数を返すCreateのRepeat
var random = new Random();

//乱数を1つ返すストリーム
Observable.Create<int>(observer =>
{
    observer.OnNext(random.Next());
    observer.OnCompleted();
    return () => { };
})
    .Repeat(3)
    .Subscribe(x => Debug.WriteLine(x), () => Debug.WriteLine("OnCompleted"));
実行結果
1968212648
1328553849
1930075018
OnCompleted

3回とも違った値がOnNextに流れてきました。
これはRepeatのタイミングでObservable.Createが再度実行されているからです。
このようにRepeatはOnCompletedを検知したタイミングで再度Subscribeし、concatでストリームの後ろに新しいストリームを繋ぎ直してくれるだけのオペレータです。
Repeatは値をキャッシュして繰り返し同じ値を流すような機能を持っていません。

Repeatの使い道

使い道1)ストリームを繰り返しSubscribeする

OnCompletedが発行された時にもう一回同じストリームをSubscribeする単純な使い方です。

マウスドラッグの監視を繰り返す
mouseMove
    .SkipUntil(mouseDown)
    .TakeUntil(mouseUp)
    .Repeat()
    .Subscribe(x => Debug.WriteLine(x.Location));

使い道2)オペレータの状態を初期化する

先程も説明しましたが、RepeatはOnCompletedが来た時にSubscribeをやり直してくれるオペレータです。この再Subscribeしてくれるというのがポイントです。

Rxの殆どのオペレータはSubscribe時に生成されるという性質となっています。そのため「再Subscribeする=オペレータを全て初期化する」と考えることができます。

5秒たったらタイマを0に戻す
Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
    .Take(5)
    .Repeat()
    .Subscribe(time => Debug.WriteLine(time));
クリックされたらタイマを0に戻す
Observable.Timer(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(1))
    .TakeUntil(mouseClickStream)
    .Repeat()
    .Subscribe(x => Debug.WriteLine(x));
リセットイベントが来たらBufferをクリアする
hogeStream
    .Buffer(10)
    .TakeUntil(resetStream)
    .Repeat()
    .Subscribe(data => Debug.WriteLine(data.Count));

TakeUntil+RepeatもしくはFirst+Repeatをストリームの途中に挟んであげることで任意のタイミングでストリームの初期化を行うことができます。

Repeatを使う上で気をつけるべき点

無限Repeatはかなり危険であることを認識しなくてはいけません。
無限Repeatは注意しないと簡単にフリーズやstack overflowを引き起こします。

危険な例1)ファクトリメソッドの無限Repeat

ファクトリメソッドはSubscribe直後にまとめてOnNext/OnCompletedを発行する物がいくつかあります。このようにSubscribe直後にOnCompletedを返すObservableの場合、Repeatが無限に繰り返されフリーズするかstack overflowを引き起こし死んでしまいます。

Returnの無限Repeatはやばい
Observable.Return(0)
    .Repeat()
    .Subscribe(x => Debug.WriteLine(x));

ただし一概にファクトリメソッドが危険というわけではありません。
例えばObservable.Timerの無限Repeatは問題なく利用することができます。
(ただしDispose忘れには注意が必要)

危険な例2)ストリームの根源でOnCompletedが発行される場合

Subscribeしているストリームの根源でOnCompletedが発行された場合、もしくはHot変換したストリームでOnCompletedが発行された場合に無限Repeatしていると死にます。

Subjectの無限Repeatの例
var subject = new Subject<int>();
subject.Materialize().Repeat().Subscribe(x => Debug.WriteLine(x));

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();
実行結果
OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
OnCompleted()
OnCompleted()
OnCompleted()
OnCompleted()
.
.
(以下無限にOnCompleted())

ストリームの根源がOnCompletedされてしまうと何度Subscribeしても即OnCompletedが返されてしまいます。そのためRepeatを挟んでいると無限にRepeatを試行して死にます。

そのためRepeatを使う場合はストリームの根源でOnCompletedが発行されるより前にDisposeを呼び出す等のストリームの寿命管理をしっかり行うことが大切です。

まとめ

RepeatはOnCompletedが来た時にSubscribeをやり直してくれるオペレータであり、値を保持して繰り返してくれるオペレータではありません。
またRepeatを使う場合はOnCompletedの発行タイミングに十分注意する必要があり、気を抜くと直ぐにフリーズやstack overflowを引き起こしてしまいます。

補足)Observable.Return

以下のコードと実行結果を御覧ください。

Observable.Return
var random = new Random();

//乱数を返す
Observable.Return(random.Next())
    .Repeat(3)
    .Subscribe(x => Debug.WriteLine(x), () => Debug.WriteLine("OnCompleted"));
実行結果
873345220
873345220
873345220
OnCompleted

Observable.ReturnをRepeatするとObservable.Createの時とは違い、常に同じ値が繰り返し表示されてしまいました。
これはObservable.Returnが遅延評価ではないことが原因であり、Repeatに起因するものではありません。

もしObservable.Returnを遅延評価にした上でRepeatしたい場合(需要ある…?)はObservable.DeferでObservable.Returnを包むと良いでしょう。

DeferとReturn
var random = new Random();
Observable.Defer(() => Observable.Return(random.Next()))
    .Repeat(3)
    .Subscribe(x => Debug.WriteLine(x), () => Debug.WriteLine("OnCompleted"));
実行結果
1413311669
1173910711
1011315106
OnCompleted

Deferで包むことで、Repeatの再Subscribe時まで評価を遅延させることができます。