Posted at

RxJS Observable まとめ

More than 1 year has passed since last update.


はじめに

何故こんなものを書いてしまったのか・・・orz

いつも触っているフレームワークが Angular.js から Angular に変わり、

HTTPClient の戻り値が ng.IPromise から Observable になっていました。

ReactiveX 自体は AndroidiOS を開発していた頃に RxJavaRxSwift を通して、名前だけ知ってましたので、RxJS もその辺のものなんだなーという程度の認識で、Angularチュートリアルの知識で Observable を使ってました。

Angular を触っている限り Observable は付いてくるし、

RxJS を知っておけば他の RxSwift とかでも活かせるだろうなと思って、

軽い気持ちで Observable の機能をまとめてみようと思ったのがこの記事執筆の始まりです。

※途中までやって他の方々がもっと詳しくまとめている記事を見つけたりしましたが、

※途中までやっちゃったし、自分でまとめた方が理解できるし。って思って最後までまとめました。

Google 翻訳を駆使して、実際に動かしてみて、

自分なりの解釈でまとめてますので全然違うのもあるかと思います。


バージョンとか


  • rxjs 5.5.6


Observable

Category

Create Operators

Mathematical and Aggregate Operators

Conditional and Boolean Operators

Error Handling Operators

Utility Operators

Filtering Operators

Combination Operators

Transformation Operators

Multicasting Operators


  • カテゴリは こちら を参考に。

  • リファレンスは こちら を参考に。

  • メソッド名が太字なものは上記カテゴリに載ってなかったので勝手にカテゴリ分けしたもの

  • 赤文字はちゃんと理解できていないもの


Create Operators

Method Type
Method Name

class method
bindCallback

class method
bindNodeCallback

class method
create

class method
defer

class method
empty

class method
from

class method
fromEvent

class method
fromEventPattern

class method
fromPromise

class method
interval

class method
never

class method
of

class method
range

class method
throw

class method
timer

instance method
repeat

instance method
repeatWhen


bindCallback


  • コールバック付きの関数から Observable を作成する

