この記事は bouzuya's RxJS Advent Calendar 2015 の 3 日目かつ RxJS Advent Calendar 2015 の 3 日目です。
はじめに
今日は reactivex.io にある Operator の Creating カテゴリー からいくつかを紹介します。詳細は公式リポジトリの Observable のドキュメント やソースコードを参照します。
ちなみに、この ReactiveX (reactivex.io) は Reactive Extensions (Rx) の関連プロジェクトの情報をまとめているサイトです。ドキュメントも充実しています。各 Operator のドキュメントには marble diagram もついていて分かりやすいです。
また RxJS 4.0.7 を対象にしています。
Obervable の生成
Observable
の生成は通常 Observable.*
にあるファクトリメソッドを使います。
さっそく例を挙げていきましょう。
Observable.just
import { Observable } from 'rx';
Observable
.just(1)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
console.log('created');
// onNext: 1
// onCompleted
// created
Observable.just
は渡された値を流して終了する Observable を生成します。
第二引数に Scheduler を指定できたり、例を見れば分かる通りデフォルト (Scheduler.immediate
) で同期的に動くのですが、それはまた別の機会にしましょう。
またソースコードは次のコマンドで動かしています (適当) 。
$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js
Observable.from
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 1
// onNext: 2
// onNext: 3
// onCompleted
Observable.from
は iterable を引数にとりそれらを流して終了する Observable を生成します。 Observable.just
が単一値だったのに大してこちらは複数値です。
ちなみに文字列を取ると一文字ずつイテレートしてくれる StringIterator
に変換してくれるようです (ソースコードを参照) 。
import { Observable } from 'rx';
Observable
.from('xyz')
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: x
// onNext: y
// onNext: z
// onCompleted
mapFn
を引数に取れるけど、ぼくは Observable.prototype.map
で十分な気がします。
Observable.empty
/ Observable.never
/ Observable.throw
飽きてきたのでペースアップします。
Observable.empty
は値を流さずに終了 (onCompleted
) する Observable を返します。
Observable.never
は終了しない (onNext
も onCompleted
も流さない) Observable を返します。
Observable.throw
はエラーを流す (onError
) Observable を返します。
Observable.throw
は使う場面がありそうですけど、ほかはぼくには分からないです。
Observable.interval
/ Observable.timer
時間を操作する系統のものを紹介します。
Observable.interval
は setInterval
のように一定間隔ごとに値を送ります。 Disposable.prototype.dispose
を呼び出すまで繰り返します。 clearInterval
のように止めてくれるのでしょう (実装上は Observable.timer
と同じものを使っています) 。また ドキュメントによると { value: 0, interval: 500 }
のような値を流してくれる らしいですが、実際には 0
のような count が流れてきます (ドキュメントを修正する Pull Request を送ったほうが良いかもしれませんね) 。
Observable.timer
は dueTime
, period
を引数にとり、 dueTime
だけ待ったあと period
の間隔で実行します。 period
を指定しなければすぐ終了します。Observable.interval
は実装上は Observable.timer
とほとんど同じです (繰り返しかたはもちろん違いますが) 。
Observable.range
/ Observable.repeat
自動で繰り返す系統のものを紹介します。
Observable.range
は start
と count
を指定して、指定した範囲の値を生成します。 count
だったか max
だったか end
だったか混乱するかもしれません。そういう場合は Observable.repeat
の第二引数をイメージすると良いです。
Observable.repeat
は value
を指定して、その値を繰り返し何度も流す Observable を返します。第二引数は count
です。指定した個数だけ value
を流します。range
の引数はこのイメージで良いです。
Observable.start
Observable.start
は関数を引数に取り、指定したスケジューラーのコンテキストで非同期に関数を実行するらしいのですが、ぼくは正直なところ使ったことがないです。
前述の通り、Rx はメソッドによるとは思いますがデフォルトでは非同期にならないです。Promise は必ず非同期になってしまうので、根本的な部分で思想が違うのだと思います。
Observable.defer
Observable.defer
は observableFactoryFn
を引数に取ります。 Observable
か Promise
かを返す関数です。そしてその値を流す Observable
を返します。
Observable.create
最後は Observable.create
です。本当は最初でもいいのですが、あえて最後に持ってきました。
引数にコールバック関数の subscribe
を取ります。 subscribe
は引数に observer
を受け取ります。値を流すための observer
をつくってくれるわけですね。subscribe
の戻り値は Disposable.prototype.dispose
相当です。つまり、これがあれば大体の処理はできるはずです。
import { Observable } from 'rx';
Observable
.create(observer => {
observer.onNext(123);
observer.onNext(456);
observer.onCompleted();
return () => console.log('disposed');
})
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
おわりに
今日は Observable
のファクトリメソッドを見てみました。まだイベントを元に生成するものなどは見ていません。