JavaScript
RxJS
Observable

RxJSの基礎中の基礎

Angularの依存ライブラリにも採用されるなど、近年注目を集めているRxJS。

その基礎中の基礎を確認しておきましょう。

ObservableとかObserverとかSubscribeとか謎概念がいろいろあるので、そのあたりの整理が目的です。

正直、僕もここまで理解するだけで結構時間がかかりました。「ストリームを『購読』する」とか言われても、全然意味がわからなかった。意味がわからなかったというか、「結局どう動いているのか」がイマイチ掴めない感じでした。

この記事ではそうならないように、できるだけ簡単な所から初めて、一つずつ理解を積み上げていきたいと思います。

あえてTypeScriptは使わずに、JavaScriptでコードを書いていきます。TypeScriptの方が、型の記述ができるので参考情報が増えるのですが、理解するための必要知識も増えてしまうからです。必要知識は少ない方がいい。


ObserverとSubscribeの関係


Observer

Observerはオブジェクトです。

3つの関数をまとめて一つのオブジェクトにしたものです。

3つの関数とは、 「next」「error」「complete」の3つです。


Subscribe

Subscribeは関数です。なので以下では時々「Subscribe関数」と呼びます。

Subscribe関数は、Observerを受け取って、Observerの中にある関数(next,error,complete)を実行します。

どういう順序で、どういう引数を与えて実行するかは、Subscribe関数内に記述します。


コードで確認しましょう。

今回は、Observer内のnext関数を、引数を変えながら3回実行したあと、Observer内のcomplete関数を実行してみます。

まだRxJSは出てきません(なんですって!!!)


コード

