Angularの依存ライブラリにも採用されるなど、近年注目を集めているRxJS。
その基礎中の基礎を確認しておきましょう。
ObservableとかObserverとかSubscribeとか謎概念がいろいろあるので、そのあたりの整理が目的です。
正直、僕もここまで理解するだけで結構時間がかかりました。「ストリームを『購読』する」とか言われても、全然意味がわからなかった。意味がわからなかったというか、「結局どう動いているのか」がイマイチ掴めない感じでした。
この記事ではそうならないように、できるだけ簡単な所から初めて、一つずつ理解を積み上げていきたいと思います。
あえてTypeScriptは使わずに、JavaScriptでコードを書いていきます。TypeScriptの方が、型の記述ができるので参考情報が増えるのですが、理解するための必要知識も増えてしまうからです。必要知識は少ない方がいい。
ObserverとSubscribeの関係
Observer
Observerはオブジェクトです。
3つの関数をまとめて一つのオブジェクトにしたものです。
3つの関数とは、 「next」「error」「complete」の3つです。
Subscribe
Subscribeは関数です。なので以下では時々「Subscribe関数」と呼びます。
Subscribe関数は、Observerを受け取って、Observerの中にある関数(next,error,complete)を実行します。
どういう順序で、どういう引数を与えて実行するかは、Subscribe関数内に記述します。
例
コードで確認しましょう。
今回は、Observer内のnext関数を、引数を変えながら3回実行したあと、Observer内のcomplete関数を実行してみます。
まだRxJSは出てきません(なんですって!!!)
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // next,error,completeという3つの関数の集まり
function subscribe_1(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}// observerを受け取り、observerが持つ関数を実行する関数
subscribe_1(observer_1);
Observer1 got a value: 1
Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete
解釈を変えてみる
ところで、このコードは
SubscribeがObserverに、『1,2,3』というデータを順に渡した後、『完了』を通知した
と解釈することができます。
この観点で、同じコードをもう一度みてみましょう。
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // データを受け取った時に何をするのかが書かれている
function subscribe_1(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}// 指定のObserverに、『1,2,3』というデータと『完了』を送る
subscribe_1(observer_1);
// subscribe_1が送るデータの受け取り手としてobserver_1を指定する
Observer1 got a value: 1
Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete
同じ出力が出ましたね。コメント以外同じコードですから、そりゃあそう。
まとめ
- Observerはnext,error,completeという3つの関数を持っている。
- Subscribe関数にObserverを渡すと、Observer内の関数が実行される。
- SubscribeがObserverにデータを渡している、と解釈できる。
ObservableとSubscribeの関係
ではいよいよ、RxJSライブラリを読み込んで使いましょう。
Observable
RxJSで一番基本になるのは、Observableというオブジェクトです。
Observerオブジェクトと名前が似ていますが、別物なので気をつけてください。こういう紛らわしい所も嫌ですね。しっかり区別してください。
Observableオブジェクトは、subscribe
という名前のメソッドを持ちます。
イメージ、こんな感じです。
observable_a = {
subscribe : function(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}
}
さっき作った関数subscribe_1
の内容をそのまま書いてみました。
subscribe
メソッドは、Observerオブジェクトを受け取って中の関数(next,error,complete)を実行してくれるようなメソッドになっています。(今の所errorは一回も呼んでないですけどね)
てもこれじゃ本当にメソッドを一つ持つだけのオブジェクトになっていて、他に何の機能もないですね。
それではいけないので、ちゃんとRxJSライブラリを使って、本物のObservableオブジェクトを作りましょう。
Observableを作る
create
で作る
Observableオブジェクトを1から作るには、Rx.Observable.create
という関数を使います。
このRx.Observable.create
の引数に関数を渡すと、それがそのまま、subscribe
という名前のメソッドとして登録されます。
そして、そのsubscribeメソッドにObserverオブジェクトを渡すと、Observer内の関数(next,error,complete)を実行してくれるわけです。
やってみましょう。
Rx = require('./rx.min.js');//import文などで代用しても可
function subscribe_1(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}// さっきと同じSubscribe関数
// 「指定のObserverに、『1,2,3』というデータと『完了』を送るもの」と解釈することもできる
var observable_1 = Rx.Observable.create(subscribe_1);
// Observableオブジェクトを作る
// subscribeメソッドとして、関数subscribe_1が登録される。
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じObserverオブジェクト
// 「データを受け取った時の挙動が書かれているもの」と解釈することも出来る
observable_1.subscribe(observer_1);
// subscribeメソッドを呼び出し、Observerを渡す
// 「データの受け取り手としてobserver_1を指定した」と解釈することもできる
Observer1 got a value: 1
Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete
同じ出力がでました。
さっき、ObserverオブジェクトとSubscribe関数を作って実行した時とほとんど変わりませんね。
Subscribe関数を直接呼ぶ前に、一度Observableのメソッドにしてから呼び出しただけの違いです。
createを使わずに作る
今のと同じObservableオブジェクトを、create
を使わずに作ることが出来ます。
Rx = require('./rx.min.js');//import文などで代用しても可
observable_1 = Rx.Observable.of(1,2,3);//createではなくofを使う
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じObserverオブジェクト
observable_1.subscribe(observer_1);
Observer1 got a value: 1
Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete
ポイントは以下の2点です。
-
create
ではなくof
を使った - Subscribe関数を自分で作らなかった
- ofで作ったObservableは
subscribe
メソッドを初めから持っていた
次のようにしても同じです。
observable_1 = Rx.Observable.from([1,2,3]);// createではなくfromを使う
このような、Observableを生成する関数を「Creation Operator」といいます。
RxJSには、ちょうどいいSubscribe関数を持ったObservableを自動で作ってくれるCreation Operatorがいろいろ用意されてます。
Observerオブジェクトの省略記法
次のコードを見てください
Rx = require('./rx.min.js');//import文などで代用しても可
Rx.Observable.of(1,2,3).subscribe(x => console.log("The observer got a value: "+x))
The observer got a value: 1
The observer got a value: 2
The observer got a value: 3
なんか、えらく短いコードで、似たような出力が出ましたね。
ライブラリを読み込む部分を除くと、一行しかありません。
ちょっと、コードの中身を見てみましょう。
Rx.Observable.of(1,2,3).subscribe(なんたらかんたら)
全体はこういう形をしています。
of
を使ってObservableを作り、そのsubscribe
メソッドを呼び出したんですね。
ということは「なんたらかんたら」の部分にはObserverオブジェクトが入ってるはずですね。
でもコード中では
x => console.log("The observer got a value: "+x)
が入ってます。これ、ただの関数ですね。しかも無名の。
実は、subscribe
メソッドの引数にいきなり関数を与えると、Observerのnext関数を指定したことになるんです。
subscribe
の引数には3つまで関数を指定することができて、順にnext
,error
,complete
として扱われます。
今回のコードは、Observerオブジェクトの代わりに、next
関数だけをsubscribe
に渡した、ということになるんですね。
complete
関数を渡していないから、さっきまで出力されていたObserver got Complete
にあたる行は出力されないわけです。
まとめ
- Observableは
subscribe
というメソッドを持つ - Observableは
create
かCreation Operatorを使って作る - Observableの
subscribe
には関数を直接渡せる
Observableのメリット
本当の基礎中の基礎は、ここまででおわりです。とりあえずObservableを作って、動かせる。
でもこれじゃあ、Observableを使う意味がわかりませんね。
一番最初にObservableを使わずにできたことを、Observableを使ってなぞっただけ。
Observableいらねえじゃねえか!って話です。
じゃあObservableは一体何がすごいのか。
非同期処理ができるのがメリット??
よく言われるのが非同期処理です。
では「非同期処理ができること」が、Observableのメリットなんでしょうか??
次のコードを見てください。「// 大事」って書いてあるとこだけが大事です。
Rx = require('./rx.min.js');//import文などで代用しても可
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じ
function subscribe_1(observer){
observer.next(1);
observer.next(2);
setTimeout(function(){// 大事
observer.next(3); // 大事
},1000); //大事
}
var observable_1 = Rx.Observable.create(subscribe_1);
// Observableオブジェクトを作る
console.log("before");// 大事
observable_1.subscribe(observer_1);// 大事
console.log("after");// 大事
before
Observer1 got a value: 1
Observer1 got a value: 2
after
Observer1 got a value: 3
「3」が出力されるより先に「after」が出力されてますね。
setTimeout(処理,1000)
は、1秒(1000ミリ秒)だけ待ってから処理を実行する関数ですが
それを待たずして非同期に「after」が出力されましたね。なるほど、非同期処理ができている。
では次のコードを見てください。
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
error: err => console.log('Observer1 got an error: ' + err),
complete: () => console.log('Observer1 got Complete'),
}; // さっきと同じ
function subscribe_1(observer){
observer.next(1);
observer.next(2);
setTimeout(function(){
observer.next(3);
},1000);
}
console.log("before");
subscribe_1(observer_1);// 大事
console.log("after");
before
Observer1 got a value: 1
Observer1 got a value: 2
after
Observer1 got a value: 3
同じ結果が出ましたが、ポイントは、RxJSを使っていないということです。
あれ?非同期処理できてますね????
はい。別にRxJSを使わなくても非同期処理自体はできるんですね。
RxJSは非同期処理をするためのライブラリではありません。
では何なのかというと
非同期処理をスッキリ書くためのライブラリなんです!
オペレータが便利!!
先ほどCreation Operatorというものを使いましたが、これはRxJSの数あるOperatorのうちの1カテゴリーにすぎません。
他にもOperatorのカテゴリーはいろいろあり、それぞれのカテゴリーにはたくさんのOperatorがあります。
その便利さを紹介するために、次のような例を考えてみましょう。
「1秒ごとに流れてくる数字を偶数だけ足していき、随時それを文字列に変換して後ろに!!を付けて出力する」
説明のために作った例なので「そんな場面あります???」って感じですが、まあ、やっていきましょう。
Rx = require('./rx.min.js');//import文などで代用しても可
Rx.Observable.interval(1000)// 1秒ごとに1増える整数を生成
.filter(x => x%2 === 0)// 偶数だけ残す
.scan((acc,curr) => acc+curr,0)// 随時足す
.map(x => String(x)+"!!")// 文字列に変換して!!を付ける
.subscribe(x => console.log(x));// 出力する
0!!
2!!
6!!
12!!
20!!
30!!
(以下、Ctrl+Cなどで止めるまで永遠に続く)
2秒に1つずつ出力されてきます。できましたね!
require
の行を除くと、たったの5行です。すごい。
ここで使っているinterval
,filter
,scan
,map
がOperatorです。
一つ一つのOperatorの解説を始めると「RxJSの基礎の基礎」の範疇じゃなくなるので解説しません。
これらのOperatorは、返り値としてまたObserverを返すので、.
を使ってメソッドチェーンとして繋げて書けるわけです。
どんどんOperatorを適用して、Observerの中身(subscribeメソッドの中身)を変えていくわけですね!
これ、RxJS使わないで書くと何行になるんでしょうね…?
そもそもどうやって書いたら良いのか考えるのが面倒です。
とにかくRxJSを使うと、この手の処理が簡単に書けるんです。
注意(RxJS 6.x以上では、オペレーターの書き方が変わっています)
2018年04月24日に、RxJSのバージョン6.0.0がリリースされました。
RxJS 6.x以上では、この記事内にあるコードはそのままでは動かなくなっています。(まだ記事書いて半年なのに動かなくなるなんて)
必要な変更を簡単に書くと
- operatorは
observable1.pipe(map(...),reduce(...)).subscribe(...);
のように書く - import文を
import { Observable, of } from 'rxjs';
のように書く - operatorのimportは
import { map, reduce } from 'rxjs/operators';
のように書く
となっています。
詳しくは下記のサイトを御覧ください。
RxJS v5.x to v6 Update Guide (本家、英語)
RxJS 6.0 変更点まとめ(適宜更新) (Qiita記事、日本語)
Operatorは配列の操作に似ている
map
やfilter
といったOperatorは、実は配列Arrayオブジェクト
のメソッドにも同じものがあります。
またscan
は、配列で言う所のreduce
と似た動きをしています。(そしてObservableにもreduce
Operatorがあります。)
「配列の操作ならそこそこ慣れてるぞ!」という方は、Observableを「値が順々に送られてくる配列」として考えると、わかりやすいと思います。
まとめ
- Observableは、Operatorをどんどん繋いで操作する
- 非同期処理を、配列を操作する感覚で書けるのがすごい
さて。この記事で、本意気で説明するのはここまでです。
「基礎の基礎」と呼べるのは、このくらいまでの範囲かな。
「ObservableやSubscribeやObserverという『コア』を、Operatorで操作する」ということです。これが基礎中の基礎。
ですが、まだ触れていない、重要な概念がいくつかあるので、ここからはそれらをサクサク紹介していきます。
Subscription
突然ですが、実は、subscribeメソッドは返り値を持ちます。
最初に作ったsubscribe_1
という関数には返り値がありませんでしたが、あれは単に返り値を書かなかっただけで、本当は返り値をちゃんと書くのがあるべき姿です。
ではどんな返り値が要るのかというと、「Subscriptionオブジェクト」というものです。
Subscriptionオブジェクトは、unsubscribe
というメソッドを持ちます。
このメソッドを呼び出すと、Subscribe関数からObserverオブジェクトへデータを渡すのを停止することができます。
例で見てみましょう。
Creation Operatorで作ったObservableのSubscribe関数は、ちゃんとSubscriptionオブジェクトを返してくれます。
先ほど作った止まらないプログラムを、時間指定で止めてみましょう。
Rx = require('./rx.min.js');//import文などで代用しても可
subscription_1 = Rx.Observable.interval(1000)// 返り値を代入
.filter(x => x%2 === 0)
.scan((acc,curr) => acc+curr,0)
.map(x => String(x)+"!!")
.subscribe(x => console.log(x));
setTimeout(function(){
subscription_1.unsubscribe();// unsubscribeを呼び出す
},10000);
0!!
2!!
6!!
12!!
20!!
10秒(10000ミリ秒)で止まりました。想定通りの動きです!
Subject
Subjectというオブジェクトがあります。
これは「Observableでもあり、Observerでもある」と説明されます。
でも、僕はどっちかっていうと、Observerの方が近いかな〜と思います。
基本的なイメージは、複数のObserverをまとめたものがSubjectという感じです。
SubjectにObserverを登録する
Subjectオブジェクトは、subscribe
メソッドを持ちます。
Observableにもsubscribe
メソッドがありましたね。これが「SubjectはObservableでもある」と言われる所以です。
subscribe
メソッドにはもちろん、Observerオブジェクトを渡します。
Rx = require('./rx.min.js');//import文などで代用しても可
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
}; // さっきとほぼ同じObserverオブジェクト
// errorとcompleteを省略しました。それでも動くので大丈夫。
var subject = new Rx.Subject();// Subjectを作成
subject.subscribe(observer_1);// subscribeを呼び出す
ところがこのコード、何も出力しません。
よく見ると、実は、Observerのnext
に渡すデータが、どこからも発生していません。
subscribe
メソッドを呼び出してObserverを渡しただけでは、特に何も実行されないんですね。
こういう所が、Observerと違う所です。
では、Subjectのsubscribe
メソッドにObserverを渡すと何が起こるかというと
そのSubjectに、渡したObserverが登録されます。
一つのSubjectには、複数のObserverを登録することができます。
そして、Subjectのnext
関数を呼び出して実行すると、
登録したObserverたちのnext
関数が一斉に実行されます。
Subjectはnext
という関数をメソッドとして持つんですね。こういう所が、「SubjectはObserverでもある」と言われる所以ですね。
例で見てみましょう。
x = require('./rx.min.js');//import文などで代用しても可
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
complete: x => console.log('Observer1 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
// errorは使ってなかったので省略しました。
var observer_2 = {
next: x => console.log('Observer2 got a value: ' + x),
complete: x => console.log('Observer2 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
var observer_3 = {
next: x => console.log('Observer3 got a value: ' + x),
complete: x => console.log('Observer3 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
var subject_1 = new Rx.Subject();// Subjectを作成
subject_1.subscribe(observer_1);// Observerを登録
subject_1.subscribe(observer_2);// Observerを登録
subject_1.subscribe(observer_3);// Observerを登録
subject_1.next("オラオラオラ");// Subjectのnextメソッドを実行
Observer1 got a value: オラオラオラ
Observer2 got a value: オラオラオラ
Observer3 got a value: オラオラオラ
Subjectのnext関数に渡した「オラオラオラ」が、各Observerにも渡されたのがわかります。
SubjectをObservableのsubscribe
メソッドに渡す
このSubjectをObservableのsubscribe
メソッドに渡してみましょう
大事なのは一番下の行だけです。あとはさっきとほぼ同じなので。
Rx = require('./rx.min.js');//import文などで代用しても可
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
complete: x=> console.log('Observer1 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
var observer_2 = {
next: x => console.log('Observer2 got a value: ' + x),
complete: x=> console.log('Observer2 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
var observer_3 = {
next: x => console.log('Observer3 got a value: ' + x),
complete: x=> console.log('Observer3 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
var subject_1 = new Rx.Subject();// Subjectを作成
subject_1.subscribe(observer_1);// Observerを登録
subject_1.subscribe(observer_2);// Observerを登録
subject_1.subscribe(observer_3);// Observerを登録
Rx.Observable.of(1,2).subscribe(subject_1);// 大事
Observer1 got a value: 1
Observer2 got a value: 1
Observer3 got a value: 1
Observer1 got a value: 2
Observer2 got a value: 2
Observer3 got a value: 2
Observer1 got Complete
Observer2 got Complete
Observer3 got Complete
SubjectをObservableに渡したら、そのSubjectに登録されていた複数のObserverにデータが複製されて流されていきましたね。
このように、Subjectを使うと、一つのObservableから出てきたデータを複数のObserverに渡すことができます。
Subjectの種類
Subjectには、Observerにデータを渡すタイミングが異なるいくつかの亜種みたいなものがあります。
Behavior Subject
最後に送ったデータを保持していて、新しくObserverが登録されるとそのObserverに保持しているデータを送る。(最初は「最後に送ったデータ」がないので、初期値を設定する)
このケースでは、Subjectのsubscribe
メソッドを呼び出してObserverを渡すと、すぐにObserverのnext
が実行されることになります。
Replay Subject
Behavior Subjectに似ていますが、こちらは指定した個数のデータを保持します。
新しくObserverが登録されると、保持しているデータを古い順に全部Observerに渡します。
Async Subject
こちらは、Subjectのnext
メソッドを呼んでも、Observerにまでデータを渡しません。(登録されているObserverのnext
メソッドを呼ばない)
Subjectのcomplete
が呼ばれた時に、最後のデータと完了通知を各Observerに送ります。
Scheduler
最後に、Schedulerオブジェクトの話をします。
Schedulerオブジェクトは、その名の通り、ObservableからObserverにデータを送るタイミングを調整するもので
例えば、Creation OperatorでObservableを作る時に引数として渡したります。
例で見てみましょう。
Rx = require('./rx.min.js');//import文などで代用しても可
var scheduler_1 = Rx.Scheduler.async;// Schedulerを作る
var observer_1 = {
next: x => console.log('Observer1 got a value: ' + x),
complete: x=> console.log('Observer1 got Complete'),
}; // さっきとほぼ同じObserverオブジェクト
console.log("before");
Rx.Observable.of(1,2,3,scheduler_1).subscribe(observer_1);// Schedulerを渡してる
console.log("after");
before
after
Observer1 got a value: 1
Observer1 got a value: 2
Observer1 got a value: 3
Observer1 got Complete
ポイントは、after
がObserver1 got a value: 1
よりも先に来ている、という所です。
Schedulerを渡していなかったら、after
はObserver1 got Complete
の次に来るはずです。
Schedulerの具体的な動きについては今日は説明しませんが、とにかく、Schedulerを使うと実行順序を調整することが出来ます。
create
を使ってObservableを作る場合は、onSubscribe
というメソッドを呼び出してSchedulerを渡すことになります。
宣伝
この記事を読んでいる人のほとんどは、日々作業を抱えている人だと思います。
みんなで、サボらずに、メリハリをつけて作業に取り組むためのwebサービスをReact+NextJSで作成しております。
ぜひ遊びに作業しに来てください!