RxJS
Rx

RxJSで始めるRx(Reactive Extensions)入門1

RxJS4についての記事だったんですが、古くなってしまったので、RxJS5で始めるRx入門 - Qiitaという記事に書き直しました。今後はそちらをご覧ください。

最近RxJSやRxJavaやり始めたばかりの入門者です。間違ってる事書いてたりしてたら是非とも突っ込みを入れてください。あと、RxJSとは何かがわかりにくかったりイメージがつかみにくければ、お気軽にコメントください。

今北産業

  • 非同期処理やイベント処理を簡単に書ける
  • エラー及び終了をうまくハンドリングできる
  • 関数型のコレクションライブラリみたいなもの

RxJSの在処

Rxとは何?

Rx(Reactive Extensions)はFRP(関数型リアクティブプログラミング)を実現するライブラリで、元々Microsoftが.Netで開発したもので今はMicrosoftやNetflixが様々な言語に移植しているものです。

FRPが何かについては僕には全くもって語ることができないので特に触れません。

まずはRxJSを用いたサンプルコードを書いてみます。

var clickStream = Rx.Observable.fromEvent(document, "mouseup");
clickStream
    .buffer(clickStream.throttle(250))
    .map(function(x) {return x.length})
    .filter(function(n) {return n >= 2})
    .subscribe(function(n) {console.log(n + "click")});

ダブルクリック(ないしトリプル以上の連続クリック)をしたら、コンソールログに、その旨を表示するというサンプルです。これだけのコードでダブルクリック判定がいとも簡単にできてしまいます。

ストリーム

Rxではストリームと呼ばれるものを扱います。ストリームはデータの流れるシーケンスであり、時間軸も存在しています (非同期とか考えないならスルーしてもOK)。ストリームはObservableとも呼ばれ、型のある言語バージョンのRxであれば、Observable<T>型でT型のデータが流れるストリームになります。ちなみにストリームに流れるデータの事をメッセージと呼びます。

生成

Rx.Observable.just("hoge", "fuga")

"hoge", "fuga" という文字列の流れるストリームが生成されます。

Rx.Observable.interval(100)

100ミリ秒ごとにメッセージの流れるストリームが生成できます。

Rx.Observable.fromEvent(document, "mousemove")

JavaScriptのイベントからストリームを生成することもできます。

上述のように様々な方法でストリームが生成できるように、(型のある言語では型が合う限りは) あらゆるメッセージを同じストリームに流すことも可能です。もちろんそれはそうする意味がある場合ですが。

処理する

ストリームを、Observable#subscribe すれば流れてくるデータを受け取って処理を行う事ができます。

Rx.Observable.just("hoge", "fuga")
             .subscribe(function(s){
                 console.dir(s);
             });

オペレータ

ストリームはそのままではあまり面白みがありません。そこで加工することにします。加工するメソッドをオペレータと呼びます。

Rx.Observable.just("hoge", "foo")
             .map(function(s) {
                 return s.length;
             })
             .subscribe(function(n){
                 console.dir(n);
             });

Rubyなんかをやってる人にはおなじみの Enumerable#map と同じ map オペレータです。引数に渡された関数によって、コレクションの要素をコンバートするものです。ここでは、"hoge", "foo" の文字列を、長さの数字 4, 3 に変換しています。

original stream: -a-b-|
                  v v
  mapped stream: -4-3-|

オペレータが返す物は、処理後の別のストリームなので、とても関数型のコレクションライブラリっぽいです。また、メソッドチェインを続けることも可能ですが、やり過ぎるとわかりにくくなってしまうかもしれません。

エラーハンドリング

Rxの真骨頂はエラー時に例外を飛ばさず、ちゃんとストリームで扱えるという点だと思っています。

Rx.Observable.throw(new Error('hoge')).subscribe(
    function(x) {},
    function(err) {console.dir(err)}
);

Rx.Observable.throw(new Error('hoge')) で、エラーの投げられるストリームが生成されます。そして subscribe の二つ目の引数に関数を渡すと、エラー時のハンドリングが行われます。

上述のサンプルでは throw でストリームを生成した直後に subscribe していますが、途中にいくつオペレータを挟んでも全く問題ありません。

終了ハンドリング

subscribeに三つ目の引数を与えると、終了時のハンドリングが可能になります。

Rx.Observable.just('hoge').subscribe(
    function(x) {console.dir(x)},
    function(err) {console.dir(err)},
    function() {console.log("completed!")});

まとめ

導入してうれしいプロダクトの条件としては

  • 非同期処理がある
  • イベントドリブンの処理がある
  • エラーリカバリの必要性がある

となると思いますが、これらの条件に当てはまらないプロダクトはそうそうないのではないでしょうか。