はじめに
この記事の対象者
- これから Angular を始める方
- Rx, RxJS についてイメージをなんとなくイメージをつかみたい方
この記事は何?
Angular は、非同期処理を効率よく実装するために RxJS というライブラリを標準で採用しています。
ただ、この RxJS というライブラリを扱うためには、従来のプログラミングとは少し違った思考が必要です。
自分の場合は、何も知らない状態で公式ドキュメントを読みに行っても利点や具体的な使い方がピンときませんでした。
理解が進まなかった最大の要因は、Rx という概念のイメージを持っていなかったからです。
もし最初からそのイメージを持っていたのであれば、私の学習はもう少しスムーズに進んでいたと思います...orz
というわけで この記事では
- これから Angular6 を始めたいけど RxJS とか言うライブラリも学ぶ必要があるらしい...どうしよう
- Rx を学びたいけど、どこから手をつけて良いかわからない
- Rx がなんとなくどんなものか知りたい
といった方向けに、RxJS を使うメリットと概念の大まかなイメージ、そして頻出する使い方に的を絞って紹介してみます。
RxJS とは
とりあえず引用
リアクティブエクステンション
Reactive Extensions(Rx)は、観測可能なシーケンスとLINQ形式のクエリ演算子を使用して、非同期およびイベントベースのプログラムを作成するためのライブラリです。
データシーケンスには、ファイルやWebサービスからのデータストリーム、Webサービスへのリクエスト、システム通知、ユーザーが行うアクションのイベント等、様々な形式があります。
- 軽く読み流して次に行きます
Angular で RxJS を使うメリット
Rx の世界で扱う値は固定されたものではなく、常に変動する可能性があるストリームです。
ストリームにはユーザーアクションによるイベント値や API のレスポンス結果などの非同期的なもの、数字や文字列などの同期的な値等、何でも流し込むことができます。
どんな値でもストリームに流し込み、Rx が提供する共通のフォーマットで加工やタイミングに関する処理を行うことができる。
イベントであろうと、API のレスポンスであろうと、同じ形式で便利なスニペットを介した見通しの良いコードが書ける。
これが Rx を利用する最大の利点です。
JavaScript では配列やオブジェクトの操作を行う標準 API が不足しているため、lodash
というライブラリを利用する機会が多いと思います。
Rx は Promise 版 lodash
と表すとしっくりくる?かもしれません。
私が勝手に タイミング処理 と呼んでいるものは、具体的に言うと次のような実装のことです。
- 高速で連続するイベントを 50ms 毎に抑える
- ブラウザのスクロール制御で頻出
- 最後に検知したイベントから 100ms 経過したら
- フォーム関連の処理で頻出
- イベントが一定時間内に 2 回以上発生していたら
- ダブルクリック制御等
Angular ( SPA ) では、こういったタイミングに関わる処理が頻出し、その都度標準の setTimeout
等を使用して実装していては無駄が多く、非常に読みづらいものになってしまうでしょう。
RxJS を利用することで、あらゆるデータのマージ、フィルター、マッピング等を時間軸の処理と合わせて簡単に行うことができます。
Rx のイメージ
物事を理解する上で最も大事なことはイメージへの変換です。
この章では、ここまで文章の羅列だった Rx の概念をイメージへ昇華します。
今回は Rx を全く知らない方向けに簡単なストーリーを作ってみました。
このストーリーをなぞっていくことで、Rx の大まかな概念を学ぶことができるかと思います。
- 夏になると稀に桃が流れてくる川 ( stream ) があります
- もちろん、桃 ( value ) 以外の魚 ( value ) 等も流れてきます
- 秋、冬、春 に桃は流れません
- あなたはこの不思議な川を使って材料費 0 で桃缶を製造・販売し、大儲けしたいと考えています
- そこで、川から自動的に桃を回収 ( filter ) し、桃缶へ変換 ( map ) するシステムを構築しました
- システムの稼働には電力を消費します
- 夏になったらシステムを稼働させます ( subscribe )
- 夏以外に桃は流れてこないので、無駄な電力を消費しないようにシステムを停止します ( unsubscribe )
Rx の最重要キーワードである「ストリーム」は、よく川に例えられます。
現実に存在している川は、私達が何をするまでもなく勝手に流れ続けています。
同じようにスクロールのイベントストリームを用意したとしても、それだけでは何も起きません。
お話と同じように、管理者が装置のスイッチを入れる・購読する ( subscribe ) ことで、初めて缶詰製造装置 ( operators ) に桃 ( value ) が流れ、目的の処理が実行されます。
Rx 初心者が一瞬ハマる事例として、subscribe をし忘れてしまい値が流れない原因を探し回る...といった事があると思います?
しかし最初からこのストーリーを思い浮かべることができれば、同じようなミスで時間をムダにすることは無いはずです。
また、お話の中に出てくる電力は、現実の問題で言うとクライアントの CPU リソースとなります。毎月請求の来る電気代と異なり、メモリリークは借金が積み重なってやっと姿を表す事が多いでしょう。
そのため、コンポーネントを破棄した際は内部で購読しているストリームの購読解除を忘れないようにしましょう。
最後に、このお話を実際のコードに変換すると次のようになります。
private subscription: Subscription;
ngOnInit() {
this.subscription = of('桃', 'ヤマメ').pipe(
filter(v => v === '桃'),
map(v => v + '缶')
).subscribe(console.log);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
桃缶
RxJS の概念まとめ
Rx の概念についてはおおよそ前章のストーリーで理解できたと思いますが、もう一度文章でも振り返ってみます。
公式のガイドラインでは、Rx というライブラリを次の式で表しています。
Rx = Observables + LINQ (Operators) + Schedulers
先程の話で言うと Observables が川、イベントや非同期処理などの観測対象でありストリームの出発地点です。
また、Operators はストリームに流れてくる値をどう加工するかを決める装置と見なせます。
そしてそれらを購読 ( subscribe ) すれば、リアクティブプログラミングとして一連の流れを実行することができます。
なお Schedulers は利用頻度が低いため、説明を省略します。
Subject について
一段落したような感じになってしまったのですが、もう一つ Subject という概念を説明しなければなりません。
この Subject というクラスは、Rx で組まれたロジックの中で通知や、一時的な値の保存など、様々な使われ方で頻出します。
本当は、話の流れを遮ってしまうので説明を省きたいところではあるのですが、最低限のイメージを掴むための説明はしておこうと思います。
Subject は先程のイメージと合わせて例えると、ダムに近い存在だと思っています。
そのダムは川につながっており、ダムから流れてきた値を観測することもできますし、逆に外側から値をセットすることもできます。
言うなれば、ストリーム版の変数といったところです。
Subject には様々な種類があるので一概には説明できませんが、イメージはこんな感じだと思います。
頻出 Observable & Operators
ここまで理解したら、あとはひたすらどんな川 ( Observable ) があるのか?どんな工具や装置 ( Operators ) があるのか?を覚えていくだけです。
今回は実際に自分がよく使うと感じたものを中心に、簡単な利用シーンと共にまとめてみました。
それぞれに詳しく説明を入れると文章量がとんでもないことになってしまうため、このリストでは具体的な用途とイメージを優先して記載します。
詳しい使い方は公式ドキュメントを参照していただければと思います。
Observable
from
- Promise や iterator を持つ値 ( string, array 等 ) を Observable に変換
**利用例:**API のレスポンス結果を加工して必要な値を取り出す
from(fetch('https://jsonplaceholder.typicode.com/posts/1').then(r => r.json())).map(v => v. userId).subscribe(console.log);
// 1
fromEvent
- イベントを Observable に変換
**利用例:**クリックイベントの捕捉
const click$ = fromEvent(document, 'click');
click$.subscribe(console.log);
merge
- 流れてきた値をそのまま結合
**利用シーン:**別々のイベントを合成して1つのトリガーとしてまとめる
const click$ = fromEvent(targetElement, 'click');
const mouseover$ = fromEvent(targetElement, 'mouseover');
merge(click$, mouseover$).subscribe(() => {
// 目的の処理
});
of
- 値を Observable に変換
**利用シーン:**テスト、確認用、ストリーム分岐・結合時のつじつま合わせ
const hoge$ = of(1, 2, 3);
const huga$ = fromEvent(document, 'click');
merge(hoge$, huga$).subscribe(console.log);
interval
- 一定時間ごとに値を流す
**利用シーン:**経過時間表示
this.count$ = interval(1000);
// wait 1sec
// 0
// wait 1sec
// 1
// wait 1sec
// 2
// ...
<div>count: {{ count$ | async }}</div>
concat
- ストリームの順序を保ったまま結合
**利用シーン:**アプリに保持しているコンテンツキャッシュと API レスポンスを合成し、瞬時にキャッシュを表示 → 正確なデータに切り替え
this.article$ = concat(this.store.select(getSelectArticle), this.articleDb.findByKey(articleKey));
Operators
tap ( 旧 do )
- ストリームに影響を与えず、任意の処理を行う
**利用シーン:**ログの表示
stream$
.pipe(
tap(console.log),
tap(console.warn),
tap(console.error),
)
.subscribe();
map / pluck
- ストリームの値を加工・変換・抽出
**利用シーン:**API のレスポンス結果を加工する ( 必要な値を取り出す )
const apiResponse$ = of({ userId: 1, body: 'hoge huga piyo' });
const userId$ = apiResponse$.pipe(map(v => v.userId));
// ---------------------------------------------------
// 値を取得するだけなら pluck で少しシンプルに書ける
const userId$ = apiResponse$.pipe(pluck('userId'));
filter
- 値の取捨選択
**利用シーン:**ルーティング開始イベント発生時にローディングスピナーを表示
const routerEvent$ = this.router.events;
routerEvent$
.pipe(filter(e => e instanceof NavigationStart))
.subscribe(() => this.store.dispatch(new ShowLoadingSpinnerAction()));
skip
- 値のスキップ
**利用シーン:**コンポーネント生成直後のみ連動する処理をスキップする
// 初回に流れてくる値は、ユーザーアクションによって変更された値ではないのでスキップ
this.route.params.pipe(pluck('categoryId'), skip(1)).subscribe(categoryId => {
console.log(`changed categoryId: ${ categoryId }`);
});
scan
- 直前の値を使い回す
**利用シーン:**無限スクロールのアイテムリスト管理
this.items$ = nextItemSubject$.scan((acc, curr) => {
return acc.concat(curr);
}, []);
take
- 値を流す回数を決定
**利用シーン:**変動する値の最初の x 回を利用する
// take(1) しないとストアの値が更新されるたびに API をコールしてしまう
this.store.select(getUserId).pipe(
take(1),
concatMap(userId => this.apiService.get(userId))
).subscribe();
startWith
- 最初に流す値を指定
**利用シーン:**経過時間表示(改良版)
※ interval だけを使用した場合は、最初の一秒間何も表示されていませんでした
this.count$ = interval(1000).pipe(map(v => v + 1), startWith(0));
// 0
// wait 1sec
// 1
// wait 1sec
// 2
// ...
<div>count: {{ count$ | async }}</div>
takeUntil
- 値が流れたら処理を中断
**利用シーン:**コンポーネントを破棄する際、Subject を介してストリーム終了通知を送る
※ ただし、この記事 で紹介されているメモリリークに注意
private onDestroy$ = new Subject();
ngOnInit() {
interval(1000).pipe(takeUntil(this.onDestroy$)).subscribe(console.log);
}
ngOnDestroy() {
this.onDestroy$.next();
}
concatMap
- Observable を解決しつつ結合(実行中の処理が終了してから次に移る)
**利用シーン:**API1 のレスポンス結果を使って API2 を呼び出す
switchMap
- Observable を解決しつつ結合(次の値が来た場合、実行中の処理を中断)
**利用シーン:**auto complete 機能の実装
debounceTime
- 最後の値から一定時間経過したら次に進む
**利用シーン:**auto complete 機能の実装
this.autoCompleteList$ = this.form.valueChanges.pipe(
debounceTime(100),
switchMap(input => this.apiService.get(input)),
);
throttleTime
- 値の流れる速度を抑制
**利用シーン:**スクロールイベントの抑制
fromEvent(window, 'scroll').pipe(throttleTime(50)).subscribe(console.log);
withLatestFrom
- 合成したストリームの最新値を結合する
**利用シーン:**クリックしたユーザーの ID を GA イベントとして送信
fromEvent(targetElement, 'click').pipe(
withLatestFrom(this.store.select(getUserId))
).subscribe(([_, userId]) => {
this.analyticsService.sendEvent({ category: 'test', action: 'click', userId });
})
combineLatest
- メイン・合成されたストリーム共に変更が加わった場合、それぞれの最新値を流す
**利用シーン:**フォームの入力値とストア情報を結合
this.form.valueChanges.pipe(
combineLatest(this.store.select(getUserId))
).subscribe(([input, userId]) => {
console.log(`userId( ${ userId } ) さんが ${ input } を入力中です...`);
});
publish, share, refCount...etc
- cold ストリームを hot 化
cold / hot についての概念はコチラ の記事に良くまとまっていて、大変参考になります
**利用シーン:**API1 のレスポンス結果を使って API2、API3 を並列で実行する
※ API1 のストリームを hot 化することで、API1 が複数回コールされることを防ぎます
const userId$ = from(fetch('https://jsonplaceholder.typicode.com/posts/1').then(r => r.json())).pipe(
pluck('userId'),
publishReplay(1),
refCount()
);
userId$.concatMap(userId => this.api2Service.get(userId)).subscribe(console.log);
userId$.concatMap(userId => this.api3Service.get(userId)).subscribe(console.log);
RxJS6 の import について
最後に実際に使う際避けては通れない import について軽く説明します。
バージョン 6 未満の RxJS を利用するためには import { Observable } from 'rxjs/Observable'
や import { Subject } from 'rxjs/Subject'
のように、それぞれ直に import する必要がありました。
しかし、6 からは使用していないコードを上手く省いてくれるようになりました。( tree shaking )
RxJS6 ~ の import サイトは下記の 5 つです。
- rxjs
- rxjs/operators
- rxjs/testing
- rxjs/webSocket
- rxjs/ajax
とりあえず、太字の 2 つを覚えれば OK です。
Observable、Subject 系 及び Subscription
import { Observable, concat, Subject, Subscription } from 'rxjs';
Operators
import { map, tap } from 'rxjs/operators';
おわり
本記事では自分が初めて Rx に触れた時のことを思い出し、その際知りたかった Rx のイメージや具体的な利用シーンを中心にまとめてみました。
Rx の入門記事はWeb 上に多々存在しますが イラストで例えた記事は見つからなかったので、この記事が Rx のイメージを構築する手助けになれば幸いです。
なお、文章中では Observable や Subject を無理やり川とダムに変換しているため、実際の挙動と異なる部分もあると思いますが、おおまかなイメージを掴むため...ということでご了承下さい。
※ 2018/08 追記
この神記事 ( https://blog.angularindepth.com/learn-to-combine-rxjs-sequences-with-super-intuitive-interactive-diagrams-20fce8e6511) で各オペレーターの動きがアニメーションでわかりやすく理解できます。是非チェックしてみて下さい!