I got the idea to write this post from answering a question on StackOverflow about how to reuse data from an Observable: http://stackoverflow.com/questions/34657154/angular2-observable-scan-once-then-subscribe-by-many-async/34785866
If you've used RxJS a bit, but not much, you've probably mostly dealt with a single observer to an Observable. That's fine, really, but what happens when you try to add more observers to an Observable? Do you find that each observer sees a "reset" version of the state you want?
That's probably because you're using a "Cold" Observable, which is what you get by default. It makes sense when you look at how Observables work:
var numbers = Rx.Observable.create(observer => {
  console.log('observer subscribed');
  observer.next(Math.random());
  observer.next(Math.random());
  observer.next(Math.random());
});
numbers.subscribe(i => console.log(`sub1: ${i}`));
numbers.subscribe(i => console.log(`sub2: ${i}`));
Every call to subscribe runs the function passed to create again, so the two subscriptions will not see the same values, and you will see "observer subscribed" twice:
"observer subscribed"
"sub1: 0.7506911975797266"
"sub1: 0.99208431574516"
"sub1: 0.42133050598204136"
"observer subscribed"
"sub2: 0.7750216415151954"
"sub2: 0.530988305574283"
"sub2: 0.6986252819187939"
This inevitably comes up when the stream carries accumulated state, such as a running counter:
var subject = new Rx.Subject();
var scan = subject.startWith(0).scan((count, change) => count + change);
scan.subscribe(count => console.log(`sub1: ${count}`));
subject.next(1);
subject.next(1);
subject.next(1);
scan.subscribe(count => console.log(`sub2: ${count}`));
subject.next(1);
subject.next(1);
subject.next(1);
/* output:
"sub1: 0"
"sub1: 1"
"sub1: 2"
"sub1: 3"
"sub2: 0"
"sub1: 4"
"sub2: 1"
"sub1: 5"
"sub2: 2"
"sub1: 6"
"sub2: 3"
*/
There are a variety of solutions out there, but the simplest by far is to just use a ReplaySubject. It replays the last N values (however many you ask for) to each new subscriber, so you can use the same source of data across various subscribers. Like so:
var subject = new Rx.Subject();
var scan = subject.startWith(0).scan((count, change) => count + change);
var replaySubject = new Rx.ReplaySubject(1);
scan.subscribe(replaySubject);
replaySubject.subscribe(count => console.log(`sub1: ${count}`));
subject.next(1);
subject.next(1);
subject.next(1);
replaySubject.subscribe(count => console.log(`sub2: ${count}`));
subject.next(1);
subject.next(1);
subject.next(1);
/* output:
"sub1: 0"
"sub1: 1"
"sub1: 2"
"sub1: 3"
"sub2: 3"
"sub1: 4"
"sub2: 4"
"sub1: 5"
"sub2: 5"
"sub1: 6"
"sub2: 6"
*/
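To make the replay behavior concrete, here is a rough sketch of the idea behind a replay buffer. TinyReplaySubject is a hypothetical name, and the real Rx.ReplaySubject does a lot more (completion, errors, time windows), so treat this as an illustration under those assumptions rather than the library's implementation.

```javascript
// Hypothetical sketch of the replay idea; Rx.ReplaySubject does much more.
class TinyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }
  next(value) {
    // Remember the value, keeping only the last `bufferSize` entries.
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    // Iterate over a copy so a subscribe during emission cannot disturb the loop.
    this.observers.slice().forEach(observer => observer(value));
  }
  subscribe(observer) {
    // A late subscriber first receives whatever is in the buffer.
    this.buffer.forEach(value => observer(value));
    this.observers.push(observer);
  }
}

const replay = new TinyReplaySubject(1);
const log = [];
replay.next(0);
replay.subscribe(count => log.push(`sub1: ${count}`));
replay.next(1);
replay.next(2);
replay.subscribe(count => log.push(`sub2: ${count}`));
replay.next(3);

console.log(log);
// [ 'sub1: 0', 'sub1: 1', 'sub1: 2', 'sub2: 2', 'sub1: 3', 'sub2: 3' ]
```

The second subscriber picks up at the latest value instead of starting over, which is the same shape as the output above.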
Of course, this means that your original scan Observable is eagerly computed, even when you don't really need it. In this case, you might consider using refCount on a published Observable so that you're not subscribed to the original Observable when you don't need to be. Be aware though: this may not do what you want, because values sent while nobody is subscribed are lost and the state starts over after the last subscriber leaves.
var subject = new Rx.Subject();
var scan = subject.startWith(0).scan((count, change) => count + change);
var refCounted = scan.publish().refCount();
subject.next(1);
subject.next(1);
subject.next(1);
var subscription = refCounted.subscribe(count => console.log(`sub: ${count}`));
subject.next(1);
subject.next(1);
subject.next(1);
subscription.unsubscribe();
subject.next(1);
subject.next(1);
subject.next(1);
subscription = refCounted.subscribe(count => console.log(`sub: ${count}`));
subject.next(1);
subject.next(1);
subject.next(1);
/* output:
"sub: 0"
"sub: 1"
"sub: 2"
"sub: 3"
"sub: 0" <-- pay attention to this value!
"sub: 1"
"sub: 2"
"sub: 3"
*/
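The reset is easier to see with reference counting written out by hand. The sketch below is not the RxJS implementation: tinyRefCount, coldCounter, and emit are hypothetical names, and it assumes a source is a function that takes a next callback and returns a function to unsubscribe. It connects to the source when the first subscriber arrives and disconnects when the last one leaves.

```javascript
// Hypothetical sketch of reference counting; not the RxJS implementation.
function tinyRefCount(subscribeToSource) {
  let observers = [];
  let disconnect = null;
  return function subscribe(observer) {
    observers.push(observer);
    if (observers.length === 1) {
      // First subscriber: connect to the source.
      disconnect = subscribeToSource(value => observers.forEach(o => o(value)));
    }
    return function unsubscribe() {
      observers = observers.filter(o => o !== observer);
      if (observers.length === 0 && disconnect) {
        // Last subscriber left: drop the source subscription.
        disconnect();
        disconnect = null;
      }
    };
  };
}

// A cold counter: every new subscription starts again from 0.
let listeners = [];
function coldCounter(next) {
  let count = 0;
  next(count); // plays the role of startWith(0)
  const onChange = change => { count += change; next(count); };
  listeners.push(onChange);
  return () => { listeners = listeners.filter(l => l !== onChange); };
}
const emit = change => listeners.slice().forEach(l => l(change));

const shared = tinyRefCount(coldCounter);
const output = [];
emit(1); // nobody is subscribed, so nothing is counted
let unsubscribe = shared(count => output.push(count));
emit(1);
emit(1);
unsubscribe();
emit(1); // lost: the source subscription was dropped
unsubscribe = shared(count => output.push(count));
emit(1);

console.log(output); // [ 0, 1, 2, 0, 1 ]
```

The second connection creates a fresh counter, so the running total from the first connection is gone. That is the same effect as the repeated "sub: 0" in the output above.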
Conclusion
If you enjoyed this post, or want more clarification, you might enjoy these:
- Andre's video on Cold vs Hot Observables: https://egghead.io/lessons/rxjs-demystifying-cold-and-hot-observables-in-rxjs
- Paul Taylor's talk from Reactive 2015 on RxJS: https://youtu.be/QhjALubBQPg?t=385 (especially this timestamp)