JavaScript
TypeScript
RxJS

RxJS入門#1基本の概念をひとつずつ学ぶ


はじめに

rxjs6.3、typescript3.2で動作確認。


RxJSとはなにか

非同期とイベントのためのObserverパターンを使ったライブラリ。

イベントで渡ってきたデータを自由に加工することができるので、公式サイトでは「イベント用のLodash」と紹介されています。


Think of RxJS as Loadash for events.


非同期といえばjsにはすでにPromiseやasync/awaitなどの仕組みがありますが、それらとは何が違いRxJSだと何が嬉しいんでしょうか。

また、イベントといえばclickとかだと思いますが、イベントから渡ってきたデータの加工をわざわざ非同期用のライブラリでするってなんだ?って感じですね。

今回は基本の概念をひとつずつ次の順に見ていきます。


  • Observable:イベントや値をRxJSで受け取れる形にする

  • Operators:受け取ったイベントや値を加工する

  • Subject:Observableを同時にいろんなところで受け取れるようにする(マルチキャストを可能にする)

  • Subscription:subscribeの解除を行う


Observable:イベントや値をRxJSで受け取れる形にする


従来のclickイベント

document.addEventListener('click', event => console.log(event));


RxJSのclickイベント

import { fromEvent } from 'rxjs';

fromEvent(document, 'click').subscribe(event => console.log(event));

見慣れた従来のclickイベントはいいとして、RxJSの方もなにかの結果をメソッドチェーンで受け取るというこの書き方にはちょっと見覚えがあります。

subscribeという単語にこそなってますが、これはajaxやPromiseで使ってきたthenに近いものがありそうですね。

あれは非同期の処理結果をjqXHRオブジェクトやPromiseオブジェクトとして返してもらって、callback地獄ではなくわかりやすくその後の処理を書けるみたいな感じだったと思います。

今回はfromEventメソッドの返り値をsubscribeがあたかもthenのように受け取っているわけですが、このfromEventはなにを返しているんでしょうか。

この返してもらっているものがRxJSの肝であるObservableオブジェクトです。

ここではEventオブジェクトをラッパーしたObservableがsubscribeの引数に渡り、その後の処理に使用できるようになっています。

ちなみに、Observableが新たに生成されたとき(例えばイベントが発火したときなど)や変更したときを監視しているのがObserverというものです。Observableが監視される側、Observerが監視する側と単純に覚えておけばいいでしょう。

subscribeはObservableのメソッドですが、イメージ的にはObserverがObservableをsubscribeしていると考えた方がいいと思います。

Observableは公式サイトでは「未来の値やイベントのコレクションを呼び出せるもの」というような説明がされています。


Observable: represents the idea of an invokable collection of future values or events.


未来の値やイベントを呼び出せるというのは、「登録時点ではイベントが起きていなくてもObservableとしては登録できてsubscribeして呼び出す準備ができる」といったような意味だと思います。

上記はイベントで説明しましたが、イベントである必要はないので例えば値をラッパーするObservableも作ることができます。


Observableインスタンスを直接作成

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
});

observable.subscribe(x => console.log(x));

Observableはnewからインスタンスを作る際は、引数にsubscribe関数を取ります。

subscribe関数は引数にsubscriberを取り、subscriberのnextメソッドで値を渡していきます。

この書き方は冗長なので次のように書くこともできます。


fromで値を渡してObservableを作成

import { from } from 'rxjs';

const observable = from([1, 2, 3]);
observable.subscribe(v => console.log(v));

fromEventでイベントからObservableを作れるように、fromはArrayライクなオブジェクトを引数に取り、Observableを作ることができます。また、fromではなくofを使うと、可変長の引数を渡してObservableを作ることができます。


ofで可変長引数に値を渡してObservableを作成

import { from } from 'rxjs';

const observable = of(1, 2, 3);
observable.subscribe(v => console.log(v));

未来の値やイベントを呼び出せるという意味はなんとなくわかりました。

では、「未来の値やイベントのコレクションを呼び出せるもの」のコレクションとはなんでしょうか?


Operators:受け取ったイベントや値を加工する


Observableにpipeメソッドで処理を追加してコレクション化

import { fromEvent } from 'rxjs';