function func(cb: () => void) {

setTimeout(() => {
cb();
}, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable().subscribe(() => console.log('success'));

// => success

function func(cb: (s1) => void) {

setTimeout(() => {
cb('Hello');
}, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable().subscribe(res => console.log(res));

// => Hello

function func(cb: (s1, s2) => void) {

setTimeout(() => {
cb('Hello', 'World!');
}, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable().subscribe(res => console.log(res));

// => [ 'Hello', 'World' ]

function func(value: number, cb: (n1) => void) {

setTimeout(() => {
cb(value * 10);
}, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable(5).subscribe(res => console.log(res));

// => 50


bindNodeCallback



  • (err, data) => void のような良くあるコールバック付きの関数から Observable を作成する



    • bindCallback() を先に知っておいた方が良い


    • bindCallback() とだいたい同じ



function func(cb: (err, data) => void) {

setTimeout(() => {
cb(null, 'Hello');
}, 1000);
}

const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(res => console.log(res));

// => Hello

function func(cb: (err, data1, data2) => void) {

setTimeout(() => {
cb(null, 'Hello', 'World!');
}, 1000);
}

const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(res => console.log(res));

// => [ 'Hello', 'World!' ]

function func(cb: (err, data?) => void) {

setTimeout(() => {
cb('Error');
}, 1000);
}

const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(
res => console.log('ここには来ない'),
err => console.log(err)
);

// => Error


create


  • 新しい Observable を作成する

const observable = Rx.Observable.create(observer => {

observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
observable.subscribe(res => console.log(res));

// => 1
// => 2
// => 3

const observable = Rx.Observable.create(observer => {

observer.next(1);
observer.error('Error');
observer.next(2);
observer.complete();
});
observable.subscribe(
res => console.log(res),
err => console.log(err)
);

// => 1
// => Error


defer


  • 遅延して Observable を作成する

Rx.Observable.defer(() => Rx.Observable.of('a', 'b', 'c'))

.subscribe(res => console.log(res));

// => a
// => b
// => c


empty


  • 何も発行せず complete する Observable を作成する

Rx.Observable.empty()

.subscribe(
res => console.log('ここには来ない'),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => complete


from


  • 配列から Observable を作成する

Rx.Observable.from([10, 20, 30])

.subscribe(res => console.log(res));

// => a
// => b
// => c


fromEvent


  • DOM Events や EventEmitter から Observable を作成する

Rx.Observable.fromEvent(document, 'click')

.subscribe(evt => console.log(`x: ${evt.x}, y: ${evt.y}`));

// => x: 299, y: 527
// => x: 332, y: 547
// => x: 343, y: 328


fromEventPattern



  • addHandler , removeHandler に基づいて Observable を作成する



    • fromEvent() を先に知っておいた方が良い

    • あまり良くわかってないが、たぶん fromEvent() とだいたい同じ



Rx.Observable.fromEventPattern(

(handler: any) => document.addEventListener('click', handler),
(handler: any) => document.removeEventListener('click', handler)
).subscribe(evt => console.log(`x: ${evt.x}, y: ${evt.y}`));

// => x: 299, y: 527
// => x: 332, y: 547
// => x: 343, y: 328


fromPromise


  • Promise から Observable を作成する

function func(cb: () => void) {

return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('success');
}, 1000);
});
}

Rx.Observable.fromPromise(func())
.subscribe(res => console.log(res));

// => success

function func(cb: () => void) {

return new Promise((resolve, reject) => {
setTimeout(() => {
reject('error');
}, 1000);
});
}

Rx.Observable.fromPromise(func())
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err)
);

// => error


interval


  • 定期的に発行する Observable を作成する

Rx.Observable.interval(1000)

.subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// ...


never


  • 何もしない Observable を作成する

Rx.Observable.never()

.subscribe(
res => console.log('ここには来ない'),
err => console.log('ここには来ない'),
() => console.log('ここには来ない')
);

// => (何も起きない)


of


  • 指定した値を発行する Observable を作成する

Rx.Observable.of(1, 2, 3)

.subscribe(res => console.log(res));

// => 1
// => 2
// => 3

Rx.Observable.of('a', 'b', 'c')

.subscribe(res => console.log(res));

// => a
// => b
// => c

Rx.Observable.of<any>([1, 2, 3], ['a', 'b', 'c'])

.subscribe(res => console.log(res));

// => [ 1, 2, 3 ]
// => [ 'a', 'b', 'c' ]


range



  • start から count 個の数値を発行する Observable を作成する

Rx.Observable.range(1, 10)

.subscribe(res => console.log(res));

// => 1
// => 2
// => 3
// => ...
// => 9
// => 10

Rx.Observable.range(5, 10)

.subscribe(res => console.log(res));

// => 5
// => 6
// => 7
// => ...
// => 13
// => 14


throw


  • エラーを発行する Observable を作成する

Rx.Observable.throw('Error')

.subscribe(
res => console.log('ここには来ない'),
err => console.log(err)
);

// => Error


timer


  • 開始時間を指定して定期的に発行する Observable を作成する



    • interval() とだいたい同じ



Rx.Observable.timer(3000, 1000)

.subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => ...


repeat


  • 本線を指定回数繰り返す Observable を作成する

Rx.Observable.of('a', 'b', 'c')

.repeat(3)
.subscribe(res => console.log(res));

// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c


repeatWhen



  • notifier が発行された時に本線を繰り返す Observable を作成する?



    • repeat() を先に知っておいた方が良い

    • あまり良くわかってないが、たぶん repeat() とだいたい同じ



Rx.Observable.of('a', 'b', 'c')

.repeatWhen(notifier => notifier.take(3)) // +3回繰り返される
.subscribe(res => console.log(res));

// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c


Mathematical and Aggregate Operators

Method Type
Method Name

instance method
count

instance method
max

instance method
min


count


  • Observable が発行する値の個数を発行する

Rx.Observable.of('a', 'bb', 'ccc')

.count()
.subscribe(res => console.log(res));

// => 3

Rx.Observable.of('a', 'bb', 'ccc')

.count(v => v.length >= 2)
.subscribe(res => console.log(res));

// => 2


max


  • Observable が発行する値の中で最大値を発行する

Rx.Observable.of(5, 4, -1, 8, 2)

.max()
.subscribe(res => console.log(res));

// => 8


min


  • Observable が発行する値の中で最小値を発行する

Rx.Observable.of(5, 4, -1, 8, 2)

.min()
.subscribe(res => console.log(res));

// => -1


Conditional and Boolean Operators

Method Type
Method Name

instance method
defaultIfEmpty

instance method
every

instance method
find

instance method
findIndex

instance method
isEmpty

instance method
sequenceEqual

instance method
single


defaultIfEmpty


  • Observable の発行が Empty だった場合にデフォルト値を発行する

Rx.Observable.empty()

.defaultIfEmpty('defaultValue')
.subscribe(res => console.log(res));

// => defaultValue

Rx.Observable.empty()

.defaultIfEmpty({ data: 'defaultValue'})
.subscribe(res => console.log(res));

// => d{ data: 'defaultValue' }

Rx.Observable.of('a', 'bb', 'ccc')

.defaultIfEmpty('defaultValue')
.subscribe(res => console.log(res));

// => a
// => bb
// => ccc


every



  • predicate で指定する条件を全ての値が満たしているかどうかを発行する



    • Array.every() と同じ



Rx.Observable.of(1, 2, 3, 4, 5)

.every(v => v <= 5)
.subscribe(res => console.log(res));

// => true

Rx.Observable.of(1, 2, 3, 4, 5)

.every(v => v < 5)
.subscribe(res => console.log(res));

// => false


find



  • predicate で指定する条件を満たす値を発行する



    • Array.find() と同じ



Rx.Observable.of(1, 2, 3, 4, 5)

.find(v => v === 4)
.subscribe(res => console.log(res));

// => 4

Rx.Observable.of(1, 2, 3, 4, 5)

.find(v => v % 2 === 0)
.subscribe(res => console.log(res));

// => 2

Rx.Observable.of(1, 2, 3, 4, 5)

.find(v => v === 10)
.subscribe(res => console.log(res));

// => undefined


findIndex



  • predicate で指定する条件を満たす値のインデックスを発行する



    • Array.findIndex() と同じ



Rx.Observable.of(1, 2, 3, 4, 5)

.findIndex(v => v === 4)
.subscribe(res => console.log(res));

// => 3

Rx.Observable.of(1, 2, 3, 4, 5)

.findIndex(v => v % 2 === 0)
.subscribe(res => console.log(res));

// => 1

Rx.Observable.of(1, 2, 3, 4, 5)

.findIndex(v => v === 10)
.subscribe(res => console.log(res));

// => -1


isEmpty


  • Observable の発行する値が Empty かどうかを発行する

Rx.Observable.empty()

.isEmpty()
.subscribe(res => console.log(res));

// => true

Rx.Observable.of('a', 'bb', 'ccc')

.isEmpty()
.subscribe(res => console.log(res));

// => false


sequenceEqual


  • 同じ発行のある Observable かどうかを発行する

const compareObservable = Rx.Observable.range(5, 10);

Rx.Observable.of(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
.sequenceEqual(compareObservable)
.subscribe(res => console.log(res));

// => true

const compareObservable = Rx.Observable.range(5, 10);

Rx.Observable.of(5, 6, 7, 8, 9, 10)
.sequenceEqual(compareObservable)
.subscribe(res => console.log(res));

// => false


single



  • predicate で指定する値が1つしかない場合は成功

Rx.Observable.of('a', 'b', 'c', 'b', 'e')

.single(v => v === 'a')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => a
// => complete

Rx.Observable.of('a', 'b', 'c', 'b', 'e')

.single(v => v === 'b')
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err),
() => console.log('ここには来ない')
);

// => Sequence contains more than one element


Error Handling Operators

Method Type
Method Name

instance method
catch

instance method
retry

instance method
retryWhen

instance method
onErrorResumeNext


catch


  • Observable がエラーを発行したら次の Observable を発行する

Rx.Observable.throw('Error')

.catch(err => Rx.Observable.of('a', 'bb', 'ccc'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => a
// => bb
// => ccc
// => complete

Rx.Observable.interval(1000)

.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.catch(err => Rx.Observable.of('IV', 'V', 'VI'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 1
// => 2
// => 3
// => IV
// => V
// => VI
// => complete


retry


  • Observable がエラーを発行したら count だけリトライする

Rx.Observable.throw('Error')

.retry(3)
.subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('ここには来ない')
);

// => Error

Rx.Observable.interval(1000)

.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.retry(3)
.subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('ここには来ない')
);

// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => Error


retryWhen


  • Observable がエラーを発行したら notifier を基にリトライを行う



    • retry() を先に知っておいた方が良い

    • あまり良くわかってないが、たぶん retry() とだいたい同じ



Rx.Observable.interval(1000)

.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.do(
res => console.log(`do(res): ${res}`),
err => console.log(`do(err): ${err}`),
() => console.log(`do(complete)`)
)
.retryWhen(errors => {
return errors.delay(200).do(
res => console.log(`errors(res): ${res}`), // エラーが発生するとここに飛んでくるんだな?
err => console.log(`errors(err): ${err}`),
() => console.log(`errors(complete)`)
);
})
.subscribe(
res => console.log(`res: ${res}`),
err => console.log(`err: ${err}`),
() => console.log(`complete`),
);

// => do(res): 0
// => res: 0
// => do(res): 1
// => res: 1
// => do(res): 2
// => res: 2
// => do(res): 3
// => res: 3
// => do(err): Error
// => errors(res): Error
// => do(res): 0
// => res: 0
// => do(res): 1
// => res: 1
// => do(res): 2
// => res: 2
// => do(res): 3
// => res: 3
// => do(err): Error
// => errors(res): Error
// ...


onErrorResumeNext


  • Observable がエラーを発行したら次の Observable を発行する



    • catch と同じような感じ

    • 但し、コールバックに err は飛んでこないのでエラーハンドリングしたい場合は catch を使うと良い?



Rx.Observable.interval(1000)

.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.onErrorResumeNext(Rx.Observable.of('IV', 'V', 'VI'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 0
// => 1
// => 2
// => 3
// => IV
// => V
// => VI
// => complete


Utility Operators

Method Type
Method Name

instance method
delay

instance method
delayWhen

instance method
dematerialize

instance method
do

instance method
materialize

instance method
observeOn

instance method
subscribeOn

instance method
timeInterval

instance method
timeout

instance method
timeoutWith

instance method
timestamp

instance method
toArray

class method
webSocket

instance method
finally

instance method
forEach

instance method
let

instance method
lift

instance method
pipe

instance method
toPromise


delay


  • Observable の発行を遅延させる

Rx.Observable.of('a', 'bb', 'ccc')

.delay(5000)
.subscribe(res => console.log(res));

// => a (5s after)
// => bb
// => ccc


delayWhen


  • Observable の発行を delayDurationSelector で返却する Observable が発行させる



    • delay() を先に知っておいた方が良い



Rx.Observable.of('a', 'bbbb', 'cccccccc')

.delayWhen(v => {
return Rx.Observable.timer(v.length * 1000)
})
.subscribe(res => console.log(res));

// => a (1s after)
// => bbbb (4s after)
// => cccccccc (8s after)


dematerialize



  • Rx.NotificationObservable に変換する?



    • materialize() の逆(先に materialize() 見たほうがいいかも)

    • あまり良くわかってない。。。



const notifA = new Rx.Notification('N', 'a');

const notifB = new Rx.Notification('N', 'b');
const notifE = new Rx.Notification('E', undefined, new TypeError('x.toUpperCase() is not a function'));

Rx.Observable.of(notifA, notifB, notifE)
.dematerialize()
.subscribe(
res => console.log(res),
err => console.log(err.message),
() => console.log('ここには来ない'));

// => a
// => b
// => x.toUpperCase() is not a function


do


  • Observable の発行を subscribe する前にフックする

Rx.Observable.of('a', 'bb', 'ccc')

.do(
res => console.log(`do(res): ${res}`),
err => console.log(`ここには来ない`),
() => console.log(`do(complete)`)
)
.subscribe(
res => console.log(`res: ${res}`),
err => console.log(`ここには来ない`),
() => console.log(`complete`)
);

// => do(res): a
// => res: a
// => do(res): bb
// => res: bb
// => do(res): ccc
// => res: ccc
// => do(complete)
// => complete


materialize



  • ObservableRx.Notification に変換する?


    • あまり良くわかってない。。。



Rx.Observable.of<any>('a', 'b', undefined, 'd').map(v => v.toUpperCase())

.materialize()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
// => Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
// => Notification {
// kind: 'E',
// value: undefined,
// error: TypeError: Cannot read property 'toUpperCase' of undefined
// ...
// complete


observeOn


  • Observable の発行を「次のタイミング」で行う?


    • あまり良くわかってない。。。



const observable: Rx.Observable<any> = Rx.Observable.create(observer => {

console.log('[1] before next()');
observer.next(1);
console.log('[1] after next()');

console.log('[2] before next()');
observer.next(2);
console.log('[2] after next()');

console.log('[3] before complete()');
observer.complete();
console.log('[3] after complete()');
});

console.log('[4] before subscribe');
observable.observeOn(Rx.Scheduler.async)
.subscribe(
next => console.log(`[emit] next: ${next}`),
err => console.log(`[emit] error: ${err}`),
() => console.log(`[emit] complete`)
);
console.log('[4] after subscribe');

// => [4] before subscribe
// => [1] before next()
// => [1] after next()
// => [2] before next()
// => [2] after next()
// => [3] before complete()
// => [3] after complete()
// => [4] after subscribe
// => [emit] next: 1
// => [emit] next: 2
// => [emit] complete


subscribeOn


  • Observable の実行を「次のタイミング」で行う?


    • あまり良くわかってない。。。



const o1: Rx.Observable<any> = Rx.Observable.create(observer => {

console.log('[1] before next()');
observer.next(1);
console.log('[1] after next()');

console.log('[2] before next()');
observer.next(2);
console.log('[2] after next()');

console.log('[3] before complete()');
observer.complete();
console.log('[3] after complete()');
});

console.log('[4] before subscribe');
o1.subscribeOn(Rx.Scheduler.async)
.subscribe(
next => console.log(`[emit] next: ${next}`),
err => console.log(`[emit] error: ${err}`),
() => console.log(`[emit] complete`)
);
console.log('[4] after subscribe');

// => [4] before subscribe
// => [4] after subscribe
// => [1] before next()
// => [emit] next: 1
// => [1] after next()
// => [2] before next()
// => [emit] next: 2
// => [2] after next()
// => [3] before complete()
// => [emit] complete:
// => [3] after complete()


timeInterval


  • 前の発行からどれくらい経ったかを発行する

Rx.Observable.of('a', 'bbbb', 'cccccccc')

.delayWhen(v => Rx.Observable.timer(v.length * 1000))
.timeInterval()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
res => console.log('complete'),
);

// => TimeInterval { value: 'a', interval: 1005 }
// => TimeInterval { value: 'bbbb', interval: 3000 }
// => TimeInterval { value: 'cccccccc', interval: 4000 }
// => complete


timeout


  • タイムアウト時刻まで発行が無い場合はエラーを発行する

Rx.Observable.timer(5000, 1000)

.timeout(1000)
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err.message),
() => console.log('ここには来ない')
);

// => Timeout has occurred


timeoutWith


  • タイムアウト時刻まで発行が無い場合は別の Observable を発行する



    • timeout() を先に知っておいた方が良い


    • timeout() + catch() な感じ?



Rx.Observable.timer(5000, 1000)

.timeoutWith(1000, Rx.Observable.of('a', 'bb', 'ccc'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => a
// => bb
// => ccc
// => complete


timestamp


  • 発行時刻が付加される

Rx.Observable.of('a', 'bbbb', 'cccccccc')

.delayWhen(v => Rx.Observable.timer(v.length * 1000))
.timestamp()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => Timestamp { value: 'a', timestamp: 1521783521464 }
// => Timestamp { value: 'bbbb', timestamp: 1521783524459 }
// => Timestamp { value: 'cccccccc', timestamp: 1521783528462 }
// => complete


toArray


  • 発行された値を配列に変換する

Rx.Observable.of('a', 'b', 'c')

.toArray()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => [ 'a', 'b', 'c' ]
// => complete


webSocket


  • WebSocket を WebSocketSubject にラップする


    • あまり良くわかってない。。。

    • うまく動かせなかった。。。



const ws = Rx.Observable.webSocket('ws://localhost:8081');

ws.subscribe(
res => console.log('message received: ' + res),
err => console.log(err),
() => console.log('complete')
);
ws.next(JSON.stringify({ op: 'hello' }));


finally


  • 一番最後に発火する関数を定義する

Rx.Observable.of('a', 'bb', 'ccc')

.finally(() => {
console.log('Finally!')
})
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => a
// => bb
// => ccc
// => complete
// => Finally!

Rx.Observable.of('a', 'bb', 'ccc')

.map(v => {
if (v.length > 2) {
throw new Error('Error!');
}
return v;
})
.finally(() => {
console.log('Finally!')
})
.subscribe(
res => console.log(res),
err => console.log(err.message),
() => console.log('ここには来ない')
);

// => a
// => bb
// => Error!
// => Finally!


forEach


  • Subscribe ではなく Promise で購読する

Rx.Observable.of('a', 'bb', 'ccc')

.forEach(v => console.log(`v: ${v}`))
.then(res => console.log(`then: ${res}`))
.catch(err => console.log('ここには来ない'));

// => v: a
// => v: bb
// => v: ccc
// => then: undefined

Rx.Observable.of('a', 'bb', 'ccc')

.map(v => {
if (v.length > 2) {
throw new Error('Error!')
}
return v;
})
.forEach(v => console.log(`v: ${v}`))
.then(res => console.log('ここには来ない'))
.catch(err => console.log(`catch: ${err.message}`));

// => v: a
// => v: bb
// => catch: Error!


let


  • Observable が発行を Subscribeする前にフックできる?



    • do() の Observable 版?

    • あまり良くわかってない。。。



Rx.Observable.of('a', 'bbbb', 'cccccccc')

.let(selector => {
return selector.do(
res => console.log(`let.do(res): ${res}`),
err => console.log(`let.do(err): ${err}`),
() => console.log(`let.do(complete)`)
);
})
.do(
res => console.log(`do(res): ${res}`),
err => console.log(`do(err): ${err}`),
() => console.log(`do(complete)`)
)
.subscribe(
res => console.log(`res: ${res}`),
err => console.log(`err: ${err}`),
() => console.log(`complete`)
);

// => let.do(res): a
// => do(res): a
// => res: a
// => let.do(res): bbbb
// => do(res): bbbb
// => res: bbbb
// => let.do(res): cccccccc
// => do(res): cccccccc
// => res: cccccccc
// => let.do(complete)
// => do(complete)
// => complete


lift


  • カスタムオペレータを差し込めるらしい


    • あまり良くわかってない。。。

    • うまく動かせなかった。。。




pipe


  • メソッドチェーンではない書き方ができる


    • メリットがわからない



Rx.Observable.range(1, 10)

.pipe(
filter(v => v % 2 === 0),
map(v => v * 10),
scan((acc, v) => acc + v, 0)
)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 20
// => 60
// => 120
// => 200
// => 300
// => complete


toPromise


  • Observable ではなく Promise で最後の値を取得する


    • Promise で購読する場合は forEach() を使う?



Rx.Observable.of('a', 'bbbb', 'cccccccc')

.toPromise()
.then(res => console.log(res))
.catch(err => console.log('ここには来ない'));

// => cccccccc

Rx.Observable.of('a', 'bbbb', 'cccccccc')

.map(v => {
if (v.length > 4) {
// throw しても catch には飛ばない
return new Error('Error!');
}
return v;
})
.toPromise()
.then(res => console.log(res))
.catch(err => console.log('ここには来ない'));

// => Error: Error!


Filtering Operators

Method Type
Method Name

instance method
audit

instance method
auditTime

instance method
debounce

instance method
debounceTime

instance method
distinct

instance method
distinctUntilChanged

instance method
distinctUntilKeyChanged

instance method
elementAt

instance method
filter

instance method
first

instance method
ignoreElements

instance method
last

instance method
sample

instance method
sampleTime

instance method
skip

instance method
skipLast

instance method
skipUntil

instance method
skipWhile

instance method
take

instance method
takeLast

instance method
takeUntil

instance method
takeWhile

instance method
throttle

instance method
throttleTime


audit


  • Observable は durationSelector で指定した Observable が発行したら最新の値を発行する

Rx.Observable.interval(100)

.audit(v => Rx.Observable.timer(10000))
.subscribe(res => console.log(res));

// => 97
// => 195
// => 293
// ...

Rx.Observable.interval(100)

.map(v => {
if (v > 100) {
throw 'Error!';
}
return Rx.Observable.timer(10000);
})
.subscribe(
res => console.log(res),
err => console.log(err.message),
() => console.log('ここには来ない')
);

// => 96
// => 195
// => Error!


auditTime


  • Observable は duration で指定された時刻が来たら最新の値を発行する



    • audit() を先に知っておいた方が良い



Rx.Observable.interval(100)

.auditTime(10000)
.subscribe(res => console.log(res));

// => 98
// => 198
// => 298
// ...


debounce



  • durationSelector で指定した Observable が発行するまで遅延する



    • durationSelector が発火する前に次の値が発行したら遅延した値は消える

    • あまり良くわかってないが、そんな感じと思われる・・・。



Rx.Observable.interval(1000)

.debounce(v => Rx.Observable.timer(500))
.subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// ...


debounceTime



  • dueTime で指定した時間まで遅延する



    • debounce() を先に知っておいた方が良い


    • dueTime が経つ前に次の値が発行したら遅延した値は消える

    • あまり良くわかってないが、そんな感じと思われる・・・。



Rx.Observable.interval(1000)

.debounceTime(2000)
.subscribe(res => console.log(res));


distinct


  • Observable が発行する値の重複を排除する

Rx.Observable.of(1, 1, 2, 2, 2, 1, 2, 4, 3, 2, 3, 1)

.distinct()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 1
// => 2
// => 4
// => 3
// => complete


distinctUntilChanged


  • Observable が発行する値のうち、直前の値と同じ場合は排除する



    • distinct() を先に知っておいた方が良い



Rx.Observable.of(1, 1, 2, 2, 2, 1, 2, 4, 3, 2, 3, 1)

.distinctUntilChanged()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 1
// => 2
// => 1
// => 2
// => 4
// => 3
// => 2
// => 3
// => 1
// => complete


distinctUntilKeyChanged


  • Observable が発行する値のうち、特定の Key に対して直前の値と同じ場合は排除する



    • distinct() を先に知っておいた方が良い



Rx.Observable.of(

{ age: 10, name: 'Hoge' },
{ age: 20, name: 'Foo' },
{ age: 30, name: 'Foo' },
{ age: 10, name: 'Hoge' })
.distinctUntilKeyChanged('name')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => { age: 10, name: 'Hoge' }
// => { age: 20, name: 'Foo' }
// => { age: 10, name: 'Hoge' }
// => complete

Rx.Observable.of(

{ age: 10, name: 'Hoge' },
{ age: 20, name: 'Foo' },
{ age: 20, name: 'Hoge' },
{ age: 10, name: 'Foo' })
.distinctUntilKeyChanged('age')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => { age: 10, name: 'Hoge' }
// => { age: 20, name: 'Foo' }
// => { age: 10, name: 'Foo' }
// => complete


elementAt


  • Observable が発行する値のうち、特定の値を発行する

Rx.Observable.of('A', 'B', 'C')

.elementAt(1)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => B
// => complete


filter


  • Observable が発行する値を predicate で指定した条件にフィルタする



    • Array.filter() と同じ



Rx.Observable.range(1, 5)

.filter(v => v % 2 === 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 2
// => 4
// => complete


first


  • Observable が最初に発行する値のみにフィルタする



    • predicate を指定して条件を指定することも可能



Rx.Observable.range(1, 5)

.first()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 1
// => complete

Rx.Observable.range(1, 5)

.first(v => v % 2 === 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 2
// => complete


ignoreElements


  • Observable が発行する全ての値を無視する

Rx.Observable.range(1, 5)

.ignoreElements()
.subscribe(
res => console.log('ここには来ない'),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => complete


last


  • Observable が最初に発行する値のみにフィルタする



    • predicate を指定して条件を指定することも可能



Rx.Observable.range(1, 5)

.last()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 5
// => complete

Rx.Observable.range(1, 5)

.last(v => v % 2 === 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 4
// => complete


sample


  • notifier で指定された Observable による通知があった時に最新の値を発行する


    • 最新の値に変化がない場合は何も起きない



Rx.Observable.interval(1000)

.sample(Rx.Observable.interval(5000))
.subscribe(res => console.log(res));

// => 3
// => 8
// => 13
// => ...

Rx.Observable.interval(5000)

.sample(Rx.Observable.interval(1000))
.subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => ...


sampleTime



  • period で指定された時間が経った時に最新の値を発行する



    • sample() を先に知っておいた方が良い

    • 最新の値に変化がない場合は何も起きない



Rx.Observable.interval(1000)

.sampleTime(5000)
.subscribe(res => console.log(res));

// => 3
// => 8
// => 13
// => ...

Rx.Observable.interval(5000)

.sampleTime(1000)
.subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => ...


skip


  • Observable が発行する値を count だけスキップする

Rx.Observable.range(1, 5)

.skip(3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 4
// => 5
// => complete


skipLast


  • Observable が発行する値を count だけ後ろからスキップする



    • skip() を先に知っておいた方が良い



Rx.Observable.range(1, 5)

.skipLast(3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 1
// => 2
// => complete


skipUntil



  • notifier で指定された Observable による通知があるまでスキップする



    • skip() を先に知っておいた方が良い



Rx.Observable.interval(1000)

.skipUntil(Rx.Observable.timer(5000))
.subscribe(res => console.log(res));

// => 4
// => 5
// => ...


skipWhile



  • predicate で指定する条件が true の間はスキップする



    • skip() を先に知っておいた方が良い

    • 一度条件から外れると発行が行われる

    • (途中から再びスキップされる。とかは無い)



Rx.Observable.interval(1000)

.skipWhile(v => v < 5)
.subscribe(res => console.log(res));

// => 5
// => 6
// => ...

Rx.Observable.interval(1000)

.skipWhile(v => v > 5)
.subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => ...


take


  • Observable が発行する値のうち count 回だけ発行する

Rx.Observable.interval(1000)

.take(3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 0
// => 1
// => 2
// => complete


takeLast


  • Observable が発行する値のうち、ラスト count 回だけ発行する



    • take() を先に知っておいた方が良い



Rx.Observable.interval(1000)

.take(5)
.takeLast(2)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 3
// => 4
// => complete


takeUntil



  • notifier に指定された Observable による通知があるまで発行する



    • take() を先に知っておいた方が良い



Rx.Observable.interval(1000)

.takeUntil(Rx.Observable.timer(5000))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 0
// => 1
// => 2
// => 3
// => complete


takeWhile



  • predicate が true の間は発行する



    • take() を先に知っておいた方が良い

    • 一度条件から外れたら終了

    • (途中から再び発行し始める。とかは無い)



Rx.Observable.interval(1000)

.takeWhile(v => v < 5)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);

// => 0
// => 1
// => 2
// => 3
// => 4
// => complete


throttle


  • だいたい sample() と同じ動きをする?



    • durationSelector が発行されるまで Observable の発行は無視される



Rx.Observable.interval(1000)

.throttle(v => Rx.Observable.timer(5000))
.subscribe(res => console.log(res));

// => 0
// => 5
// => 10
// => ...


throttleTime



  • duration で指定するバージョン



    • throttle() を先に知っておいた方が良い



Rx.Observable.interval(1000)

.throttleTime(5000)
.subscribe(res => console.log(res));

// => 0
// => 5
// => 10
// => ...


Combination Operators

Method Type
Method Name

class method
combineLatest

class method
concat

class method
forkJoin

class method
merge

class method
zip

instance method
combineAll

instance method
combineLatest

instance method
concat

instance method
concatAll

instance method
exhaust

instance method
merge

instance method
mergeAll

instance method
race

instance method
startWith

instance method
switch

instance method
withLatestFrom

instance method
zip

instance method
zipAll


combineLatest


  • 複数の Observable を組み合わせる

const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`);

const observable2 = Rx.Observable.timer(0, 3000).map(v => `B${v}`);
Rx.Observable.combineLatest(observable1, observable2)
.subscribe(res => console.log(res));

// => [ 'A0', 'B0' ]
// (1s 待機)
// => [ 'A1', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B1' ]
// => [ 'A3', 'B1' ]
// (1s 待機)
// => [ 'A4', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B2' ]
// => [ 'A6', 'B2' ]
// (1s 待機)
// => [ 'A7', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B3' ]
// => [ 'A9', 'B3' ]
// (1s 待機)
// ...


concat


  • 複数の Observable を順次実行する

const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);

const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.concat(observable1, observable2, observable3)