ReactiveExtensions
RxJS
Rx
RxJSDay 3

RxJS の Operators (1) - Observable の生成

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 3 日目かつ RxJS Advent Calendar 2015 の 3 日目です。


はじめに

今日は reactivex.io にある Operator の Creating カテゴリー からいくつかを紹介します。詳細は公式リポジトリの Observable のドキュメント やソースコードを参照します。

ちなみに、この ReactiveX (reactivex.io) は Reactive Extensions (Rx) の関連プロジェクトの情報をまとめているサイトです。ドキュメントも充実しています。各 Operator のドキュメントには marble diagram もついていて分かりやすいです。

また RxJS 4.0.7 を対象にしています。


Obervable の生成

Observable の生成は通常 Observable.* にあるファクトリメソッドを使います。

さっそく例を挙げていきましょう。


Observable.just

import { Observable } from 'rx';

Observable
.just(1)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
console.log('created');

// onNext: 1
// onCompleted
// created

Observable.just は渡された値を流して終了する Observable を生成します。

第二引数に Scheduler を指定できたり、例を見れば分かる通りデフォルト (Scheduler.immediate) で同期的に動くのですが、それはまた別の機会にしましょう。

またソースコードは次のコマンドで動かしています (適当) 。

$ npm i rx babel-cli babel-preset-es2015

$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js


Observable.from

import { Observable } from 'rx';

Observable
.from([1, 2, 3])
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 1
// onNext: 2
// onNext: 3
// onCompleted

Observable.from は iterable を引数にとりそれらを流して終了する Observable を生成します。 Observable.just が単一値だったのに大してこちらは複数値です。

ちなみに文字列を取ると一文字ずつイテレートしてくれる StringIterator に変換してくれるようです (ソースコードを参照) 。

import { Observable } from 'rx';

Observable
.from('xyz')
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: x
// onNext: y
// onNext: z
// onCompleted

mapFn を引数に取れるけど、ぼくは Observable.prototype.map で十分な気がします。


Observable.empty / Observable.never / Observable.throw

飽きてきたのでペースアップします。

Observable.empty は値を流さずに終了 (onCompleted) する Observable を返します。

Observable.never は終了しない (onNextonCompleted も流さない) Observable を返します。

Observable.throw はエラーを流す (onError) Observable を返します。

Observable.throw は使う場面がありそうですけど、ほかはぼくには分からないです。


Observable.interval / Observable.timer

時間を操作する系統のものを紹介します。

Observable.intervalsetInterval のように一定間隔ごとに値を送ります。 Disposable.prototype.dispose を呼び出すまで繰り返します。 clearInterval のように止めてくれるのでしょう (実装上は Observable.timer と同じものを使っています) 。また ドキュメントによると { value: 0, interval: 500 } のような値を流してくれる らしいですが、実際には 0 のような count が流れてきます (ドキュメントを修正する Pull Request を送ったほうが良いかもしれませんね) 。

Observable.timerdueTime, period を引数にとり、 dueTime だけ待ったあと period の間隔で実行します。 period を指定しなければすぐ終了します。Observable.interval は実装上は Observable.timer とほとんど同じです (繰り返しかたはもちろん違いますが) 。


Observable.range / Observable.repeat

自動で繰り返す系統のものを紹介します。

Observable.rangestartcount を指定して、指定した範囲の値を生成します。 count だったか max だったか end だったか混乱するかもしれません。そういう場合は Observable.repeat の第二引数をイメージすると良いです。

Observable.repeatvalue を指定して、その値を繰り返し何度も流す Observable を返します。第二引数は count です。指定した個数だけ value を流します。range の引数はこのイメージで良いです。


Observable.start

Observable.start は関数を引数に取り、指定したスケジューラーのコンテキストで非同期に関数を実行するらしいのですが、ぼくは正直なところ使ったことがないです。

前述の通り、Rx はメソッドによるとは思いますがデフォルトでは非同期にならないです。Promise は必ず非同期になってしまうので、根本的な部分で思想が違うのだと思います。


Observable.defer

Observable.deferobservableFactoryFn を引数に取ります。 ObservablePromise かを返す関数です。そしてその値を流す Observable を返します。


Observable.create

最後は Observable.create です。本当は最初でもいいのですが、あえて最後に持ってきました。

引数にコールバック関数の subscribe を取ります。 subscribe は引数に observer を受け取ります。値を流すための observer をつくってくれるわけですね。subscribe の戻り値は Disposable.prototype.dispose 相当です。つまり、これがあれば大体の処理はできるはずです。

import { Observable } from 'rx';

Observable
.create(observer => {
observer.onNext(123);
observer.onNext(456);
observer.onCompleted();
return () => console.log('disposed');
})
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);


おわりに

今日は Observable のファクトリメソッドを見てみました。まだイベントを元に生成するものなどは見ていません。