import { mapTo, scan } from 'rxjs/operators';

fromEvent(document, 'click')
.pipe(
mapTo(1), // eventオブジェクトを1に変換
scan((count, click) => count + click, 0) // countに1を加算する。countの初期値は0
)
.subscribe(count => console.log(`Clicked ${count} times`));

Observableをpipeでつなげることにより、その中で順番にイベントや値を加工していくことができます。

最初にRxJSをイベント用のLodashと言いましたが、要はコレクションとはLodashのように値やイベントを自在に加工した後のもの的な意味です。

この加工に使えるmapToやscanはOperatorsと呼ばれています。map,filter,reduce,everyみたいなArrayに使うメソッドやLodashっぽいものが集まっています。scanはreduceみたいなものです。

ちなみにfromEventやfromなどのObservableを最初に作り出すものはCreation Operatorsと呼ばれます。

Operatorsは100ぐらいあり、Operatorsの中でも加工用以外種類もあったりするのですが今回はあまり扱いません。

さて、イベントを非同期のような仕組みで書けて、さらにそのイベントや値を自在に変換してその後の処理に渡せることはわかりました。

では、それが嬉しいパターンってなんでしょうか?イベントの値を変換するだけならRxJSでなくてもよさそうです。


コンポーネント間のイベントのやり取りが複雑なSPAでこそRxJSは使える

話は少し変わりますが、SPAフレームワークでのコンポーネント間のイベントの伝播って大体子から親にemitしてバケツリレーしていく形ですよね。

子から親ならシンプルですが、例えば子→親→別の子に伝播させたかったりするケースだけ考えてもけっこう面倒です。

ここで未来のイベントを呼び出せるというRxJSの考え方が活きてきます。

通常のemitが水(イベント)をバケツに入れてリレーさせていくと考えるとすれば、RxJSは水(値やイベント)を水道に流し各所(コンポーネント)で待ってるだけで水が流れてくるようにできるイメージだと思います。

例えるとこんな感じだと思います。

たとえ
RxJXの各要素


Observable(にラップされたイベントや値)

水道管
Observer(Observableを監視する)

ろ過
Operators

蛇口を開ける
subscribeメソッド

バケツリレーよりも圧倒的に便利そうです。

ただし、バケツリレーなら特になんの設備もなくバケツだけで水を運ぶことができますが、水道を整備するとなったら各所に水を届ける仕組みが必要で、そのあたりが少々面倒くさいところです。

なのでバケツリレーで十分ならそれでよし、複雑になりそうなら水道の手配を考えるといった使い分けがいいのではないでしょうか。

ちなみにReduxやVuexのStoreパターンは水道というよりダムを用意してあげて、すべての水はそこに置いとくから後は各所で取りに行ってねというイメージかなと思います。(水道もダムも水で説明してますが、水そのものが流れてくるというより、ペットボトルに入ったラベル付けされた水が水道から流れてくるなり、ダムに貯まってるようなイメージでしょうか。)

さて、ではどうやって各所にこの水を届ける仕組みを整えてあげられるのでしょうか?

これはSubjectという仕組みが使われることが多いです。


Subject:Observableを同時にいろんなところで受け取れるようにする(マルチキャストを可能にする)

今までの説明では特に問題ありませんでしたが、Observableだけでは同時にひとつのObserverにしか値やイベントを流すことができません(ユニキャスト)

例えば、次のように1つのObservableを2つのObserverに流すことはできますが、同時にひとつのObservableが流れることはなく、まず先にsubscribeしているところにObservableが流れ終わってから、次のsubscribeしているところに流れることになります。


Observable単独ではユニキャストでObserverに流すことしかできない

import { from } from 'rxjs';

const observable = from([1, 2, 3]);

observable.subscribe(v => console.log(`observerA: ${v}`));
observable.subscribe(v => console.log(`observerB: ${v}`));

// Log
// observerA: 1
// observerA: 2
// observerA: 3
// observerB: 1
// observerB: 2
// observerB: 3

これは処理も軽いですし、同じコンポーネント内での例なので少しイメージがつきづらいですが、時間のかかる処理を複数のコンポーネントで処理したい場合であれば非同期的に処理したいはずです。

