2年前にRxJSで始めるRx(Reactive Extensions)入門1 - Qiitaという記事を書いたのですが、当時のRxJSのバージョンがもう古いもの(4系)で、それが残り続けてるのも申し訳ないので、RxJS5に関しての記事を書き直してみました。
RxJS (Observable) は何がうれしいのか?
非同期処理、イベント処理などをすべて、統一したストリームというもので扱える。RubyのEnumerationやScalaのそれみたいな、コレクション操作のライブラリっぽく扱えます。
ES2015になってPromiseというとても強力な環境がJavaScriptで使えるようになりましたが、Promiseは1回処理が完了してしまうとそこでおしまいです。たとえば、EventEmitterのように何回も処理するような目的には使えません。
使い方
$ npm i rxjs -S
$ yarn add rxjs
npmでもyarnでもお好きな方でインストールしてください。
const Rx = require('rxjs')
Rx.Observable.interval(1000)
.subscribe(msg => console.log(msg, new Date().toLocaleTimeString()))
// 0 '20:37:32'
// 1 '20:37:33'
// 2 '20:37:34'
// ...
setIntervalの使いやすくなったヤツです。Rx.Observable.interval(1000)
で1000ms(1秒)ごとに0からカウントアップする数字を流すストリームを作ります。
subscribeは言ってみればストリームの終着点です。interval
であれば設定した時間ごとにsubscribeに登録した関数が呼び出されますし、Promiseベースであれば、Promiseのthenの代わりに呼び出されます。
ストリームを生成する
interval
の他にも、Eventから生成する、Promiseから生成する、固定値から生成するなど色々な手段でストリームは生成できます。Creation · learn-rxjsを読むのが手っ取り早いでしょう。
地味に便利なのがrangeです。
const Rx = require('rxjs')
Rx.Observable.range(0, 10)
.subscribe(msg => console.log(msg))
// 0
// 1
// ...
どうしてもJSでレンジを扱うにはRubyと比べて面倒な感じがありました。generatorを作る、あるいはObject.keys(Array(10))みたいなトリッキーなコードを書く羽目になりましたが、Rx.Observable.range
ならストリームとして簡単に使えます。
オペレータで加工する
const Rx = require('rxjs')
Rx.Observable.range(0, 10)
.map(n => n * 2)
.subscribe(msg => console.log(msg))
さっきのrange
の出力を加工するオペレータです。JSでも配列などのIterableではmapメソッドを便利に皆さん使っていると思いますが、同じことがRxJSでも可能です。
const Rx = require('rxjs')
const source = Rx.Observable.of(0, 0, 1, 2, 1, 2, 2)
source.subscribe(n => process.stdout.write(`${n} `))
// 0 0 1 2 1 2 2
console.log('\n')
source
.distinctUntilChanged()
.subscribe(n => process.stdout.write(`${n} `))
// 0 1 2 1 2
console.log('')
mapだけだととても寂しいですが、様々なオペレータがあるのがRxJS (というかRx)の特徴です。distinctUntilChanged
というオペレータを使えば、連続する同じ値を間引く事ができます。
Rxとしての強みとしては、ストリームで時間軸を取り扱える点です。
const Rx = require('rxjs')
Rx.Observable.interval(10)
.throttleTime(2000)
.subscribe(n => console.log(n, new Date().toLocaleTimeString()))
// 0 '21:11:56'
// 176 '21:11:58'
// 353 '21:12:00'
intervalのソースは10msですが、2000ms(2秒)経つまでのメッセージを破棄するのがthrottleTime
です。
エラーハンドリング
const Rx = require('rxjs')
Rx.Observable.throw(new Error('hoge'))
.subscribe(
() => console.log('waai'),
err => console.log(`handling!: ${err.toString()}`)
)
// handling!: Error: hoge
subscribeに第二引数を渡すとエラーをキャッチできます。もちろんオペレータをいくつはさんでも大丈夫です。
Promiseを使っている人ならこの利点は理解していただけると思います。
まとめ
非同期処理をいい感じにコレクションライブラリっぽく扱えるのがRxJSです。