var observer_1 = {

next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // next,error,completeという3つの関数の集まり

function subscribe_1(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}// observerを受け取り、observerが持つ関数を実行する関数

subscribe_1(observer_1);



出力

Observer1 got a value: 1

Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete


解釈を変えてみる

ところで、このコードは

SubscribeがObserverに、『1,2,3』というデータを順に渡した後、『完了』を通知した

と解釈することができます。

この観点で、同じコードをもう一度みてみましょう。


コード

var observer_1 = {

next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // データを受け取った時に何をするのかが書かれている

function subscribe_1(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}// 指定のObserverに、『1,2,3』というデータと『完了』を送る

subscribe_1(observer_1);
// subscribe_1が送るデータの受け取り手としてobserver_1を指定する



出力

Observer1 got a value: 1

Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete

同じ出力が出ましたね。コメント以外同じコードですから、そりゃあそう。


まとめ


  • Observerはnext,error,completeという3つの関数を持っている。

  • Subscribe関数にObserverを渡すと、Observer内の関数が実行される。

  • SubscribeがObserverにデータを渡している、と解釈できる。


ObservableとSubscribeの関係

ではいよいよ、RxJSライブラリを読み込んで使いましょう。


Observable

RxJSで一番基本になるのは、Observableというオブジェクトです。

Observerオブジェクトと名前が似ていますが、別物なので気をつけてください。こういう紛らわしい所も嫌ですね。しっかり区別してください。

Observableオブジェクトは、subscribeという名前のメソッドを持ちます。

イメージ、こんな感じです。

observable_a = {

subscribe : function(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}
}

さっき作った関数subscribe_1の内容をそのまま書いてみました。

subscribeメソッドは、Observerオブジェクトを受け取って中の関数(next,error,complete)を実行してくれるようなメソッドになっています。(今の所errorは一回も呼んでないですけどね)

てもこれじゃ本当にメソッドを一つ持つだけのオブジェクトになっていて、他に何の機能もないですね。

それではいけないので、ちゃんとRxJSライブラリを使って、本物のObservableオブジェクトを作りましょう。


Observableを作る


createで作る

Observableオブジェクトを1から作るには、Rx.Observable.createという関数を使います。

このRx.Observable.createの引数に関数を渡すと、それがそのまま、subscribeという名前のメソッドとして登録されます。

そして、そのsubscribeメソッドにObserverオブジェクトを渡すと、Observer内の関数(next,error,complete)を実行してくれるわけです。

やってみましょう。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

function subscribe_1(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}// さっきと同じSubscribe関数
// 「指定のObserverに、『1,2,3』というデータと『完了』を送るもの」と解釈することもできる

var observable_1 = Rx.Observable.create(subscribe_1);
// Observableオブジェクトを作る
// subscribeメソッドとして、関数subscribe_1が登録される。

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じObserverオブジェクト
// 「データを受け取った時の挙動が書かれているもの」と解釈することも出来る

observable_1.subscribe(observer_1);
// subscribeメソッドを呼び出し、Observerを渡す
// 「データの受け取り手としてobserver_1を指定した」と解釈することもできる



出力

Observer1 got a value: 1

Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete

同じ出力がでました。

さっき、ObserverオブジェクトとSubscribe関数を作って実行した時とほとんど変わりませんね。

Subscribe関数を直接呼ぶ前に、一度Observableのメソッドにしてから呼び出しただけの違いです。


createを使わずに作る

今のと同じObservableオブジェクトを、createを使わずに作ることが出来ます。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

observable_1 = Rx.Observable.of(1,2,3);//createではなくofを使う

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じObserverオブジェクト

observable_1.subscribe(observer_1);



出力

Observer1 got a value: 1

Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete

ポイントは以下の2点です。



  • createではなくofを使った

  • Subscribe関数を自分で作らなかった

  • ofで作ったObservableはsubscribeメソッドを初めから持っていた

次のようにしても同じです。

observable_1 = Rx.Observable.from([1,2,3]);// createではなくfromを使う

このような、Observableを生成する関数を「Creation Operator」といいます。

RxJSには、ちょうどいいSubscribe関数を持ったObservableを自動で作ってくれるCreation Operatorがいろいろ用意されてます。


Observerオブジェクトの省略記法

次のコードを見てください


コード

Rx = require('./rx.min.js');//import文などで代用しても可

Rx.Observable.of(1,2,3).subscribe(x => console.log("The observer got a value: "+x))



出力

The observer got a value: 1

The observer got a value: 2
The observer got a value: 3

なんか、えらく短いコードで、似たような出力が出ましたね。

ライブラリを読み込む部分を除くと、一行しかありません。

ちょっと、コードの中身を見てみましょう。

Rx.Observable.of(1,2,3).subscribe(なんたらかんたら)

全体はこういう形をしています。

ofを使ってObservableを作り、そのsubscribeメソッドを呼び出したんですね。

ということは「なんたらかんたら」の部分にはObserverオブジェクトが入ってるはずですね。

でもコード中では

x => console.log("The observer got a value: "+x)

が入ってます。これ、ただの関数ですね。しかも無名の。

実は、subscribeメソッドの引数にいきなり関数を与えると、Observerのnext関数を指定したことになるんです。

subscribeの引数には3つまで関数を指定することができて、順にnext,error,completeとして扱われます。

今回のコードは、Observerオブジェクトの代わりに、next関数だけをsubscribeに渡した、ということになるんですね。

complete関数を渡していないから、さっきまで出力されていたObserver got Completeにあたる行は出力されないわけです。


まとめ


  • Observableはsubscribeというメソッドを持つ

  • ObservableはcreateかCreation Operatorを使って作る

  • Observableのsubscribeには関数を直接渡せる


Observableのメリット

本当の基礎中の基礎は、ここまででおわりです。とりあえずObservableを作って、動かせる。

でもこれじゃあ、Observableを使う意味がわかりませんね。

一番最初にObservableを使わずにできたことを、Observableを使ってなぞっただけ。

Observableいらねえじゃねえか!って話です。

じゃあObservableは一体何がすごいのか。


非同期処理ができるのがメリット??

よく言われるのが非同期処理です。

では「非同期処理ができること」が、Observableのメリットなんでしょうか??

次のコードを見てください。「// 大事」って書いてあるとこだけが大事です。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じ

function subscribe_1(observer){
observer.next(1);
observer.next(2);
setTimeout(function(){// 大事
observer.next(3); // 大事
},1000); //大事
}

var observable_1 = Rx.Observable.create(subscribe_1);
// Observableオブジェクトを作る

console.log("before");// 大事
observable_1.subscribe(observer_1);// 大事
console.log("after");// 大事


before

Observer1 got a value: 1
Observer1 got a value: 2
after
Observer1 got a value: 3

「3」が出力されるより先に「after」が出力されてますね。

setTimeout(処理,1000)は、1秒(1000ミリ秒)だけ待ってから処理を実行する関数ですが

それを待たずして非同期に「after」が出力されましたね。なるほど、非同期処理ができている。

では次のコードを見てください。


コード

var observer_1 = {

next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じ

function subscribe_1(observer){
observer.next(1);
observer.next(2);
setTimeout(function(){
observer.next(3);
},1000);
}

console.log("before");
subscribe_1(observer_1);// 大事
console.log("after");



出力

before

Observer1 got a value: 1
Observer1 got a value: 2
after
Observer1 got a value: 3

同じ結果が出ましたが、ポイントは、RxJSを使っていないということです。

あれ?非同期処理できてますね????

はい。別にRxJSを使わなくても非同期処理自体はできるんですね。

RxJSは非同期処理をするためのライブラリではありません。

では何なのかというと

非同期処理をスッキリ書くためのライブラリなんです!


オペレータが便利!!

先ほどCreation Operatorというものを使いましたが、これはRxJSの数あるOperatorのうちの1カテゴリーにすぎません。

他にもOperatorのカテゴリーはいろいろあり、それぞれのカテゴリーにはたくさんのOperatorがあります。

その便利さを紹介するために、次のような例を考えてみましょう。

「1秒ごとに流れてくる数字を偶数だけ足していき、随時それを文字列に変換して後ろに!!を付けて出力する」

説明のために作った例なので「そんな場面あります???」って感じですが、まあ、やっていきましょう。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

Rx.Observable.interval(1000)// 1秒ごとに1増える整数を生成
.filter(x => x%2 === 0)// 偶数だけ残す
.scan((acc,curr) => acc+curr,0)// 随時足す
.map(x => String(x)+"!!")// 文字列に変換して!!を付ける
.subscribe(x => console.log(x));// 出力する



出力

0!!

2!!
6!!
12!!
20!!
30!!
(以下、Ctrl+Cなどで止めるまで永遠に続く)

2秒に1つずつ出力されてきます。できましたね!

requireの行を除くと、たったの5行です。すごい。

ここで使っているinterval,filter,scan,mapがOperatorです。

一つ一つのOperatorの解説を始めると「RxJSの基礎の基礎」の範疇じゃなくなるので解説しません。

これらのOperatorは、返り値としてまたObserverを返すので、.を使ってメソッドチェーンとして繋げて書けるわけです。

どんどんOperatorを適用して、Observerの中身(subscribeメソッドの中身)を変えていくわけですね!

これ、RxJS使わないで書くと何行になるんでしょうね…?

そもそもどうやって書いたら良いのか考えるのが面倒です。

とにかくRxJSを使うと、この手の処理が簡単に書けるんです。


注意(RxJS 6.x以上では、オペレーターの書き方が変わっています)

2018年04月24日に、RxJSのバージョン6.0.0がリリースされました。

RxJS 6.x以上では、この記事内にあるコードはそのままでは動かなくなっています。(まだ記事書いて半年なのに動かなくなるなんて)

必要な変更を簡単に書くと


  • operatorはobservable1.pipe(map(...),reduce(...)).subscribe(...);のように書く

  • import文をimport { Observable, of } from 'rxjs';のように書く

  • operatorのimportは import { map, reduce } from 'rxjs/operators';のように書く

となっています。

詳しくは下記のサイトを御覧ください。

RxJS v5.x to v6 Update Guide (本家、英語)

RxJS 6.0 変更点まとめ(適宜更新) (Qiita記事、日本語)


Operatorは配列の操作に似ている

mapfilterといったOperatorは、実は配列Arrayオブジェクトのメソッドにも同じものがあります。

またscanは、配列で言う所のreduceと似た動きをしています。(そしてObservableにもreduceOperatorがあります。)

「配列の操作ならそこそこ慣れてるぞ!」という方は、Observableを「値が順々に送られてくる配列」として考えると、わかりやすいと思います。


まとめ


  • Observableは、Operatorをどんどん繋いで操作する

  • 非同期処理を、配列を操作する感覚で書けるのがすごい

さて。この記事で、本意気で説明するのはここまでです。

「基礎の基礎」と呼べるのは、このくらいまでの範囲かな。

「ObservableやSubscribeやObserverという『コア』を、Operatorで操作する」ということです。これが基礎中の基礎。

ですが、まだ触れていない、重要な概念がいくつかあるので、ここからはそれらをサクサク紹介していきます。


Subscription

突然ですが、実は、subscribeメソッドは返り値を持ちます。

最初に作ったsubscribe_1という関数には返り値がありませんでしたが、あれは単に返り値を書かなかっただけで、本当は返り値をちゃんと書くのがあるべき姿です。

ではどんな返り値が要るのかというと、「Subscriptionオブジェクト」というものです。

Subscriptionオブジェクトは、unsubscribeというメソッドを持ちます。

このメソッドを呼び出すと、Subscribe関数からObserverオブジェクトへデータを渡すのを停止することができます。

例で見てみましょう。

Creation Operatorで作ったObservableのSubscribe関数は、ちゃんとSubscriptionオブジェクトを返してくれます。

先ほど作った止まらないプログラムを、時間指定で止めてみましょう。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

subscription_1 = Rx.Observable.interval(1000)// 返り値を代入
.filter(x => x%2 === 0)
.scan((acc,curr) => acc+curr,0)
.map(x => String(x)+"!!")
.subscribe(x => console.log(x));

setTimeout(function(){
subscription_1.unsubscribe();// unsubscribeを呼び出す
},10000);



出力

0!!

2!!
6!!
12!!
20!!

10秒(10000ミリ秒)で止まりました。想定通りの動きです!


Subject

Subjectというオブジェクトがあります。

これは「Observableでもあり、Observerでもある」と説明されます。

でも、僕はどっちかっていうと、Observerの方が近いかな〜と思います。

基本的なイメージは、複数のObserverをまとめたものがSubjectという感じです。


SubjectにObserverを登録する

Subjectオブジェクトは、subscribeメソッドを持ちます。

Observableにもsubscribeメソッドがありましたね。これが「SubjectはObservableでもある」と言われる所以です。

subscribeメソッドにはもちろん、Observerオブジェクトを渡します。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
}; // さっきとほぼ同じObserverオブジェクト
// errorとcompleteを省略しました。それでも動くので大丈夫。

var subject = new Rx.Subject();// Subjectを作成

subject.subscribe(observer_1);// subscribeを呼び出す



出力


ところがこのコード、何も出力しません。

よく見ると、実は、Observerのnextに渡すデータが、どこからも発生していません。

subscribeメソッドを呼び出してObserverを渡しただけでは、特に何も実行されないんですね。

こういう所が、Observerと違う所です。

では、SubjectのsubscribeメソッドにObserverを渡すと何が起こるかというと

そのSubjectに、渡したObserverが登録されます。

一つのSubjectには、複数のObserverを登録することができます。

そして、Subjectのnext関数を呼び出して実行すると、

登録したObserverたちのnext関数が一斉に実行されます。

Subjectはnextという関数をメソッドとして持つんですね。こういう所が、「SubjectはObserverでもある」と言われる所以ですね。

例で見てみましょう。


コード

x = require('./rx.min.js');//import文などで代用しても可

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
complete: x => console.log('Observer1 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
// errorは使ってなかったので省略しました。

var observer_2 = {
next: x => console.log('Observer2 got a value: ' + x),
complete: x => console.log('Observer2 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト

var observer_3 = {
next: x => console.log('Observer3 got a value: ' + x),
complete: x => console.log('Observer3 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト

var subject_1 = new Rx.Subject();// Subjectを作成

subject_1.subscribe(observer_1);// Observerを登録
subject_1.subscribe(observer_2);// Observerを登録
subject_1.subscribe(observer_3);// Observerを登録

subject_1.next("オラオラオラ");// Subjectのnextメソッドを実行



出力

Observer1 got a value: オラオラオラ

Observer2 got a value: オラオラオラ
Observer3 got a value: オラオラオラ

Subjectのnext関数に渡した「オラオラオラ」が、各Observerにも渡されたのがわかります。


SubjectをObservableのsubscribeメソッドに渡す

このSubjectをObservableのsubscribeメソッドに渡してみましょう

大事なのは一番下の行だけです。あとはさっきとほぼ同じなので。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
complete: x=> console.log('Observer1 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト

var observer_2 = {
next: x => console.log('Observer2 got a value: ' + x),
complete: x=> console.log('Observer2 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト

var observer_3 = {
next: x => console.log('Observer3 got a value: ' + x),
complete: x=> console.log('Observer3 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト

var subject_1 = new Rx.Subject();// Subjectを作成

subject_1.subscribe(observer_1);// Observerを登録
subject_1.subscribe(observer_2);// Observerを登録
subject_1.subscribe(observer_3);// Observerを登録

Rx.Observable.of(1,2).subscribe(subject_1);// 大事



出力

Observer1 got a value: 1

Observer2 got a value: 1
Observer3 got a value: 1
Observer1 got a value: 2
Observer2 got a value: 2
Observer3 got a value: 2
Observer1 got Complete
Observer2 got Complete
Observer3 got Complete

SubjectをObservableに渡したら、そのSubjectに登録されていた複数のObserverにデータが複製されて流されていきましたね。

このように、Subjectを使うと、一つのObservableから出てきたデータを複数のObserverに渡すことができます。


Subjectの種類

Subjectには、Observerにデータを渡すタイミングが異なるいくつかの亜種みたいなものがあります。


Behavior Subject

最後に送ったデータを保持していて、新しくObserverが登録されるとそのObserverに保持しているデータを送る。(最初は「最後に送ったデータ」がないので、初期値を設定する)

このケースでは、Subjectのsubscribeメソッドを呼び出してObserverを渡すと、すぐにObserverのnextが実行されることになります。


Replay Subject

Behavior Subjectに似ていますが、こちらは指定した個数のデータを保持します。

新しくObserverが登録されると、保持しているデータを古い順に全部Observerに渡します。


Async Subject

こちらは、Subjectのnextメソッドを呼んでも、Observerにまでデータを渡しません。(登録されているObserverのnextメソッドを呼ばない)

Subjectのcompleteが呼ばれた時に、最後のデータと完了通知を各Observerに送ります。


Scheduler

最後に、Schedulerオブジェクトの話をします。

Schedulerオブジェクトは、その名の通り、ObservableからObserverにデータを送るタイミングを調整するもので

例えば、Creation OperatorでObservableを作る時に引数として渡したります。

例で見てみましょう。


コード

Rx = require('./rx.min.js');//import文などで代用しても可

var scheduler_1 = Rx.Scheduler.async;// Schedulerを作る

var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
complete: x=> console.log('Observer1 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト

console.log("before");
Rx.Observable.of(1,2,3,scheduler_1).subscribe(observer_1);// Schedulerを渡してる
console.log("after");



出力

before

after
Observer1 got a value: 1
Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete

ポイントは、afterObserver1 got a value: 1よりも先に来ている、という所です。

Schedulerを渡していなかったら、afterObserver1 got Completeの次に来るはずです。

Schedulerの具体的な動きについては今日は説明しませんが、とにかく、Schedulerを使うと実行順序を調整することが出来ます。

createを使ってObservableを作る場合は、onSubscribeというメソッドを呼び出してSchedulerを渡すことになります。


次回予告

さて。

本当はこの記事内で、あとSubjectとSchedulerについて扱おうと思っていたのですが

ちょっと長くなってきたし、「基礎の基礎」から若干外れてくるので

別の記事に回したいと思います。

書いたらリンク貼っておきます。

SubjectとSchedulerについても簡単にこの記事内に記述したので「次回」は特にありません!

それでは!