http://reactivex.io/rxjs/manual/overview.html を勝手に日本語訳
冒頭からObservableまで
#Introduction
RxJSは、observableシーケンスを使用して非同期およびイベントベースのプログラムを構成するためのライブラリです。 1つのコアタイプとしてObservableと、その派生クラス(Observer、Schedulers、Subjects)と、Arrayのメソッド(map、filter、reduce、everyなど)の影響を受けた演算子を提供し、非同期イベントをコレクションとして扱うことができます。
RxJSをイベントにおけるLodashと考えてください。
ReactiveXはObserverパターンとIteratorパターンを組み合わせ、コレクションを使って関数型プログラミングを行うことで、イベントのシーケンスを管理する理想的な方法の要求を満たすことができます。
非同期イベント管理を解決するRxJSの基本的な概念は次のとおりです。
-
Observable
: 呼び出すことのできる未来の値やイベントコレクションという考えを表します。 -
Observer
:Observable
によって配信された値をlistenする方法を知っているコールバックのコレクションです。 -
Subscription
:Observable
の実行を表し、主に実行をキャンセルするのに便利です。 -
Operator
:map
,filter
,concat
,flatMap
などのコレクションを操作するように関数型プログラミングスタイルを可能にする純粋な関数です。 -
Subject
: EventEmitterと同等であり、複数のObservaer
に値またはイベントをマルチキャストする唯一の方法です。 -
Schedulers
: 中央化されたディスパッチャであり、並列性を制御し、計算がいつ起こるかを調整することができます。setTimeout
またはrequestAnimationFrame
など
First examples
通常、イベントリスナーを登録します。
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
RxJSを使用すると、代わりにObservableを作成できます。
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscribe(() => console.log('Clicked!'));
Purity
RxJSを強力にするのは、純粋な関数を使用して値を生成する能力です。 つまり、コードのエラーが発生しにくくなります。
通常、不純な(副作用のある)関数を作成すると思いますが、これはコードの他の部分が状態を壊すことがあります。
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));
RxJSを使用すると、状態を分離できます。
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
scan
演算子は、配列の reduce
と同様に機能します。 それはコールバックに値をとります。 コールバックの戻り値は、次にコールバックが実行されるときに公開される次の値になります。
Flow
RxJSには、observableをどのようにイベントが流れるかを制御するためのさまざまな演算子が用意されています。
これは、1秒間に1回のクリックを許可する方法です。単純なJavaScriptではこうなります:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {
console.log(`Clicked ${++count} times`);
lastClick = Date.now();
}
});
RxJSでは
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
他のフロー制御演算子は、filter、delay、debounceTime、take、takeUntil、distinct、distinctUntilChangedなどです。
Values
observableから渡された値を変換することができます。
単純なJavaScriptで、クリックごとに現在のマウスxの位置を加算する方法は次のとおりです。
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count)
lastClick = Date.now();
}
});
RxJSでは
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.map(event => event.clientX)
.scan((count, clientX) => count + clientX, 0)
.subscribe(count => console.log(count));
他の値生成演算子は、pluck, pairwise, sampleなどがあります。
Observable
Observablesは複数の値のlazy Pushコレクションです。彼らは次の表の欠落箇所を埋めます:
Single | Multiple | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
例: サブスクライブ時に直ちに(同期的に)値1,2,3をプッシュし、サブスクライブ呼び出しから1秒後に値4をプッシュし、その後完了するObservableを次に示します。
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
Observableを呼び出してこれらの値を見るには、それを subscribe する必要があります:
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
コンソール上でこれを実行すると:
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
Pull versus Push
PullとPushは、データプロデューサがデータコンシューマとどのように通信できるかという2つの異なるプロトコルです。
Pullとは何ですか? Pullシステムでは、コンシューマはデータプロデューサからデータを受信するタイミングを決定します。プロデューサ自体は、データがコンシューマにいつ配信されるかを認識していません。
すべてのJavaScript関数はPullシステムです。関数はデータのプロデューサであり、関数を呼び出すコードは呼び出しから単一の戻り値を「引き出す」ことによってそれを消費しています。
ES2015では、ジェネレータ関数とイテレータ( function*
)、Pullシステムの別のタイプが導入されました。 iterator.next()
を呼び出すコードはコンシューマであり、イテレータ(プロデューサ)から複数の値を引き出す。
Producer | Consumer | |
---|---|---|
Pull | Passive: 要求されたときにデータを生成します。 | Active: データの要求時期を決定します。 |
Push | Active: 独自のペースでデータを生成します。 | Passive: 受信したデータに反応します |
Pushとは何ですか? Pushシステムでは、プロデューサがデータをコンシューマに送信するタイミングを決定します。コンシューマは、そのデータをいつ受信するのか分からない。
Promiseは今日のJavaScriptの中で最も一般的なPushシステムのタイプです。Promise(プロデューサ)は、登録されたコールバック(コンシューマ)に解決された値を渡しますが、関数とは異なり、その値がコールバックに「プッシュ」されたときを正確に決定するのはプロミスです。
RxJSはObservable、JavaScriptのための新しいPushシステムを導入しました。 Observableは複数の値のプロデューサで、Observer(消費者)に「Push」します。
- 関数 は遅延評価された計算で、呼出し時に単一の値を同期的に返します。
- ジェネレータ は遅延評価された計算で、反復でゼロから(潜在的に)無限の値を同期的に返します。
- Promise は、最終的に単一の値を返す可能性がある(またはしない)計算です。
- Observable は遅延評価された計算で、それ以降に呼び出された時刻からゼロから(潜在的に)無限の値を同期的または非同期的に返すことができます。
Observables as generalizations of functions
一般的な主張とは異なり、ObservableはEventEmittersのようなものではなく、複数の値に対するPromiseのようなものでもありません。ObservableはEventEmittersのように動作する場合もあります。つまり、RxJS Subject
を使用してマルチキャストされている場合ですが、通常はEventEmittersのように動作しません。
Observableは引数がゼロの関数と似ていますが、複数の値を許容するために一般化しています。
次の点を考慮してください。
function foo() {
console.log('Hello');
return 42;
}
var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);
期待される出力:
"Hello"
42
"Hello"
42
Observablesを使って、上記と同じ動作を書くことができます:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
});
foo.subscribe(function (x) {
console.log(x);
});
foo.subscribe(function (y) {
console.log(y);
});
そして出力は同じです:
"Hello"
42
"Hello"
42
これは、関数とObservableの両方が遅延計算であるために発生します。この関数を呼び出さないと、 console.log('Hello')
は実行されません。 Observableでも、( subscribe
によって)それを呼び出さないと、 console.log('Hello')
は発生しません。さらに、「呼び出し」または「サブスクライブ」は独立した操作です。2つの関数呼び出しが2つの独立した副作用を引き起こし、2つのObservableサブスクリプションが2つの別々の副作用を引き起こします。 Observableは、副作用を共有し、subscriberの有無に関わらず即時実行するEventEmitterとは対照的に、共有実行を持たず、遅延実行です。
Observableをsubscribeすることは、Functionに対する呼び出しと似ています。
Observablesは非同期であると主張する人もいます。それは本当ではありません。関数呼び出しをログで囲む場合、次のようになります。
console.log('before');
console.log(foo.call());
console.log('after');
次の出力が表示されます。
"before"
"Hello"
42
"after"
Observablesと同じ動作です。
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
出力は次のとおりです。
"before"
"Hello"
42
"after"
これはfooのサブスクリプションが関数のように同期していることを証明しています。
Observableは、値を同期的または非同期的に配信できます。
Observableと関数の違いは何ですか? Observableは時間の経過とともに複数の値を返すことができます が、関数はそうすることはできません。あなたはこれをすることはできません:
function foo() {
console.log('Hello');
return 42;
return 100; // dead code. will never happen
}
関数は1つの値だけしか返すことができませn。 Observablesはこれを行うことができます:
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100); // "return" another value
observer.next(200); // "return" yet another
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
同期出力の場合:
"before"
"Hello"
42
100
200
"after"
しかし、値を非同期的に「返す」こともできます。
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // happens asynchronously
}, 1000);
});
console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');
出力:
"before"
"Hello"
42
100
200
"after"
300
結論:
-
func.call()
は、「1つの値を同期的に渡して下さい」という意味です。 -
observable.subscribe()
は、「同期的または非同期的に、任意の量の値を渡してください」という意味です。
Anatomy of an Observable
Observable
は、 Rx.Observable.create
または作成オペレータを使用して 作成 され、 Observer
に
subscribe され、 Observer
に next
/ error
/ complete
を通知するために 実行 され、それらの実行は dispose
(廃棄)されます。これらの4つの側面はすべてObservableインスタンスでエンコードされますが、ObserverやSubscriptionなどの他のタイプに関連するものもあります。
Observable
のコアな関心:
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Disposing Observables
Creating Observables
Rx.Observable.create
は Observable
コンストラクタのエイリアスであり、1つの引数、 subscribe
関数をとります。
次の例では、Observableを作成してオブザーバに1秒ごとに文字列 'hi'
を出力します。
var observable = Rx.Observable.create(function subscribe(observer) {
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
Observable
はcreate
で作成できますが、通常、of
、from
、interval
などのいわゆる作成オペレータを使用します。
上記の例では、 subscribe
関数はObservableを記述するための最も重要な部分です。subscribeの意味を見てみましょう。
Subscribing to Observables
この例のObservable observable
は、次のようにサブスクライブすることができます。
observable.subscribe(x => console.log(x));
Observable.create(function subscribe(observer) {...})
の observable.subscribe
と
subscribe
が同じ名前を持つことは偶然ではありません。ライブラリの内部では、それらは異なっていますが、実用的な目的では概念的にはそれらを同じものと見なすことができます。
これは、同じObservableの複数のObserver間で subscribe
の実行が共有されない様子を示しています。 Observerを使用して observable.subscribe
を呼び出すとき、 Observable.create(function subscribe(observer) {...})
の中の subscribe
関数がそのObserverに対して実行されます。
observable.subscribe
への各呼び出しは、与えられたObserverに対する独自のセットアップを引き起こします。
Observableをsubscribeすることは、データを配信するコールバックを提供して関数を呼び出すことに似ています。
これは、 addEventListener
/ removeEventListener
などのイベントハンドラAPIとは大きく異なります。
observable.subscribe
では、指定されたObserverはObservableのリスナーとして登録されません。 Observableは添付されたオブザーバーのリストを維持していません。
subscribe
の実行は、「Observable execution」を開始し、その実行のオブザーバーに値またはイベントを渡すための単なる方法です。
Executing Observables
Observable.create(function subscribe(observer) {...})
内のコードは、Observable Executionを表します。これは、登録しているObserverごとに発生する遅延計算です。実行は、時間の経過とともに、同期的または非同期的に複数の値を生成します。
Observable Executionが提供できる値には3つのタイプがあります。
- "Next":数値、文字列、オブジェクトなどの値を送信します。
- "Error":JavaScriptエラーまたは例外を送信します。
- "Complete":値を送信しません。
Nextの通知は、最も重要で最も一般的なタイプです。これらは、Observerに配信される実際のデータを表します。ErrorとCompleteな通知はObservable Execution中に一度だけ発生し、どちらか一方しか存在できません。
これらの制約は、いわゆる Observable Grammar or Contract でよく表現され、正規表現として書かれています。
next*(error|complete)?
Observable Executionでは、ゼロから無限のNext通知が配信されることがあります。ErrorまたはComplete通知のいずれかが配信された場合、その後は他のものは配信されません。
3つのNextの通知を配信し、その後完了するObservable実行の例を次に示します。
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
Observableは厳密にObservable Contractを遵守しているため、次のコードは 4
のNext通知を提供しません。
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // Is not delivered because it would violate the contract
});
例外をキャッチすると、エラー通知を送信するtry / catchブロックを使用して、 subscribe
でコードをラップすることをお勧めします。
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // delivers an error if it caught one
}
});
Disposing Observable Executions
Observable Executionは無限大である可能性があるので、、Observerが有限時間内に中止の実行をしたいのが一般的です。実行をキャンセルするAPIが必要です。各実行は1つのオブザーバにのみ排他的であるため、オブザーバが値を受け取ると、計算パワーまたはメモリリソースの浪費を避けるために、実行を停止する方法が必要です。
observable.subscribe
が呼び出されると、Observerは新しく作成されたObservable実行にアタッチされますが、この呼び出しは Subscription
オブジェクトを返します。:
var subscription = observable.subscribe(x => console.log(x));
サブスクリプションは進行中の実行を表し、その実行を取り消すことができる最小限のAPIを持っています。サブスクリプションタイプの詳細はこちらをご覧ください。
subscription.unsubscribe()
を使用すると、実行中の実行を取り消すことができます。
var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();
subscribeすると、進行中の実行を表すサブスクリプションが返されます。実行をキャンセルするには、
unsubscribe()
を呼び出してください。
各Observableは、 create()
を使用してObservableを作成するときに、その実行のリソースを処分する方法を定義する必要があります。これは、 function
subscribe()内からカスタムの
unsubscribe` 関数を返すことで行うことができます。
たとえば、これは setInterval
でインターバル実行セットをクリアする方法です:
var observable = Rx.Observable.create(function subscribe(observer) {
// Keep track of the interval resource
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
// Provide a way of canceling and disposing the interval resource
return function unsubscribe() {
clearInterval(intervalID);
};
});
observable.subscribe
が Observable.create(function subscribe() {...})
に似ているのと同様に、私たちが subscribe
から返す unsubscribe
は概念的には subscription.unsubscribe
と同じです。実際に、これらのコンセプトを取り巻くReactiveX特有のコードを削除すると、わかりやすいJavaScriptが残っています。
function subscribe(observer) {
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalID);
};
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // dispose the resources
Observable、Observer、SubscriptionのようなRxの型を使用する理由は、安全性(Observable Contractなど)とオペレータとのcomposabilityを得るためです。