はじめに
前回の記事の続きです。
前回触れたように、非同期処理を操作しやすくする仕組みはありますが、より高機能に非同期処理を扱うためのライブラリが存在します。その一つがRxJSです。
RxJS
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array#extras (map, filter, reduce, every, etc) to allow handling asynchronous events as collections.
RxJSはobservable sequenceを利用した、非同期やイベント処理のためのライブラリです。
Observableを中心として、その他いくつかの型や、非同期イベントをコレクションとして扱うためのオペレーターを提供します。
引用元:https://rxjs-dev.firebaseapp.com/guide/overview
中心的な概念であるObservableについて整理します。
Observable
GoFのデザインパターンの一つである「Observer」パターンを使用しています。
詳しい説明等は割愛しますが、観察する人(Observer)とされる人(Subject)が存在し、Subjectの状態が変化した時に、Observerに通知されるといった仕組みです。
非同期処理の文脈で考えると、非同期処理によって値(Subject)の状態が変化した時に、呼び出し元(Observer)に通知(非同期処理結果)が渡される、といった流れになります。
これだけを見ると「Promiseでいいじゃん」となりますが、Promiseは、一回の非同期処理を呼び出し、その結果を待って処理をするものです。
いわゆるpull型で、「状態の観察」となると定期的に処理を呼び出す必要があります。
push型、つまり状態が変化するたびに通知を受領することができるのがObservableです。
observable sequenceと呼ばれるデータが流れる道(Stream)に、Subjectの状態が変化するたびにデータが流れ、それをObserverが受け取る、といったイメージです。
サンプル
では、Angular公式にあるサンプルコードを見て、実際の使い方を学んでいきます。
コードの内容は、「位置情報の更新の取得」です。
コメントを編集・追加・和訳しています。
// Observableの作成。
const locations = new Observable((observer) => {
// observerを引数にとる、subscribeが呼び出された時のメソッドを渡す
// nextは値が提供された時のメソッド、errorはエラー時のメソッド
const {next, error} = observer;
let watchId;
if ('geolocation' in navigator) {
// 位置情報取得処理呼び出し
// 成功時にはnextにpositionが渡され、watchIdが更新される。
watchId = navigator.geolocation.watchPosition(next, error);
} else {
error('Geolocation not available');
}
// 監視終了(unsubscribe)時、watchIdをクリアする
return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});
// Observableを監視(subscribe)開始
const locationsSubscription = locations.subscribe({
// observer(通知を受け取る処理)を渡す
next(position) { console.log('Current Position: ', position); },
error(msg) { console.log('Error Getting Location: ', msg); }
});
// 10秒後に監視終了
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
Observableを作成後、subscribeメソッドを呼び出すことで、監視を開始します。
Observableは、渡されたobserverのnextメソッドに監視対象(例だとposition)を渡すことで変更を通知し、observerが処理を行います。
まとめると
- Observableの作成(
new Observable()
) - subscribeメソッドで監視開始
- 状態変化をobserverが処理(
next(position) { console.log('Current Position: ', position); }
) - unsubscribeメソッドで監視終了(
locationsSubscription.unsubscribe()
)
という流れで非同期処理を扱っています。
その他、Observableを利用したAngularでよくある例としては、HttpClientを用いたAPIの呼び出しがあります。
http.get('/api/url').subscribe((response => {
// APIの結果を処理するコード
}));
なんとなくで使っていましたが、これも
- Observableの作成(
http.get('/api/url')
) - subscribeメソッドで監視開始
- 状態変化をobserverが処理(
(response)=>{}
)
という処理となっています。
PromiseとObservable
改めてPromiseとObservableの違いについて整理します。
項目 | Promise | Observable |
---|---|---|
非同期処理開始タイミング | クラス作成時 | subscribe呼び出し時 |
値を渡す回数 | 1回 | unsubscribeするまで値が変化した回数 |
非同期処理キャンセル | 不可 | 可能(unsubscribe) |
参考:https://angular.jp/guide/comparing-observables#observable-と-promise-の比較 |
その他にも様々な違いがあるとは思いますが、一番の違いは1回限りか、継続するかの違いだと思います。
オペレーター
RxJSにはオペレーターという関数が用意されています。
Observableを受け取り、受け取った値を変更後、新たなObservableを返す関数です。
渡される値を配列のように操作するための関数、というイメージで良いと思います。
いくつかのオペレーターについて、再び公式のサンプルを見て確認していきます。
map
渡された値それぞれに対して処理を行った結果をObservableとして返します。
import { map } from 'rxjs/operators';
// Observableの作成
// 1,2,3を順番に渡すObservable
const nums = of(1, 2, 3);
// 受け取った値を二乗にするmapオペレーター
const squareValues = map((val: number) => val * val);
// mapにObservableを渡す
const squaredNums = squareValues(nums);
// mapを通した結果がObservableとして返る
squaredNums.subscribe(x => console.log(x));
// Logs
// 1
// 4
// 9
pipe
渡された値に対して、複数の関数を順次実行した結果をObservableとして返します。
import { filter, map } from 'rxjs/operators';
// 1,2,3,4,5を順番に渡すObservable
const nums = of(1, 2, 3, 4, 5);
// filterとmapを組み合わせたオペレーターを作成
// filterで奇数のみを抽出後、mapで二乗する
const squareOddVals = pipe(
filter((n: number) => n % 2 !== 0),
map(n => n * n)
);
// pipeにObservableを渡す
const squareOdd = squareOddVals(nums);
// pipeを通した結果が新たなObservableで返る
squareOdd.subscribe(x => console.log(x));
とりあえず取り上げるのはここまでです。
そのほかにもたくさんのオペレーターがあるので、公式APIを参考にしてください。
まとめ
- Promise
- 非同期処理を実行し、値を取得する
- 値の取得は1回限り
- Observable
- subscribe~unsubscribeまでの間、値の状態を監視する
- 値が変化するたびにobserverに値を渡す
- オペレーター
- Observableを加工するための関数
PromiseとObservableの違いについては整理できました。
間違い・指摘・アドバイス等あればコメントをよろしくお願いします。
参考
デザインパターン「Observer」
AngularのRxJSを使ってデータの受け渡しをする
Angular2のHttpモジュールを眺めてベストプラクティスを考える