I've just started playing with RxJS, so I'm writing code for its operators as a way to learn. \(^o^)/
Setup
npm install rx
or,
yarn add rx
Creating Observables
Create
Creates an Observable.
import Rx from 'rx';
Rx.Observable
.create(observer => {
observer.onNext(9);
observer.onCompleted();
return () => console.log('disposed');
})
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 9
Completed
disposed
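To see why `disposed` shows up after `Completed`, here is a rough library-free sketch of the observer contract. `createObservable` is a hypothetical helper written only for illustration, not the RxJS implementation; it assumes the teardown function runs once the stream has terminated and that calls after termination are ignored.

```javascript
// Hypothetical mini-model of Observable.create; NOT the RxJS implementation.
function createObservable(subscribeFn) {
  return {
    subscribe(onNext, onError, onCompleted) {
      let stopped = false;
      let teardown = null;
      let tornDown = false;
      // Run the teardown function at most once, and only once it is known.
      const runTeardown = () => {
        if (!tornDown && teardown) {
          tornDown = true;
          teardown();
        }
      };
      const observer = {
        onNext: (x) => { if (!stopped) onNext(x); },
        onError: (e) => {
          if (!stopped) { stopped = true; onError(e); runTeardown(); }
        },
        onCompleted: () => {
          if (!stopped) { stopped = true; onCompleted(); runTeardown(); }
        },
      };
      teardown = subscribeFn(observer) || (() => {});
      // If the source already terminated synchronously, clean up now.
      if (stopped) runTeardown();
      return { dispose: () => { stopped = true; runTeardown(); } };
    },
  };
}

const createLog = [];
createObservable((observer) => {
  observer.onNext(9);
  observer.onCompleted();
  observer.onNext(10); // ignored: the stream has already completed
  return () => createLog.push('disposed');
}).subscribe(
  (x) => createLog.push(`Next: ${x}`),
  (err) => createLog.push(`Error: ${err}`),
  () => createLog.push('Completed')
);
console.log(createLog.join('\n'));
```

In this model the teardown can only run after the subscribe function returns, which is one plausible reading of the ordering in the output above.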
Empty
Emits only onCompleted.
Rx.Observable.empty()
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Completed
Never
An empty Observable that never calls anything (no onNext, onError, or onCompleted).
Rx.Observable.never()
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Throw
Emits onError.
Rx.Observable.return(9)
.selectMany(Rx.Observable.throw(new Error('error!')))
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Error: Error: error!
From
Creates an Observable from an iterable object.
Rx.Observable
.from(['apple', 'orange', 'peach'])
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: apple
Next: orange
Next: peach
Completed
Interval
Emits a value at a fixed time interval.
Rx.Observable
.interval(1000)
.timeInterval()
.take(3)
.subscribe(
x => console.log(`Next: ${x.value}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Next: 2
Completed
Just
Creates an Observable from a value you specify directly.
Rx.Observable
.just(88)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 88
Completed
Range
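Takes a start value and a count, and creates an Observable of that many consecutive integers (the second argument is a count, not an end value).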
Rx.Observable
.range(0, 6)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed
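In RxJS 4 the second argument of `range` is a count rather than an end value, which is easy to miss when the start is 0. A plain-array sketch of that behavior follows; `rangeValues` is a made-up helper for illustration, not an RxJS function.

```javascript
// Plain-array model of range(start, count); hypothetical helper, not RxJS.
const rangeValues = (start, count) =>
  Array.from({ length: count }, (_, i) => start + i);

console.log(rangeValues(0, 6).join(', ')); // 0, 1, 2, 3, 4, 5
console.log(rangeValues(5, 8).join(', ')); // 5, 6, 7, 8, 9, 10, 11, 12
```

So `range(5, 8)` yields eight values starting at 5, not the numbers from 5 to 8.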
Repeat
Repeats a value the specified number of times.
Rx.Observable
.repeat('Hello world', 3)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: Hello world
Next: Hello world
Next: Hello world
Completed
Start
Seems to run a function asynchronously in the context of the specified scheduler??
Rx.Observable
.start(() => 'start', Rx.Scheduler.timeout)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: start
Completed
Timer
A timer: emits after an initial delay, then repeatedly at the given period.
Rx.Observable
.timer(100, 1000)
.timeInterval()
.pluck('interval')
.take(3)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 222
Next: 879
Next: 999
Completed
Transforming Observables
Buffer
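Collects the Observable's items into buffers (arrays). This example closes each buffer with a closing selector rather than a fixed count.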
Rx.Observable
.timer(0, 100)
.buffer(() => Rx.Observable.timer(200))
.take(3)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next:
Next: 0,1
Next: 2,3
Completed
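The timer-based example above depends on real timing, so as a complement here is a plain-array sketch of the simpler count-based idea (RxJS 4 exposes this as `bufferWithCount`). `bufferByCount` is a hypothetical helper for illustration only, and it assumes a final partial buffer is still emitted when the source ends.

```javascript
// Plain-array model of count-based buffering; hypothetical helper, not RxJS.
function bufferByCount(items, count) {
  const buffers = [];
  for (let i = 0; i < items.length; i += count) {
    buffers.push(items.slice(i, i + count));
  }
  return buffers;
}

const countBuffers = bufferByCount([0, 1, 2, 3, 4], 2);
console.log(JSON.stringify(countBuffers)); // [[0,1],[2,3],[4]]
```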
FlatMap
Transforms each incoming item into an Observable, then merges the results into a single Observable.
Rx.Observable
.just('apple,orange,peach')
.flatMap(x => Rx.Observable.from(x.split(',')))
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: apple
Next: orange
Next: peach
Completed
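For a synchronous source, the "map, then flatten" idea looks a lot like `Array.prototype.flatMap`. The sketch below uses only plain arrays (assuming a runtime with `Array.prototype.flatMap`, such as Node 11 or later) and is an analogy, not the RxJS operator itself.

```javascript
// Array analogy for flatMap: map each item to a list, then flatten one level.
const fruitNames = ['apple,orange,peach'].flatMap((x) => x.split(','));

fruitNames.forEach((x) => console.log(`Next: ${x}`));
```

The difference with Observables is that the inner streams can be asynchronous, so their items may interleave.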
GroupBy
Groups items by key.
Rx.Observable
.range(0, 10)
.groupBy(x => x % 2 == 0)
.flatMap(group => group.toArray())
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0,2,4,6,8
Next: 1,3,5,7,9
Completed
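As a rough picture of what happens here, the same grouping can be modeled with a `Map` keyed by the selector's result. `groupValues` is a hypothetical helper, and the model assumes groups are reported in the order their keys first appear.

```javascript
// Plain model of groupBy + toArray; hypothetical helper, not RxJS.
function groupValues(items, keyOf) {
  const groups = new Map(); // Map keeps keys in first-seen order
  for (const item of items) {
    const key = keyOf(item);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(item);
  }
  return groups;
}

const numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
const groupedLines = [...groupValues(numbers, (x) => x % 2 === 0).values()]
  .map((group) => `Next: ${group}`);
console.log(groupedLines.join('\n'));
// Next: 0,2,4,6,8
// Next: 1,3,5,7,9
```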
[Map](http://reactivex.io/documentation/operators/map.html)
Reshapes each item.
Rx.Observable
.from(['田中', '佐藤', '山田'])
.map(x => `${x}さん`)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 田中さん
Next: 佐藤さん
Next: 山田さん
Completed
Scan
Accumulates items in order, emitting each intermediate result.
Rx.Observable
.from(['Hello', 'World', '!!'])
.scan((acc, x) => `${acc} ${x}`)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: Hello
Next: Hello World
Next: Hello World !!
Completed
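Since no seed is passed in the example above, the first item comes through unchanged and accumulation starts from the second one. A plain-array sketch of that behavior, where `scanValues` is a hypothetical helper and not an RxJS function:

```javascript
// Plain-array model of scan without a seed; hypothetical helper, not RxJS.
function scanValues(items, accumulate) {
  const results = [];
  let acc;
  items.forEach((item, index) => {
    // With no seed, the first item becomes the initial accumulator.
    acc = index === 0 ? item : accumulate(acc, item);
    results.push(acc);
  });
  return results;
}

const scanned = scanValues(['Hello', 'World', '!!'], (acc, x) => `${acc} ${x}`);
console.log(scanned.join('\n'));
// Hello
// Hello World
// Hello World !!
```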
Window
Bundles the stream's items together and produces new streams (windows)???
Rx.Observable
.timer(1, 50)
.window(() => Rx.Observable.timer(100))
.take(3)
.flatMap(x => x.toArray())
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next:
Next: 0,1
Next: 2,3
Completed
Filtering Observables
Debounce
If no data arrives for the specified time, the most recent item is emitted.
Rx.Observable
.interval(1000)
.timeInterval()
.take(3)
.debounce(1000)
.subscribe(
x => console.log(`Next: ${x.value}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 2
Completed
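Debounce is easier to reason about with explicit timestamps than with real timers. The sketch below is a simplified virtual-time model, not RxJS: `debounceModel` is a hypothetical helper, the last pending item is assumed to be flushed on completion, and an item that arrives exactly `dueTime` after the previous one is assumed to cancel it (which matches the `Next: 2` output above, though real timers may behave differently at the boundary).

```javascript
// Virtual-time model of debounce; hypothetical helper, not RxJS.
// events: [{ time, value }] sorted by time.
function debounceModel(events, dueTime) {
  return events
    .filter((event, i) => {
      const next = events[i + 1];
      // Keep an item only if the source stays quiet for longer than dueTime,
      // or if it is the last item (flushed when the source completes).
      return next === undefined || next.time - event.time > dueTime;
    })
    .map((event) => event.value);
}

const burst = [
  { time: 0, value: 'a' },
  { time: 100, value: 'b' },
  { time: 500, value: 'c' },
];
const burstResult = debounceModel(burst, 200);
console.log(burstResult); // [ 'b', 'c' ]

// Items exactly 1000 ms apart with a 1000 ms debounce: only the last survives.
const steady = [
  { time: 1000, value: 0 },
  { time: 2000, value: 1 },
  { time: 3000, value: 2 },
];
const steadyResult = debounceModel(steady, 1000);
console.log(steadyResult); // [ 2 ]
```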
Distinct
Emits items with duplicates removed.
Rx.Observable
.from(['apple', 'orange', 'apple', 'apple', 'りんご'])
.distinct()
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: apple
Next: orange
Next: りんご
Completed
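For simple values like strings, the effect is the same as passing an array through a `Set`, which keeps the first occurrence of each value in insertion order. This is only an array analogy, not how RxJS implements it.

```javascript
// Array analogy for distinct: a Set keeps the first occurrence of each value.
const fruits = ['apple', 'orange', 'apple', 'apple', 'りんご'];
const distinctFruits = [...new Set(fruits)];

distinctFruits.forEach((x) => console.log(`Next: ${x}`));
```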
ElementAt
Emits only the element at the specified index.
Rx.Observable
.from(['apple', 'orange', 'りんご'])
.elementAt(1)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: orange
Completed
Filter
Emits only the items that pass the predicate.
Rx.Observable
.range(0, 10)
.filter(x => x % 2 == 0)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 2
Next: 4
Next: 6
Next: 8
Completed
First
Emits only the first item.
Rx.Observable
.range(0, 10)
.first()
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Completed
[IgnoreElements](http://reactivex.io/documentation/operators/ignoreelements.html)
Ignores every element, so onNext is never called; only onError or onCompleted is.
Rx.Observable
.range(0, 10)
.ignoreElements()
.subscribe(
x => console.log(`onNext: ${x}`),
err => console.log(`onError: ${err}`),
() => console.log('onCompleted')
);
onCompleted
Last
Emits only the last item.
Rx.Observable
.range(0, 10)
.last()
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 9
Completed
[Sample](http://reactivex.io/documentation/operators/sample.html)
Emits the most recent value at the specified (fixed) sampling interval.
Rx.Observable
.interval(500)
.sample(2000)
.take(3)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 3
Next: 7
Next: 11
Completed
Skip
Skips the specified number of items from the start.
Rx.Observable
.range(0, 10)
.skip(8)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 8
Next: 9
Completed
SkipLast
Omits the specified number of items from the end.
Rx.Observable
.range(0, 10)
.skipLast(3)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed
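A stream cannot know in advance which items are "the last three", so one way to picture this is a small holding queue: an item is only released once enough newer items have arrived behind it. The sketch below is a model of that idea; `makeSkipLast` is a hypothetical helper, not the RxJS implementation.

```javascript
// Streaming model of skipLast using a holding queue; hypothetical helper.
function makeSkipLast(count, emit) {
  const held = [];
  return (x) => {
    held.push(x);
    // Release the oldest item only once `count` newer items are behind it.
    if (held.length > count) emit(held.shift());
  };
}

const skipLastOut = [];
const pushToSkipLast = makeSkipLast(3, (x) => skipLastOut.push(x));
for (let x = 0; x < 10; x++) pushToSkipLast(x);

console.log(skipLastOut.join(', ')); // 0, 1, 2, 3, 4, 5, 6
```

When the source ends, whatever is still in the queue (7, 8, 9 here) is simply never emitted.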
Take
Takes only the specified number of items from the start.
Rx.Observable
.range(0, 10)
.take(2)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Completed
TakeLast
Takes only the specified number of items from the end.
Rx.Observable
.range(0, 10)
.takeLast(2)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 8
Next: 9
Completed
Combining Observables
And/Then/When
Seems to combine multiple Observables into a single Observable while coordinating them??
Rx.Observable
.when(
Rx.Observable.timer(200).and(Rx.Observable.timer(300)).thenDo((x, y) => 'first'),
Rx.Observable.timer(400).and(Rx.Observable.timer(500)).thenDo((x, y) => 'second'))
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: first
Next: second
Completed
CombineLatest
Put simply, it combines Observables: each time any source emits, the latest values from all sources are combined.
var source1 = Rx.Observable
.interval(100)
.map(x => `First: ${x}`);
var source2 = Rx.Observable
.interval(150)
.map(x => `Second: ${x}`);
source1
.combineLatest(
source2,
(s1, s2) => `${s1} ${s2}`)
.take(5)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: First: 0 Second: 0
Next: First: 1 Second: 0
Next: First: 2 Second: 0
Next: First: 2 Second: 1
Next: First: 3 Second: 1
Completed
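The output above makes more sense once you track the "latest value per source" by hand. Below is a library-free sketch of that bookkeeping. `combineLatestModel` is a hypothetical helper, and the event list is hand-written in an assumed arrival order that mirrors the 100 ms / 150 ms intervals (at the 300 ms tie, `first` is assumed to arrive before `second`).

```javascript
// Model of combineLatest over a hand-ordered event list; not RxJS.
function combineLatestModel(events, combine) {
  const latest = {};
  const results = [];
  for (const { source, value } of events) {
    latest[source] = value;
    // Nothing is emitted until every source has produced at least one value.
    if ('first' in latest && 'second' in latest) {
      results.push(combine(latest.first, latest.second));
    }
  }
  return results;
}

const arrivals = [
  { source: 'first', value: 0 },  // ~100 ms
  { source: 'second', value: 0 }, // ~150 ms
  { source: 'first', value: 1 },  // ~200 ms
  { source: 'first', value: 2 },  // ~300 ms
  { source: 'second', value: 1 }, // ~300 ms (tie order is an assumption)
  { source: 'first', value: 3 },  // ~400 ms
];
const combined = combineLatestModel(
  arrivals,
  (s1, s2) => `First: ${s1} Second: ${s2}`
);
console.log(combined.join('\n'));
```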
Join
Feels like it joins two Observables??
var source1 = Rx.Observable
.interval(100)
.map(x => `First: ${x}`);
var source2 = Rx.Observable
.interval(100)
.map(x => `Second: ${x}`);
source1
.join(
source2,
() => Rx.Observable.timer(0),
() => Rx.Observable.timer(0),
(x, y) => `${x}_${y}`)
.take(5)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: First: 0_Second: 0
Next: First: 1_Second: 1
Next: First: 2_Second: 2
Next: First: 3_Second: 3
Next: First: 4_Second: 4
Completed
Merge
Merges multiple Observables into one.
const source1 = Rx.Observable
.range(0, 10)
.filter(x => x % 2 == 0);
const source2 = Rx.Observable
.range(0, 10)
.filter(x => x % 2 == 1);
Rx.Observable
.merge(
source1,
source2)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Completed
StartWith
Emits the given value first, then the Observable's own values.
Rx.Observable
.from(['ん', 'ご'])
.startWith('り')
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: り
Next: ん
Next: ご
Completed
Switch
Switches to each new inner Observable as it arrives, unsubscribing from the previous one.
Rx.Observable
.from([
Rx.Observable.from(['まぐろ', 'あじ', 'ひらめ']),
Rx.Observable.from(['りんご', 'みかん', 'おれんじ']),
Rx.Observable.from(['いぬ', 'ねこ', 'しか']),
])
.switch()
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: まぐろ
Next: りんご
Next: いぬ
Next: ねこ
Next: しか
Completed
Zip
Combines the values emitted by two or more Observables, pairwise, into a single new value.
const source1 = Rx.Observable.range(0, 3);
const source2 = Rx.Observable.range(5, 8);
Rx.Observable
.zip(
source1,
source2,
(x, y) => `${x} + ${y} = ${x + y}`
)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0 + 5 = 5
Next: 1 + 6 = 7
Next: 2 + 7 = 9
Completed
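Note that only three results come out even though the second source has more values: zip pairs items by position and stops when the shorter source runs out. A plain-array sketch of that pairing, where `zipValues` is a hypothetical helper rather than an RxJS function:

```javascript
// Plain-array model of zip; hypothetical helper, not RxJS.
function zipValues(left, right, combine) {
  // Pairing stops at the length of the shorter input.
  const length = Math.min(left.length, right.length);
  return Array.from({ length }, (_, i) => combine(left[i], right[i]));
}

const zipped = zipValues(
  [0, 1, 2],
  [5, 6, 7, 8, 9, 10, 11, 12],
  (x, y) => `${x} + ${y} = ${x + y}`
);
console.log(zipped.join('\n'));
// 0 + 5 = 5
// 1 + 6 = 7
// 2 + 7 = 9
```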
Error Handling Operators
Catch
Specifies an Observable (or similar) to emit instead when onError occurs.
Rx.Observable
.from([0, 1, 2])
.map(x => {
if(x > 0) return x;
throw new Error('error');
})
.catch(e => {
console.log(`caught: ${e}`);
return Rx.Observable.just(2);
})
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
caught: Error: error
Next: 2
Completed
Retry
Retries up to the specified number of times when an exception occurs.
Rx.Observable
.range(0, 5)
.map(x => {
if(x > 3) {
throw new Error('error: x > 3');
}
return x;
})
.retry(2)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Next: 2
Next: 3
Next: 0
Next: 1
Next: 2
Next: 3
Error: Error: error: x > 3
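Judging from the output above, `retry(2)` runs the source at most twice in total and only surfaces the error after the final attempt. A synchronous sketch of that loop follows; `runWithRetry` and `produce` are hypothetical names, and the model assumes each attempt restarts the source from the beginning.

```javascript
// Synchronous model of retry; hypothetical helper, not RxJS.
function runWithRetry(produce, maxAttempts, log) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      produce((x) => log.push(`Next: ${x}`));
      log.push('Completed');
      return;
    } catch (err) {
      // Only the last failed attempt is reported to the subscriber.
      if (attempt === maxAttempts) log.push(`Error: ${err}`);
    }
  }
}

const retryLog = [];
runWithRetry((emit) => {
  for (let x = 0; x < 5; x++) {
    if (x > 3) throw new Error('error: x > 3');
    emit(x);
  }
}, 2, retryLog);
console.log(retryLog.join('\n'));
```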
Observable Utility Operators
Delay
Delays emissions by the specified time.
Rx.Observable
.range(0, 5)
.delay(1000)
.subscribe(
x => console.log(`Next: ${x}`),
err => console.log(`Error: ${err}`),
() => console.log('Completed')
);
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed
I'll write up the rest another time...