Why things "repeat" in Rx, and how to make subscriptions "consistent"

  • 1
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

I got the idea to write this post from answering a question on StackOverflow about how to reuse data from an Observable: http://stackoverflow.com/questions/34657154/angular2-observable-scan-once-then-subscribe-by-many-async/34785866

If you've used RxJS a bit, but not much, you've probably mostly dealt with a single observer to an Observable. That's fine, really, but what happens when you try to add more observers to an Observable? Do you find that each observer sees a "reset" version of the state you want?

That's probably because you're using a "Cold" Observable, which is what you get by default. It makes sense when you look at how Observables work:

var numbers = Rx.Observable.create(observer => {
  console.log('observer subscribed');
  observer.next(Math.random());
  observer.next(Math.random());
  observer.next(Math.random());
});

numbers.subscribe(i => console.log(`sub1: ${i}`));
numbers.subscribe(i => console.log(`sub2: ${i}`));

Given what I just told you, you should know that the output of these subscriptions will not be the same, and that you will see "observer subscribed" twice:

"observer subscribed"
"sub1: 0.7506911975797266"
"sub1: 0.99208431574516"
"sub1: 0.42133050598204136"
"observer subscribed"
"sub2: 0.7750216415151954"
"sub2: 0.530988305574283"
"sub2: 0.6986252819187939"

So this will inevitably come up in situations where you have a counter or something like that:

var subject = new Rx.Subject();

var scan = subject.startWith(0).scan((count, change) => count + change);

scan.subscribe(count => console.log(`sub1: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

scan.subscribe(count => console.log(`sub2: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

/* output:
"sub1: 0"
"sub1: 1"
"sub1: 2"
"sub1: 3"
"sub2: 0"
"sub1: 4"
"sub2: 1"
"sub1: 5"
"sub2: 2"
"sub1: 6"
"sub2: 3"
*/

There's a variety of solutions out there, but the simplest by far is to just use a ReplaySubject. This way, you replay the last X number of values you want when you subscribe, and you can just use the same source of data across various subscribers. Like so:

var subject = new Rx.Subject();

var scan = subject.startWith(0).scan((count, change) => count + change);

var replaySubject = new Rx.ReplaySubject(1);

scan.subscribe(replaySubject);

replaySubject.subscribe(count => console.log(`sub1: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

replaySubject.subscribe(count => console.log(`sub2: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

/* output:
"sub1: 0"
"sub1: 1"
"sub1: 2"
"sub1: 3"
"sub2: 3"
"sub1: 4"
"sub2: 4"
"sub1: 5"
"sub2: 5"
"sub1: 6"
"sub2: 6"
*/

Of course, this means that your original scan Observable is eagerly computed, even when you don't really need it. In this case, you might consider using refCount on a published Observable so that you're not subscribed to the original Observable when you don't need to be. Be aware though: this may not do what you want.

var subject = new Rx.Subject();

var scan = subject.startWith(0).scan((count, change) => count + change);

var refCounted = scan.publish().refCount();

subject.next(1);
subject.next(1);
subject.next(1);

var subscription = refCounted.subscribe(count => console.log(`sub: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

subscription.unsubscribe();

subject.next(1);
subject.next(1);
subject.next(1);

var subscription = refCounted.subscribe(count => console.log(`sub: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

/* output:
"sub: 0"
"sub: 1"
"sub: 2"
"sub: 3"
"sub: 0" <-- pay attention to this value!
"sub: 1"
"sub: 2"
"sub: 3"
*/

Conclusion

If you enjoyed this post, or want more clarification, you might enjoy these: