LoginSignup
1
1

More than 5 years have passed since last update.

Why things "repeat" in Rx, and how to make subscriptions "consistent"

Posted at

I got the idea to write this post from answering a question on StackOverflow about how to reuse data from an Observable: http://stackoverflow.com/questions/34657154/angular2-observable-scan-once-then-subscribe-by-many-async/34785866

If you've used RxJS a bit, but not much, you've probably mostly dealt with a single observer to an Observable. That's fine, really, but what happens when you try to add more observers to an Observable? Do you find that each observer sees a "reset" version of the state you want?

That's probably because you're using a "Cold" Observable, which is what you get by default. It makes sense when you look at how Observables work:

var numbers = Rx.Observable.create(observer => {
  console.log('observer subscribed');
  observer.next(Math.random());
  observer.next(Math.random());
  observer.next(Math.random());
});

numbers.subscribe(i => console.log(`sub1: ${i}`));
numbers.subscribe(i => console.log(`sub2: ${i}`));

Given what I just told you, you should know that the output of these subscriptions will not be the same, and that you will see "observer subscribed" twice:

"observer subscribed"
"sub1: 0.7506911975797266"
"sub1: 0.99208431574516"
"sub1: 0.42133050598204136"
"observer subscribed"
"sub2: 0.7750216415151954"
"sub2: 0.530988305574283"
"sub2: 0.6986252819187939"

So this will inevitably come up in situations where you have a counter or something like that:

var subject = new Rx.Subject();

var scan = subject.startWith(0).scan((count, change) => count + change);

scan.subscribe(count => console.log(`sub1: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

scan.subscribe(count => console.log(`sub2: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

/* output:
"sub1: 0"
"sub1: 1"
"sub1: 2"
"sub1: 3"
"sub2: 0"
"sub1: 4"
"sub2: 1"
"sub1: 5"
"sub2: 2"
"sub1: 6"
"sub2: 3"
*/

There's a variety of solutions out there, but the simplest by far is to just use a ReplaySubject. This way, you replay the last X number of values you want when you subscribe, and you can just use the same source of data across various subscribers. Like so:

var subject = new Rx.Subject();

var scan = subject.startWith(0).scan((count, change) => count + change);

var replaySubject = new Rx.ReplaySubject(1);

scan.subscribe(replaySubject);

replaySubject.subscribe(count => console.log(`sub1: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

replaySubject.subscribe(count => console.log(`sub2: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

/* output:
"sub1: 0"
"sub1: 1"
"sub1: 2"
"sub1: 3"
"sub2: 3"
"sub1: 4"
"sub2: 4"
"sub1: 5"
"sub2: 5"
"sub1: 6"
"sub2: 6"
*/

Of course, this means that your original scan Observable is eagerly computed, even when you don't really need it. In this case, you might consider using refCount on a published Observable so that you're not subscribed to the original Observable when you don't need to be. Be aware though: this may not do what you want.

var subject = new Rx.Subject();

var scan = subject.startWith(0).scan((count, change) => count + change);

var refCounted = scan.publish().refCount();

subject.next(1);
subject.next(1);
subject.next(1);

var subscription = refCounted.subscribe(count => console.log(`sub: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

subscription.unsubscribe();

subject.next(1);
subject.next(1);
subject.next(1);

var subscription = refCounted.subscribe(count => console.log(`sub: ${count}`));

subject.next(1);
subject.next(1);
subject.next(1);

/* output:
"sub: 0"
"sub: 1"
"sub: 2"
"sub: 3"
"sub: 0" <-- pay attention to this value!
"sub: 1"
"sub: 2"
"sub: 3"
*/

Conclusion

If you enjoyed this post, or want more clarification, you might enjoy these:

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1