Observable単独ではユニキャストでしか処理できないところを、Subjectを使用することでマルチキャストで複数のObserverに流すことができるようになります。


A Subject is like an Observable, but can multicast to many Observers


SubjectはObservableのように振る舞いますが、複数のObserverにマルチキャストできるようになります。

つまり下記のようなことが可能になります。


Subjectを間に挟むことでマルチキャストでObserverに流すことができる

import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(v => console.log(`observerA: ${v}`));
subject.subscribe(v => console.log(`observerB: ${v}`));

const observable = from([1, 2, 3]);
observable.subscribe(subject);

// Log
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

まずSubjectインスタンスを直接作成した後、それをsubscribeしたときの動作を登録します。

今まではObservableをsubscribeしていたため少し変にも思えますが、SubjectはObservableのように振る舞うためこのような書き方が可能です。

そして、Observable自体のsubscribeには先程作ったSubjectを渡してやります。

こうすることで、

Observable → Subject → Observer

          ↘ Observer

の流れができ、SubjectからはマルチキャストでObserverに届けられます。


SubjectはObservable自体としても使える

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(v => console.log(`observerA: ${v}`));
subject.subscribe(v => console.log(`observerB: ${v}`));

subject.next(1);
subject.next(2);
subject.next(3);

// Log
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

SubjectはObservableのように振る舞うのでObservable自体としても使うことができます。

あとからSubjectにnextメソッドで値やイベントを流すことができます。

Subjectには様々な種類があるのですがここでは説明しません。


Subscription:subscribeの解除を行う

これまでもちょっと出てきたSubscriptionを最後に説明したいのですが、その前にsubscribeメソッドについて深掘りします。


subscribeメソッドの引数は本来はnext、error、completeメソッドを持つ

import { from } from 'rxjs';

const observable = from([1, 2, 3]);

observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
});

// Log
// got value 1
// got value 2
// got value 3
// done

今まではsubscribeメソッドに無名関数をひとつ渡してきただけでしたが、本来はsubscribeメソッドはnext、error、completeの3つのメソッドを持ちます。

それぞれの意味はなんとなくわかると思いますが、

nextはObservableを受け取ったとき、

errorはObservableの受け取りに失敗したとき、

completeはObservableを受け取り終わったとき、

にそれぞれ発火します。

subscribeメソッドは連想配列形式でこれらメソッドを渡すこともできますが、無名関数を順番に引数に渡すだけでもnext、error、completeの順に関数が登録されていきます。

なので、上の書き方は次のように書くこともできます。


subscribeメソッドには引数に無名関数を渡すだけでもいい

import { from } from 'rxjs';

const observable = from([1, 2, 3]);

observable.subscribe(
x => console.log('got value ' + x),
err => console.error('something wrong occurred: ' + err),
() => console.log('done')
);

// Log
// got value 1
// got value 2
// got value 3
// done

渡した無名関数の順にnext、error、completeになります。

もし、error、completeが不要であれば第一引数にnext用の無名関数を渡せばいいだけなので、今まではこれを使用してきました。

さて、subscribeメソッドがわかったところでSubscriptionについて見ていきます。


Subscriptionはsubscribeメソッドの戻り値

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));

setTimeout(() => {
subscription.unsubscribe();
}, 5000);

// Log
// 0
// 1
// 2
// 3
// 4

Subscriptionはsubscribeメソッドの戻り値です。

その主要な役割はunscribeによるsubscribeの解除です。

上の例では、intervalで1秒に1回、1ずつ増えるObservableが渡ってきますが、5秒後にunsubscribeしているため、それ以降ログが吐かれることはありません。


まとめ

RxJSでイベントや値を流す側と、受け取る側を疎結合にできるので、SPAのように複数のコンポーネントでそれらを分けて管理したいときに便利そうなことがわかりました。

最後に水と水道で例えるとこのようになるかと思います。

たとえ
RxJXの各要素


Observable(にラップされたイベントや値)

水道管
Observer(Observableを監視する)

複数の水道管につなげられる仕組み
Subject

ろ過
Operators

蛇口を開ける
subscribeメソッド

蛇口を閉める
Subscription.unsubscribeメソッド


参考

RxJS公式サイト