はじめに
何故こんなものを書いてしまったのか・・・orz
いつも触っているフレームワークが Angular.js
から Angular
に変わり、
HTTPClient
の戻り値が ng.IPromise
から Observable
になっていました。
ReactiveX
自体は Android
や iOS
を開発していた頃に RxJava
や RxSwift
を通して、名前だけ知ってましたので、RxJS
もその辺のものなんだなーという程度の認識で、Angularチュートリアルの知識で Observable
を使ってました。
Angular
を触っている限り Observable
は付いてくるし、
RxJS
を知っておけば他の RxSwift
とかでも活かせるだろうなと思って、
軽い気持ちで Observable
の機能をまとめてみようと思ったのがこの記事執筆の始まりです。
※途中までやって他の方々がもっと詳しくまとめている記事を見つけたりしましたが、
※途中までやっちゃったし、自分でまとめた方が理解できるし。って思って最後までまとめました。
Google 翻訳を駆使して、実際に動かしてみて、
自分なりの解釈でまとめてますので全然違うのもあるかと思います。
バージョンとか
- rxjs 5.5.6
Observable
Create Operators
Method Type | Method Name |
---|---|
class method | bindCallback |
class method | bindNodeCallback |
class method | create |
class method | defer |
class method | empty |
class method | from |
class method | fromEvent |
class method | fromEventPattern |
class method | fromPromise |
class method | interval |
class method | never |
class method | of |
class method | range |
class method | throw |
class method | timer |
instance method | repeat |
instance method | repeatWhen |
bindCallback
- コールバック付きの関数から Observable を作成する
function func(cb: () => void) {
setTimeout(() => {
cb();
}, 1000);
}
const observable = Rx.Observable.bindCallback(func);
observable().subscribe(() => console.log('success'));
// => success
function func(cb: (s1) => void) {
setTimeout(() => {
cb('Hello');
}, 1000);
}
const observable = Rx.Observable.bindCallback(func);
observable().subscribe(res => console.log(res));
// => Hello
function func(cb: (s1, s2) => void) {
setTimeout(() => {
cb('Hello', 'World!');
}, 1000);
}
const observable = Rx.Observable.bindCallback(func);
observable().subscribe(res => console.log(res));
// => [ 'Hello', 'World' ]
function func(value: number, cb: (n1) => void) {
setTimeout(() => {
cb(value * 10);
}, 1000);
}
const observable = Rx.Observable.bindCallback(func);
observable(5).subscribe(res => console.log(res));
// => 50
bindNodeCallback
-
(err, data) => void
のような良くあるコールバック付きの関数から Observable を作成する-
bindCallback()
を先に知っておいた方が良い -
bindCallback()
とだいたい同じ
-
function func(cb: (err, data) => void) {
setTimeout(() => {
cb(null, 'Hello');
}, 1000);
}
const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(res => console.log(res));
// => Hello
function func(cb: (err, data1, data2) => void) {
setTimeout(() => {
cb(null, 'Hello', 'World!');
}, 1000);
}
const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(res => console.log(res));
// => [ 'Hello', 'World!' ]
function func(cb: (err, data?) => void) {
setTimeout(() => {
cb('Error');
}, 1000);
}
const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(
res => console.log('ここには来ない'),
err => console.log(err)
);
// => Error
create
- 新しい Observable を作成する
const observable = Rx.Observable.create(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
observable.subscribe(res => console.log(res));
// => 1
// => 2
// => 3
const observable = Rx.Observable.create(observer => {
observer.next(1);
observer.error('Error');
observer.next(2);
observer.complete();
});
observable.subscribe(
res => console.log(res),
err => console.log(err)
);
// => 1
// => Error
defer
- 遅延して Observable を作成する
Rx.Observable.defer(() => Rx.Observable.of('a', 'b', 'c'))
.subscribe(res => console.log(res));
// => a
// => b
// => c
empty
- 何も発行せず
complete
する Observable を作成する
Rx.Observable.empty()
.subscribe(
res => console.log('ここには来ない'),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => complete
from
- 配列から Observable を作成する
Rx.Observable.from([10, 20, 30])
.subscribe(res => console.log(res));
// => a
// => b
// => c
fromEvent
- DOM Events や EventEmitter から Observable を作成する
Rx.Observable.fromEvent(document, 'click')
.subscribe(evt => console.log(`x: ${evt.x}, y: ${evt.y}`));
// => x: 299, y: 527
// => x: 332, y: 547
// => x: 343, y: 328
fromEventPattern
-
addHandler
,removeHandler
に基づいて Observable を作成する-
fromEvent()
を先に知っておいた方が良い - あまり良くわかってないが、たぶん
fromEvent()
とだいたい同じ
-
Rx.Observable.fromEventPattern(
(handler: any) => document.addEventListener('click', handler),
(handler: any) => document.removeEventListener('click', handler)
).subscribe(evt => console.log(`x: ${evt.x}, y: ${evt.y}`));
// => x: 299, y: 527
// => x: 332, y: 547
// => x: 343, y: 328
fromPromise
- Promise から Observable を作成する
function func(cb: () => void) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('success');
}, 1000);
});
}
Rx.Observable.fromPromise(func())
.subscribe(res => console.log(res));
// => success
function func(cb: () => void) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject('error');
}, 1000);
});
}
Rx.Observable.fromPromise(func())
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err)
);
// => error
interval
- 定期的に発行する Observable を作成する
Rx.Observable.interval(1000)
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// ...
never
- 何もしない Observable を作成する
Rx.Observable.never()
.subscribe(
res => console.log('ここには来ない'),
err => console.log('ここには来ない'),
() => console.log('ここには来ない')
);
// => (何も起きない)
of
- 指定した値を発行する Observable を作成する
Rx.Observable.of(1, 2, 3)
.subscribe(res => console.log(res));
// => 1
// => 2
// => 3
Rx.Observable.of('a', 'b', 'c')
.subscribe(res => console.log(res));
// => a
// => b
// => c
Rx.Observable.of<any>([1, 2, 3], ['a', 'b', 'c'])
.subscribe(res => console.log(res));
// => [ 1, 2, 3 ]
// => [ 'a', 'b', 'c' ]
range
-
start
からcount
個の数値を発行する Observable を作成する
Rx.Observable.range(1, 10)
.subscribe(res => console.log(res));
// => 1
// => 2
// => 3
// => ...
// => 9
// => 10
Rx.Observable.range(5, 10)
.subscribe(res => console.log(res));
// => 5
// => 6
// => 7
// => ...
// => 13
// => 14
throw
- エラーを発行する Observable を作成する
Rx.Observable.throw('Error')
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err)
);
// => Error
timer
- 開始時間を指定して定期的に発行する Observable を作成する
-
interval()
とだいたい同じ
-
Rx.Observable.timer(3000, 1000)
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// => ...
repeat
- 本線を指定回数繰り返す Observable を作成する
Rx.Observable.of('a', 'b', 'c')
.repeat(3)
.subscribe(res => console.log(res));
// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c
repeatWhen
-
notifier
が発行された時に本線を繰り返す Observable を作成する?-
repeat()
を先に知っておいた方が良い - あまり良くわかってないが、たぶん
repeat()
とだいたい同じ
-
Rx.Observable.of('a', 'b', 'c')
.repeatWhen(notifier => notifier.take(3)) // +3回繰り返される
.subscribe(res => console.log(res));
// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c
Mathematical and Aggregate Operators
Method Type | Method Name |
---|---|
instance method | count |
instance method | max |
instance method | min |
count
- Observable が発行する値の個数を発行する
Rx.Observable.of('a', 'bb', 'ccc')
.count()
.subscribe(res => console.log(res));
// => 3
Rx.Observable.of('a', 'bb', 'ccc')
.count(v => v.length >= 2)
.subscribe(res => console.log(res));
// => 2
max
- Observable が発行する値の中で最大値を発行する
Rx.Observable.of(5, 4, -1, 8, 2)
.max()
.subscribe(res => console.log(res));
// => 8
min
- Observable が発行する値の中で最小値を発行する
Rx.Observable.of(5, 4, -1, 8, 2)
.min()
.subscribe(res => console.log(res));
// => -1
Conditional and Boolean Operators
Method Type | Method Name |
---|---|
instance method | defaultIfEmpty |
instance method | every |
instance method | find |
instance method | findIndex |
instance method | isEmpty |
instance method | sequenceEqual |
instance method | single |
defaultIfEmpty
- Observable の発行が Empty だった場合にデフォルト値を発行する
Rx.Observable.empty()
.defaultIfEmpty('defaultValue')
.subscribe(res => console.log(res));
// => defaultValue
Rx.Observable.empty()
.defaultIfEmpty({ data: 'defaultValue'})
.subscribe(res => console.log(res));
// => d{ data: 'defaultValue' }
Rx.Observable.of('a', 'bb', 'ccc')
.defaultIfEmpty('defaultValue')
.subscribe(res => console.log(res));
// => a
// => bb
// => ccc
every
-
predicate
で指定する条件を全ての値が満たしているかどうかを発行する-
Array.every()
と同じ
-
Rx.Observable.of(1, 2, 3, 4, 5)
.every(v => v <= 5)
.subscribe(res => console.log(res));
// => true
Rx.Observable.of(1, 2, 3, 4, 5)
.every(v => v < 5)
.subscribe(res => console.log(res));
// => false
find
-
predicate
で指定する条件を満たす値を発行する-
Array.find()
と同じ
-
Rx.Observable.of(1, 2, 3, 4, 5)
.find(v => v === 4)
.subscribe(res => console.log(res));
// => 4
Rx.Observable.of(1, 2, 3, 4, 5)
.find(v => v % 2 === 0)
.subscribe(res => console.log(res));
// => 2
Rx.Observable.of(1, 2, 3, 4, 5)
.find(v => v === 10)
.subscribe(res => console.log(res));
// => undefined
findIndex
-
predicate
で指定する条件を満たす値のインデックスを発行する-
Array.findIndex()
と同じ
-
Rx.Observable.of(1, 2, 3, 4, 5)
.findIndex(v => v === 4)
.subscribe(res => console.log(res));
// => 3
Rx.Observable.of(1, 2, 3, 4, 5)
.findIndex(v => v % 2 === 0)
.subscribe(res => console.log(res));
// => 1
Rx.Observable.of(1, 2, 3, 4, 5)
.findIndex(v => v === 10)
.subscribe(res => console.log(res));
// => -1
isEmpty
- Observable の発行する値が Empty かどうかを発行する
Rx.Observable.empty()
.isEmpty()
.subscribe(res => console.log(res));
// => true
Rx.Observable.of('a', 'bb', 'ccc')
.isEmpty()
.subscribe(res => console.log(res));
// => false
sequenceEqual
- 同じ発行のある Observable かどうかを発行する
const compareObservable = Rx.Observable.range(5, 10);
Rx.Observable.of(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
.sequenceEqual(compareObservable)
.subscribe(res => console.log(res));
// => true
const compareObservable = Rx.Observable.range(5, 10);
Rx.Observable.of(5, 6, 7, 8, 9, 10)
.sequenceEqual(compareObservable)
.subscribe(res => console.log(res));
// => false
single
-
predicate
で指定する値が1つしかない場合は成功
Rx.Observable.of('a', 'b', 'c', 'b', 'e')
.single(v => v === 'a')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => a
// => complete
Rx.Observable.of('a', 'b', 'c', 'b', 'e')
.single(v => v === 'b')
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err),
() => console.log('ここには来ない')
);
// => Sequence contains more than one element
Error Handling Operators
Method Type | Method Name |
---|---|
instance method | catch |
instance method | retry |
instance method | retryWhen |
instance method | onErrorResumeNext |
catch
- Observable がエラーを発行したら次の Observable を発行する
Rx.Observable.throw('Error')
.catch(err => Rx.Observable.of('a', 'bb', 'ccc'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => a
// => bb
// => ccc
// => complete
Rx.Observable.interval(1000)
.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.catch(err => Rx.Observable.of('IV', 'V', 'VI'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 2
// => 3
// => IV
// => V
// => VI
// => complete
retry
- Observable がエラーを発行したら
count
だけリトライする
Rx.Observable.throw('Error')
.retry(3)
.subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('ここには来ない')
);
// => Error
Rx.Observable.interval(1000)
.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.retry(3)
.subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('ここには来ない')
);
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => Error
retryWhen
- Observable がエラーを発行したら
notifier
を基にリトライを行う-
retry()
を先に知っておいた方が良い - あまり良くわかってないが、たぶん
retry()
とだいたい同じ
-
Rx.Observable.interval(1000)
.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.do(
res => console.log(`do(res): ${res}`),
err => console.log(`do(err): ${err}`),
() => console.log(`do(complete)`)
)
.retryWhen(errors => {
return errors.delay(200).do(
res => console.log(`errors(res): ${res}`), // エラーが発生するとここに飛んでくるんだな?
err => console.log(`errors(err): ${err}`),
() => console.log(`errors(complete)`)
);
})
.subscribe(
res => console.log(`res: ${res}`),
err => console.log(`err: ${err}`),
() => console.log(`complete`),
);
// => do(res): 0
// => res: 0
// => do(res): 1
// => res: 1
// => do(res): 2
// => res: 2
// => do(res): 3
// => res: 3
// => do(err): Error
// => errors(res): Error
// => do(res): 0
// => res: 0
// => do(res): 1
// => res: 1
// => do(res): 2
// => res: 2
// => do(res): 3
// => res: 3
// => do(err): Error
// => errors(res): Error
// ...
onErrorResumeNext
- Observable がエラーを発行したら次の Observable を発行する
-
catch
と同じような感じ - 但し、コールバックに
err
は飛んでこないのでエラーハンドリングしたい場合はcatch
を使うと良い?
-
Rx.Observable.interval(1000)
.map(v => {
if (v === 4) {
throw 'Error';
}
return v;
})
.onErrorResumeNext(Rx.Observable.of('IV', 'V', 'VI'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// => IV
// => V
// => VI
// => complete
Utility Operators
Method Type | Method Name |
---|---|
instance method | delay |
instance method | delayWhen |
instance method | dematerialize |
instance method | do |
instance method | materialize |
instance method | observeOn |
instance method | subscribeOn |
instance method | timeInterval |
instance method | timeout |
instance method | timeoutWith |
instance method | timestamp |
instance method | toArray |
class method | webSocket |
instance method | finally |
instance method | forEach |
instance method | let |
instance method | lift |
instance method | pipe |
instance method | toPromise |
delay
- Observable の発行を遅延させる
Rx.Observable.of('a', 'bb', 'ccc')
.delay(5000)
.subscribe(res => console.log(res));
// => a (5s after)
// => bb
// => ccc
delayWhen
- Observable の発行を
delayDurationSelector
で返却する Observable が発行させる-
delay()
を先に知っておいた方が良い
-
Rx.Observable.of('a', 'bbbb', 'cccccccc')
.delayWhen(v => {
return Rx.Observable.timer(v.length * 1000)
})
.subscribe(res => console.log(res));
// => a (1s after)
// => bbbb (4s after)
// => cccccccc (8s after)
dematerialize
-
Rx.Notification
をObservable
に変換する?-
materialize()
の逆(先にmaterialize()
見たほうがいいかも) - あまり良くわかってない。。。
-
const notifA = new Rx.Notification('N', 'a');
const notifB = new Rx.Notification('N', 'b');
const notifE = new Rx.Notification('E', undefined, new TypeError('x.toUpperCase() is not a function'));
Rx.Observable.of(notifA, notifB, notifE)
.dematerialize()
.subscribe(
res => console.log(res),
err => console.log(err.message),
() => console.log('ここには来ない'));
// => a
// => b
// => x.toUpperCase() is not a function
do
- Observable の発行を subscribe する前にフックする
Rx.Observable.of('a', 'bb', 'ccc')
.do(
res => console.log(`do(res): ${res}`),
err => console.log(`ここには来ない`),
() => console.log(`do(complete)`)
)
.subscribe(
res => console.log(`res: ${res}`),
err => console.log(`ここには来ない`),
() => console.log(`complete`)
);
// => do(res): a
// => res: a
// => do(res): bb
// => res: bb
// => do(res): ccc
// => res: ccc
// => do(complete)
// => complete
materialize
-
Observable
をRx.Notification
に変換する?- あまり良くわかってない。。。
Rx.Observable.of<any>('a', 'b', undefined, 'd').map(v => v.toUpperCase())
.materialize()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
// => Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
// => Notification {
// kind: 'E',
// value: undefined,
// error: TypeError: Cannot read property 'toUpperCase' of undefined
// ...
// complete
observeOn
- Observable の発行を「次のタイミング」で行う?
- あまり良くわかってない。。。
const observable: Rx.Observable<any> = Rx.Observable.create(observer => {
console.log('[1] before next()');
observer.next(1);
console.log('[1] after next()');
console.log('[2] before next()');
observer.next(2);
console.log('[2] after next()');
console.log('[3] before complete()');
observer.complete();
console.log('[3] after complete()');
});
console.log('[4] before subscribe');
observable.observeOn(Rx.Scheduler.async)
.subscribe(
next => console.log(`[emit] next: ${next}`),
err => console.log(`[emit] error: ${err}`),
() => console.log(`[emit] complete`)
);
console.log('[4] after subscribe');
// => [4] before subscribe
// => [1] before next()
// => [1] after next()
// => [2] before next()
// => [2] after next()
// => [3] before complete()
// => [3] after complete()
// => [4] after subscribe
// => [emit] next: 1
// => [emit] next: 2
// => [emit] complete
subscribeOn
- Observable の実行を「次のタイミング」で行う?
- あまり良くわかってない。。。
const o1: Rx.Observable<any> = Rx.Observable.create(observer => {
console.log('[1] before next()');
observer.next(1);
console.log('[1] after next()');
console.log('[2] before next()');
observer.next(2);
console.log('[2] after next()');
console.log('[3] before complete()');
observer.complete();
console.log('[3] after complete()');
});
console.log('[4] before subscribe');
o1.subscribeOn(Rx.Scheduler.async)
.subscribe(
next => console.log(`[emit] next: ${next}`),
err => console.log(`[emit] error: ${err}`),
() => console.log(`[emit] complete`)
);
console.log('[4] after subscribe');
// => [4] before subscribe
// => [4] after subscribe
// => [1] before next()
// => [emit] next: 1
// => [1] after next()
// => [2] before next()
// => [emit] next: 2
// => [2] after next()
// => [3] before complete()
// => [emit] complete:
// => [3] after complete()
timeInterval
- 前の発行からどれくらい経ったかを発行する
Rx.Observable.of('a', 'bbbb', 'cccccccc')
.delayWhen(v => Rx.Observable.timer(v.length * 1000))
.timeInterval()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
res => console.log('complete'),
);
// => TimeInterval { value: 'a', interval: 1005 }
// => TimeInterval { value: 'bbbb', interval: 3000 }
// => TimeInterval { value: 'cccccccc', interval: 4000 }
// => complete
timeout
- タイムアウト時刻まで発行が無い場合はエラーを発行する
Rx.Observable.timer(5000, 1000)
.timeout(1000)
.subscribe(
res => console.log('ここには来ない'),
err => console.log(err.message),
() => console.log('ここには来ない')
);
// => Timeout has occurred
timeoutWith
- タイムアウト時刻まで発行が無い場合は別の Observable を発行する
-
timeout()
を先に知っておいた方が良い -
timeout()
+catch()
な感じ?
-
Rx.Observable.timer(5000, 1000)
.timeoutWith(1000, Rx.Observable.of('a', 'bb', 'ccc'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => a
// => bb
// => ccc
// => complete
timestamp
- 発行時刻が付加される
Rx.Observable.of('a', 'bbbb', 'cccccccc')
.delayWhen(v => Rx.Observable.timer(v.length * 1000))
.timestamp()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Timestamp { value: 'a', timestamp: 1521783521464 }
// => Timestamp { value: 'bbbb', timestamp: 1521783524459 }
// => Timestamp { value: 'cccccccc', timestamp: 1521783528462 }
// => complete
toArray
- 発行された値を配列に変換する
Rx.Observable.of('a', 'b', 'c')
.toArray()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 'a', 'b', 'c' ]
// => complete
webSocket
- WebSocket を WebSocketSubject にラップする
- あまり良くわかってない。。。
- うまく動かせなかった。。。
const ws = Rx.Observable.webSocket('ws://localhost:8081');
ws.subscribe(
res => console.log('message received: ' + res),
err => console.log(err),
() => console.log('complete')
);
ws.next(JSON.stringify({ op: 'hello' }));
finally
- 一番最後に発火する関数を定義する
Rx.Observable.of('a', 'bb', 'ccc')
.finally(() => {
console.log('Finally!')
})
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => a
// => bb
// => ccc
// => complete
// => Finally!
Rx.Observable.of('a', 'bb', 'ccc')
.map(v => {
if (v.length > 2) {
throw new Error('Error!');
}
return v;
})
.finally(() => {
console.log('Finally!')
})
.subscribe(
res => console.log(res),
err => console.log(err.message),
() => console.log('ここには来ない')
);
// => a
// => bb
// => Error!
// => Finally!
forEach
- Subscribe ではなく Promise で購読する
Rx.Observable.of('a', 'bb', 'ccc')
.forEach(v => console.log(`v: ${v}`))
.then(res => console.log(`then: ${res}`))
.catch(err => console.log('ここには来ない'));
// => v: a
// => v: bb
// => v: ccc
// => then: undefined
Rx.Observable.of('a', 'bb', 'ccc')
.map(v => {
if (v.length > 2) {
throw new Error('Error!')
}
return v;
})
.forEach(v => console.log(`v: ${v}`))
.then(res => console.log('ここには来ない'))
.catch(err => console.log(`catch: ${err.message}`));
// => v: a
// => v: bb
// => catch: Error!
let
- Observable が発行を Subscribeする前にフックできる?
-
do()
の Observable 版? - あまり良くわかってない。。。
-
Rx.Observable.of('a', 'bbbb', 'cccccccc')
.let(selector => {
return selector.do(
res => console.log(`let.do(res): ${res}`),
err => console.log(`let.do(err): ${err}`),
() => console.log(`let.do(complete)`)
);
})
.do(
res => console.log(`do(res): ${res}`),
err => console.log(`do(err): ${err}`),
() => console.log(`do(complete)`)
)
.subscribe(
res => console.log(`res: ${res}`),
err => console.log(`err: ${err}`),
() => console.log(`complete`)
);
// => let.do(res): a
// => do(res): a
// => res: a
// => let.do(res): bbbb
// => do(res): bbbb
// => res: bbbb
// => let.do(res): cccccccc
// => do(res): cccccccc
// => res: cccccccc
// => let.do(complete)
// => do(complete)
// => complete
lift
- カスタムオペレータを差し込めるらしい
- あまり良くわかってない。。。
- うまく動かせなかった。。。
pipe
- メソッドチェーンではない書き方ができる
- メリットがわからない
Rx.Observable.range(1, 10)
.pipe(
filter(v => v % 2 === 0),
map(v => v * 10),
scan((acc, v) => acc + v, 0)
)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 20
// => 60
// => 120
// => 200
// => 300
// => complete
toPromise
- Observable ではなく Promise で最後の値を取得する
- Promise で購読する場合は
forEach()
を使う?
- Promise で購読する場合は
Rx.Observable.of('a', 'bbbb', 'cccccccc')
.toPromise()
.then(res => console.log(res))
.catch(err => console.log('ここには来ない'));
// => cccccccc
Rx.Observable.of('a', 'bbbb', 'cccccccc')
.map(v => {
if (v.length > 4) {
// throw しても catch には飛ばない
return new Error('Error!');
}
return v;
})
.toPromise()
.then(res => console.log(res))
.catch(err => console.log('ここには来ない'));
// => Error: Error!
Filtering Operators
Method Type | Method Name |
---|---|
instance method | audit |
instance method | auditTime |
instance method | debounce |
instance method | debounceTime |
instance method | distinct |
instance method | distinctUntilChanged |
instance method | distinctUntilKeyChanged |
instance method | elementAt |
instance method | filter |
instance method | first |
instance method | ignoreElements |
instance method | last |
instance method | sample |
instance method | sampleTime |
instance method | skip |
instance method | skipLast |
instance method | skipUntil |
instance method | skipWhile |
instance method | take |
instance method | takeLast |
instance method | takeUntil |
instance method | takeWhile |
instance method | throttle |
instance method | throttleTime |
audit
- Observable は
durationSelector
で指定した Observable が発行したら最新の値を発行する
Rx.Observable.interval(100)
.audit(v => Rx.Observable.timer(10000))
.subscribe(res => console.log(res));
// => 97
// => 195
// => 293
// ...
Rx.Observable.interval(100)
.map(v => {
if (v > 100) {
throw 'Error!';
}
return Rx.Observable.timer(10000);
})
.subscribe(
res => console.log(res),
err => console.log(err.message),
() => console.log('ここには来ない')
);
// => 96
// => 195
// => Error!
auditTime
- Observable は
duration
で指定された時刻が来たら最新の値を発行する-
audit()
を先に知っておいた方が良い
-
Rx.Observable.interval(100)
.auditTime(10000)
.subscribe(res => console.log(res));
// => 98
// => 198
// => 298
// ...
debounce
-
durationSelector
で指定した Observable が発行するまで遅延する-
durationSelector
が発火する前に次の値が発行したら遅延した値は消える - あまり良くわかってないが、そんな感じと思われる・・・。
-
Rx.Observable.interval(1000)
.debounce(v => Rx.Observable.timer(500))
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// ...
debounceTime
-
dueTime
で指定した時間まで遅延する-
debounce()
を先に知っておいた方が良い -
dueTime
が経つ前に次の値が発行したら遅延した値は消える - あまり良くわかってないが、そんな感じと思われる・・・。
-
Rx.Observable.interval(1000)
.debounceTime(2000)
.subscribe(res => console.log(res));
distinct
- Observable が発行する値の重複を排除する
Rx.Observable.of(1, 1, 2, 2, 2, 1, 2, 4, 3, 2, 3, 1)
.distinct()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 2
// => 4
// => 3
// => complete
distinctUntilChanged
- Observable が発行する値のうち、直前の値と同じ場合は排除する
-
distinct()
を先に知っておいた方が良い
-
Rx.Observable.of(1, 1, 2, 2, 2, 1, 2, 4, 3, 2, 3, 1)
.distinctUntilChanged()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 2
// => 1
// => 2
// => 4
// => 3
// => 2
// => 3
// => 1
// => complete
distinctUntilKeyChanged
- Observable が発行する値のうち、特定の Key に対して直前の値と同じ場合は排除する
-
distinct()
を先に知っておいた方が良い
-
Rx.Observable.of(
{ age: 10, name: 'Hoge' },
{ age: 20, name: 'Foo' },
{ age: 30, name: 'Foo' },
{ age: 10, name: 'Hoge' })
.distinctUntilKeyChanged('name')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => { age: 10, name: 'Hoge' }
// => { age: 20, name: 'Foo' }
// => { age: 10, name: 'Hoge' }
// => complete
Rx.Observable.of(
{ age: 10, name: 'Hoge' },
{ age: 20, name: 'Foo' },
{ age: 20, name: 'Hoge' },
{ age: 10, name: 'Foo' })
.distinctUntilKeyChanged('age')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => { age: 10, name: 'Hoge' }
// => { age: 20, name: 'Foo' }
// => { age: 10, name: 'Foo' }
// => complete
elementAt
- Observable が発行する値のうち、特定の値を発行する
Rx.Observable.of('A', 'B', 'C')
.elementAt(1)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => B
// => complete
filter
- Observable が発行する値を
predicate
で指定した条件にフィルタする-
Array.filter()
と同じ
-
Rx.Observable.range(1, 5)
.filter(v => v % 2 === 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 2
// => 4
// => complete
first
- Observable が最初に発行する値のみにフィルタする
-
predicate
を指定して条件を指定することも可能
-
Rx.Observable.range(1, 5)
.first()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => complete
Rx.Observable.range(1, 5)
.first(v => v % 2 === 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 2
// => complete
ignoreElements
- Observable が発行する全ての値を無視する
Rx.Observable.range(1, 5)
.ignoreElements()
.subscribe(
res => console.log('ここには来ない'),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => complete
last
- Observable が最初に発行する値のみにフィルタする
-
predicate
を指定して条件を指定することも可能
-
Rx.Observable.range(1, 5)
.last()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 5
// => complete
Rx.Observable.range(1, 5)
.last(v => v % 2 === 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 4
// => complete
sample
-
notifier
で指定された Observable による通知があった時に最新の値を発行する- 最新の値に変化がない場合は何も起きない
Rx.Observable.interval(1000)
.sample(Rx.Observable.interval(5000))
.subscribe(res => console.log(res));
// => 3
// => 8
// => 13
// => ...
Rx.Observable.interval(5000)
.sample(Rx.Observable.interval(1000))
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// => 3
// => ...
sampleTime
-
period
で指定された時間が経った時に最新の値を発行する-
sample()
を先に知っておいた方が良い - 最新の値に変化がない場合は何も起きない
-
Rx.Observable.interval(1000)
.sampleTime(5000)
.subscribe(res => console.log(res));
// => 3
// => 8
// => 13
// => ...
Rx.Observable.interval(5000)
.sampleTime(1000)
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// => 3
// => ...
skip
- Observable が発行する値を
count
だけスキップする
Rx.Observable.range(1, 5)
.skip(3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 4
// => 5
// => complete
skipLast
- Observable が発行する値を
count
だけ後ろからスキップする-
skip()
を先に知っておいた方が良い
-
Rx.Observable.range(1, 5)
.skipLast(3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 2
// => complete
skipUntil
-
notifier
で指定された Observable による通知があるまでスキップする-
skip()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.skipUntil(Rx.Observable.timer(5000))
.subscribe(res => console.log(res));
// => 4
// => 5
// => ...
skipWhile
-
predicate
で指定する条件が true の間はスキップする-
skip()
を先に知っておいた方が良い - 一度条件から外れると発行が行われる
- (途中から再びスキップされる。とかは無い)
-
Rx.Observable.interval(1000)
.skipWhile(v => v < 5)
.subscribe(res => console.log(res));
// => 5
// => 6
// => ...
Rx.Observable.interval(1000)
.skipWhile(v => v > 5)
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => ...
take
- Observable が発行する値のうち
count
回だけ発行する
Rx.Observable.interval(1000)
.take(3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => complete
takeLast
- Observable が発行する値のうち、ラスト
count
回だけ発行する-
take()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.take(5)
.takeLast(2)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 3
// => 4
// => complete
takeUntil
-
notifier
に指定された Observable による通知があるまで発行する-
take()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.takeUntil(Rx.Observable.timer(5000))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// => complete
takeWhile
-
predicate
が true の間は発行する-
take()
を先に知っておいた方が良い - 一度条件から外れたら終了
- (途中から再び発行し始める。とかは無い)
-
Rx.Observable.interval(1000)
.takeWhile(v => v < 5)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// => 4
// => complete
throttle
- だいたい
sample()
と同じ動きをする?-
durationSelector
が発行されるまで Observable の発行は無視される
-
Rx.Observable.interval(1000)
.throttle(v => Rx.Observable.timer(5000))
.subscribe(res => console.log(res));
// => 0
// => 5
// => 10
// => ...
throttleTime
-
duration
で指定するバージョン-
throttle()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.throttleTime(5000)
.subscribe(res => console.log(res));
// => 0
// => 5
// => 10
// => ...
Combination Operators
Method Type | Method Name |
---|---|
class method | combineLatest |
class method | concat |
class method | forkJoin |
class method | merge |
class method | zip |
instance method | combineAll |
instance method | combineLatest |
instance method | concat |
instance method | concatAll |
instance method | exhaust |
instance method | merge |
instance method | mergeAll |
instance method | race |
instance method | startWith |
instance method | switch |
instance method | withLatestFrom |
instance method | zip |
instance method | zipAll |
combineLatest
- 複数の Observable を組み合わせる
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`);
const observable2 = Rx.Observable.timer(0, 3000).map(v => `B${v}`);
Rx.Observable.combineLatest(observable1, observable2)
.subscribe(res => console.log(res));
// => [ 'A0', 'B0' ]
// (1s 待機)
// => [ 'A1', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B1' ]
// => [ 'A3', 'B1' ]
// (1s 待機)
// => [ 'A4', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B2' ]
// => [ 'A6', 'B2' ]
// (1s 待機)
// => [ 'A7', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B3' ]
// => [ 'A9', 'B3' ]
// (1s 待機)
// ...
concat
- 複数の Observable を順次実行する
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);
const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.concat(observable1, observable2, observable3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A0
// => A1
// => A2
// => A3
// => A4
// => B0
// => B1
// => B2
// => B3
// => B4
// => C0
// => C1
// => C2
// => C3
// => C4
// => complete
forkJoin
- 複数の Observable を並列実行し、最後の値のみ発行する
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);
const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.forkJoin(observable1, observable2, observable3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
merge
- 複数の Observable を並列実行し、それぞれ値を発行する
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);
const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.merge(observable1, observable2, observable3)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A0
// => B0
// => C0
// => B1
// => A1
// => B2
// => B3
// => C1
// => A2
// => B4
// => A3
// => C2
// => A4
// => C3
// => C4
// => complete
zip
- 複数の Observable を合成して値を発行する
const age = Rx.Observable.of(10, 20, 30);
const name = Rx.Observable.of('Hoge', 'Foo', 'Bar');
const developer = Rx.Observable.of(true, true, false);
Rx.Observable.zip(age, name, developer)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 10, 'Hoge', true ]
// => [ 20, 'Foo', true ]
// => [ 30, 'Bar', false ]
// => complete
combineAll
- 外側 Observable の発行が完了したら、内側の複数の Observable が組み合わされて発行される
Rx.Observable.timer(0, 1000).map(v => `Outer(${v})`).take(5)
.do(
res => console.log(`do(res): ${res}`),
err => console.log('do(err): ここには来ない'),
() => console.log('do(complete): complete')
)
.map(v => Rx.Observable.of(`Inner: ${v}`))
.combineAll()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => do(res): Outer(0)
// => do(res): Outer(1)
// => do(res): Outer(2)
// => do(res): Outer(3)
// => do(res): Outer(4)
// => do(complete): complete
// => [ 'Inner: Outer(0)', 'Inner: Outer(1)', 'Inner: Outer(2)', 'Inner: Outer(3)', 'Inner: Outer(4)' ]
// => complete
combineLatest
- 複数の Observable を組み合わせる
- instance method 版
Rx.Observable.timer(0, 1000).map(v => `A${v}`)
.combineLatest(Rx.Observable.timer(0, 3000).map(v => `B${v}`))
.subscribe(res => console.log(res));
// => [ 'A0', 'B0' ]
// (1s 待機)
// => [ 'A1', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B1' ]
// => [ 'A3', 'B1' ]
// (1s 待機)
// => [ 'A4', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B2' ]
// => [ 'A6', 'B2' ]
// (1s 待機)
// => [ 'A7', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B3' ]
// => [ 'A9', 'B3' ]
concat
- 複数の Observable を順次実行する
- instance method 版
Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5)
.concat(Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5))
.concat(Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A0
// => A1
// => A2
// => A3
// => A4
// => B0
// => B1
// => B2
// => B3
// => B4
// => C0
// => C1
// => C2
// => C3
// => C4
// => complete
concatAll
- 外側の Observable が発行されたら内側の Observable の発行が完了するまで待つ
-
concat()
を先に知っておいた方が良い
-
Rx.Observable.of('A', 'B', 'C', 'D', 'E').map(v => `Outer(${v})`)
.map(v => Rx.Observable.timer(0, 1000).take(3).map(vv => `Inner(${vv}): ${v}`))
.concatAll()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Inner(0): Outer(A)
// => Inner(1): Outer(A)
// => Inner(2): Outer(A)
// => Inner(0): Outer(B)
// => Inner(1): Outer(B)
// => Inner(2): Outer(B)
// => Inner(0): Outer(C)
// => Inner(1): Outer(C)
// => Inner(2): Outer(C)
// => Inner(0): Outer(D)
// => Inner(1): Outer(D)
// => Inner(2): Outer(D)
// => Inner(0): Outer(E)
// => Inner(1): Outer(E)
// => Inner(2): Outer(E)
// => complete
exhaust
- 外側の Observable が発行されたら内側の Observable が発行し始める
- 内側の Observable の発行完了前に次の外側 の Observable が発行したら無視される
// concatAll と同じ動き?
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
.map(v => Rx.Observable.of(`Inner: ${v}`))
.exhaust()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Inner: Outer(0)
// => Inner: Outer(1)
// => Inner: Outer(2)
// => complete
// concatAll と同じ動き?
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
.map(v => Rx.Observable.timer(0, 1000).take(4).map(vv => `Inner(${vv}): ${v}`))
.exhaust()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(0): Outer(1)
// => Inner(1): Outer(1)
// => Inner(2): Outer(1)
// => Inner(3): Outer(1)
// => Inner(0): Outer(2)
// => Inner(1): Outer(2)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => complete
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
.map(v => Rx.Observable.timer(0, 1000).take(7).map(vv => `Inner(${vv}): ${v}`))
.exhaust()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(4): Outer(0)
// => Inner(5): Outer(0)
// => Inner(6): Outer(0)
// => Inner(0): Outer(2)
// => Inner(1): Outer(2)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => Inner(4): Outer(2)
// => Inner(5): Outer(2)
// => Inner(6): Outer(2)
// => complete
merge
- 複数の Observable を並列実行し、それぞれ値を発行する
- instance method 版
Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
.merge(Rx.Observable.timer(0, 500).take(5).map(v => `B${v}`))
.merge(Rx.Observable.timer(0, 2000).take(5).map(v => `C${v}`))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A0
// => B0
// => C0
// => B1
// => A1
// => B2
// => B3
// => C1
// => A2
// => B4
// => A3
// => C2
// => A4
// => C3
// => C4
// => complete
mergeAll
- 外側の Observable が発行されたら内側の Observable が発行し始める
- 内側の Observable の発行完了前に次の外側の Observable が発行したらそれぞれの値を発行する
-
merge()
を先に知っておいた方が良い
-
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
.map(v => Rx.Observable.timer(0, 1000).take(7).map(vv => `Inner(${vv}): ${v}`))
.mergeAll()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(4): Outer(0)
// => Inner(0): Outer(1)
// => Inner(5): Outer(0)
// => Inner(1): Outer(1)
// => Inner(6): Outer(0)
// => Inner(2): Outer(1)
// => Inner(3): Outer(1)
// => Inner(4): Outer(1)
// => Inner(0): Outer(2)
// => Inner(5): Outer(1)
// => Inner(1): Outer(2)
// => Inner(6): Outer(1)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => Inner(4): Outer(2)
// => Inner(5): Outer(2)
// => Inner(6): Outer(2)
// => complete
race
- 一番発行の速い Observable が採用される
Rx.Observable.interval(1000).take(2).map(v => `A${v}`)
.race(Rx.Observable.interval(500).take(5).map(v => `B${v}`))
.race(Rx.Observable.interval(2000).take(1).map(v => `C${v}`))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => B0
// => B1
// => B2
// => B3
// => B4
// => complete
startWith
- Observable の先頭に値を追加する
Rx.Observable.range(1, 5)
.startWith('A' as any)
.subscribe(...callbacks);
// => A
// => 1
// => 2
// => 3
// => 4
// => 5
// => complete
switch
- 外側の Observable が発行する度に内側の Observable に切り替わる
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
.map(v => Rx.Observable.timer(0, 1000).map(vv => `Inner(${vv}): ${v}`))
.switch()
.subscribe(res => console.log(res));
// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(4): Outer(0)
// => Inner(0): Outer(1)
// => Inner(1): Outer(1)
// => Inner(2): Outer(1)
// => Inner(3): Outer(1)
// => Inner(4): Outer(1)
// => Inner(0): Outer(2)
// => Inner(1): Outer(2)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => Inner(4): Outer(2)
// => Inner(5): Outer(2)
// => Inner(6): Outer(2)
// => Inner(7): Outer(2)
withLatestFrom
- Observable が発行するときに、別の Observable の最新の値と合成して発行する
Rx.Observable.timer(0, 5000).take(3).map(v => `A${v}`)
.withLatestFrom(Rx.Observable.timer(0, 1000).map(v => `B${v}`))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 'A(0)', 'B0' ]
// => [ 'A(1)', 'B4' ]
// => [ 'A(2)', 'B9' ]
// => complete
zip
- 複数の Observable を合成して値を発行する
- instance method 版
const age = Rx.Observable.of(10, 20, 30);
const name = Rx.Observable.of('Hoge', 'Foo', 'Bar');
const developer = Rx.Observable.of(true, true, false);
age.zip(name, developer)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 10, 'Hoge', true ]
// => [ 20, 'Foo', true ]
// => [ 30, 'Bar', false ]
// => complete
const age = Rx.Observable.of(10, 20, 30);
const name = Rx.Observable.of('Hoge', 'Foo', 'Bar');
const developer = Rx.Observable.of(true, true, false);
age.zip(name)
.zip(developer)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ [ 10, 'Hoge' ], true ]
// => [ [ 20, 'Foo' ], true ]
// => [ [ 30, 'Bar' ], false ]
// => complete
zipAll
- 外側の Observable を内側の Observable に合成して発行する
-
zip()
を先に知っておいた方が良い
-
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
.map(v => Rx.Observable.timer(0, 1000).take(7).map(vv => `Inner(${vv}): ${v}`))
.zipAll()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 'Inner(0): Outer(0)', 'Inner(0): Outer(1)', 'Inner(0): Outer(2)' ]
// => [ 'Inner(1): Outer(0)', 'Inner(1): Outer(1)', 'Inner(1): Outer(2)' ]
// => [ 'Inner(2): Outer(0)', 'Inner(2): Outer(1)', 'Inner(2): Outer(2)' ]
// => [ 'Inner(3): Outer(0)', 'Inner(3): Outer(1)', 'Inner(3): Outer(2)' ]
// => [ 'Inner(4): Outer(0)', 'Inner(4): Outer(1)', 'Inner(4): Outer(2)' ]
// => [ 'Inner(5): Outer(0)', 'Inner(5): Outer(1)', 'Inner(5): Outer(2)' ]
// => [ 'Inner(6): Outer(0)', 'Inner(6): Outer(1)', 'Inner(6): Outer(2)' ]
Transformation Operators
Method Type | Method Name |
---|---|
instance method | buffer |
instance method | bufferCount |
instance method | bufferTime |
instance method | bufferToggle |
instance method | bufferWhen |
instance method | concatMap |
instance method | concatMapTo |
instance method | exhaustMap |
instance method | expand |
instance method | groupBy |
instance method | map |
instance method | mapTo |
instance method | mergeMap |
instance method | mergeMapTo |
instance method | mergeMapScan |
instance method | pairwise |
instance method | partition |
instance method | pluck |
instance method | scan |
instance method | switchMap |
instance method | switchMapTo |
instance method | window |
instance method | windowCount |
instance method | windowToggle |
instance method | windowWhen |
instance method | reduce |
buffer
- Observable が発行する値をバッファしておく
-
closingNotifier
による通知がある度にバッファを発行する
-
Rx.Observable.interval(1000)
.buffer(Rx.Observable.interval(5000))
.subscribe(res => console.log(res));
// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...
Rx.Observable.interval(1000)
.buffer(Rx.Observable.interval(5000).take(3))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => complete
bufferCount
- Obaservable が発行する値を
bufferSize
だけバッファしておき、溜まったら発行する-
buffer()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.bufferCount(5)
.subscribe(res => console.log(res));
// => [ 0, 1, 2, 3, 4 ]
// => [ 5, 6, 7, 8, 9 ]
// => [ 10, 11, 12, 13, 14 ]
// => ...
bufferTime
- Observable が発行する値を
bufferTimeSpan
が経過するまでバッファしておく-
buffer()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.bufferTime(5000)
.subscribe(res => console.log(res));
// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...
bufferToggle
- Observable が発行する値を
openings
による通知があった時からclosingSelector
による通知があるまでバッファしておく-
buffer()
を先に知っておいた方が良い
-
Rx.Observable.interval(1000)
.bufferToggle(Rx.Observable.interval(10000), v => Rx.Observable.interval(5000))
.subscribe(res => console.log(res));
// => [ 9, 10, 11, 12, 13 ]
// => [ 19, 20, 21, 22, 23 ]
// => [ 29, 30, 31, 32, 33 ]
// => ...
bufferWhen
- Observable が発行する値を
closingSelector
による通知があるまでバッファしてとく-
buffer()
を先に知っておいた方が良い -
buffer()
との違いはclosingSelector
の Observable に.take(3)
とか付けても効き目がない?
-
Rx.Observable.interval(1000)
.bufferWhen(() => Rx.Observable.interval(5000))
.subscribe(res => console.log(res));
// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...
Rx.Observable.interval(1000)
.bufferWhen(() => Rx.Observable.interval(5000).take(3))
.subscribe(res => console.log(res));
// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...
concatMap
- Observable が発行した値を
project
で指定した Observable に使うことができる-
project
が発行中のときは待つ
-
-
concat()
,map()
を先に知っておいた方が良い
Rx.Observable.interval(5000)
.take(3)
.concatMap(v => Rx.Observable.of(v * 10, v * 100, v * 1000))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 0
// => 0
// => 10
// => 100
// => 1000
// => 20
// => 200
// => 2000
// => complete
Rx.Observable.interval(5000)
.take(3)
.concatMap(v => Rx.Observable.interval(1000).take(4))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// (1秒待つ)
// => 0
// => 1
// => 2
// => 3
// (1秒待つ)
// => 0
// => 1
// => 2
// => 3
// => complete
Rx.Observable.interval(5000)
.take(3)
.concatMap(v => Rx.Observable.interval(1000).take(10))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => complete
concatMapTo
- Observable が発行した値を
observable
として発行する -
concat()
,mapTo()
を先に知っておいた方が良い
Rx.Observable.interval(5000)
.take(3)
.concatMapTo(Rx.Observable.of('A', 'B', 'C'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => complete
exhaustMap
-
concatMap()
とだいたい同じ?-
project
が発行中の時に Observable が発行すると無視される
-
-
exhaust()
,map()
を先に知っておいた方が良い
// `concatMap()` と同じ動き
Rx.Observable.interval(5000)
.take(3)
.exhaustMap(v => Rx.Observable.of(v * 10, v * 100, v * 1000))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 0
// => 0
// => 10
// => 100
// => 1000
// => 20
// => 200
// => 2000
// => complete
// `concatMap()` と同じ動き
Rx.Observable.interval(5000)
.take(3)
.exhaustMap(v => Rx.Observable.interval(1000).take(4))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => complete
Rx.Observable.interval(5000)
.take(3)
.exhaustMap(v => Rx.Observable.interval(1000).take(10))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => complete
expand
- Observalue が発行する値を再帰的に処理する
- Observale の出力は1つ目のみを使用?
Rx.Observable.of(1, 2, 3)
.expand(v => Rx.Observable.of(2 * v))
.take(5)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 2
// => 4
// => 8
// => 16
// => complete
groupBy
- Observable が発行する値をグループ化する
Rx.Observable.of(
{id: 1, name: "A1"},
{id: 1, name: "B1"},
{id: 2, name: "B2"},
{id: 1, name: "C1"},
{id: 2, name: "C2"},
{id: 3, name: "C3"},
{id: 1, name: "D1"},
{id: 2, name: "D2"},
{id: 3, name: "D3"},
{id: 4, name: "D4"}
).groupBy(v => v.id)
.flatMap(v => v.reduce((acc, v2) => [...acc, v2], []))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ { id: 1, name: 'A1' }, { id: 1, name: 'B1' }, { id: 1, name: 'C1' }, { id: 1, name: 'D1' } ]
// => [ { id: 2, name: 'B2' }, { id: 2, name: 'C2' }, { id: 2, name: 'D2' } ]
// => [ { id: 3, name: 'C3' }, { id: 3, name: 'D3' } ]
// => [ { id: 4, name: 'D4' } ]
// => complete
map
- Observable の値を変換して発行する
-
Array.map()
とだいたい同じ
-
Rx.Observable.range(1, 5)
.map(v => v * 10)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 10
// => 20
// => 30
// => 40
// => 50
// => complete
mapTo
- Observable の値を特定の値に変換して発行する
-
map()
を先に知っておいた方が良い
-
Rx.Observable.range(1, 5)
.mapTo("A")
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A
// => A
// => A
// => A
// => A
// => complete
mergeMap
- Observable の値を変換して、別の Observable と合成する
-
merge()
,map()
を先に知っておいた方が良い
-
Rx.Observable.range(1, 5)
.mergeMap((v) => Rx.Observable.of(`${v}A`, `${v}B`, `${v}C`))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1A
// => 1B
// => 1C
// => 2A
// => 2B
// => 2C
// => 3A
// => 3B
// => 3C
// => 4A
// => 4B
// => 4C
// => 5A
// => 5B
// => 5C
// => complete
mergeMapTo
- Observable の値を特定の Observable に変換して発行する
-
merge()
,mapTo()
を先に知っておいた方が良い
-
Rx.Observable.range(1, 5)
.mergeMapTo(Rx.Observable.of('A', 'B', 'C'))
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => complete
mergeMapScan
- Observable の値を別の Observable に変換してアキュムレーターで蓄積する
-
mergeMap()
,scan()
を先に知っておいた方が良い
-
Rx.Observable.range(1, 5)
.mergeScan((acc, v) => Rx.Observable.of(acc * v), 1)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 2
// => 6
// => 24
// => 120
// => complete
pairwise
- Observable の値のうち2つをペアにする
Rx.Observable.of("a", "b", "c", "d", "e")
.pairwise()
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => [ 'a', 'b' ]
// => [ 'b', 'c' ]
// => [ 'c', 'd' ]
// => [ 'd', 'e' ]
// => complete
partition
- Observable 自体を
predicate
の条件で分離する
Rx.Observable.range(1, 10)
.partition(v => v % 2 === 0)
.forEach(p => {
p.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
});
// => 2
// => 4
// => 6
// => 8
// => 10
// => complete
// => 1
// => 3
// => 5
// => 7
// => 9
// => complete
pluck
- Observable の値のうち特定のキーのみを発行する
Rx.Observable.of(
{name: 'AAAAA', age: 10},
{name: 'BBBBB', age: 20},
{name: 'CCCCC', age: 30},
).pluck('name')
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => AAAAA
// => BBBBB
// => CCCCC
// => complete
scan
- Observable の値をアキュムレーターに蓄積しつつ毎回発行する
Rx.Observable.range(1, 10)
.scan((acc, v) => acc + v, 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 1
// => 3
// => 6
// => 10
// => 15
// => 21
// => 28
// => 36
// => 45
// => 55
// => complete
switchMap
- Observable が発行する度に
project
で指定された Observable が発行する -
switch()
.map()
を先に知っておいた方が良い
Rx.Observable.interval(5000)
.switchMap(v => Rx.Observable.of(`${v}A`))
.subscribe(res => console.log(res));
// => 0A
// => 1A
// => 2A
// => 3A
// => ...
Rx.Observable.interval(5000)
.switchMap(v => Rx.Observable.interval(1000))
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => ...
switchMapTo
- Observable が発行する度に
observable
で指定された Observable が発行する -
switch()
,mapTo()
を先に知っておいた方が良い
Rx.Observable.interval(5000)
.switchMapTo(Rx.Observable.of('A', 'B', 'C'))
.subscribe(res => console.log(res));
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => ...
Rx.Observable.interval(5000)
.switchMapTo(Rx.Observable.interval(1000))
.subscribe(res => console.log(res));
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => ...
window
-
windowBoundaries
で指定された Observable が発行する度に本線が分岐する- あまり良くわかってない。。。
Rx.Observable.interval(100)
.window(Rx.Observable.interval(1000))
.map(w => w.take(2))
.mergeAll()
.subscribe(res => console.log(res));
// => 0
// => 1
// => 9
// => 10
// => 19
// => 20
// => 29
// => 30
// => ...
windowCount
-
windowSize
毎に本線が分岐する-
window()
を先に知っておいた方が良い - あまり良くわかってない。。。
-
Rx.Observable.interval(100)
.windowCount(10)
.map(w => w.take(2))
.mergeAll()
.subscribe(res => console.log(res));
// => 0
// => 1
// => 10
// => 11
// => ...
windowToggle
- Observable が発行する値を
openings
毎に本線が分岐しclosingSelector
されるまで発行される-
window()
を先に知っておいた方が良い - あまり良くわかってない。。。
-
Rx.Observable.interval(100)
.windowToggle(Rx.Observable.interval(1000), v => Rx.Observable.interval(500))
.mergeAll()
.subscribe(res => console.log(res));
// => 9
// => 10
// => 11
// => 12
// => 13
// (500ms待機?)
// => 19
// => 20
// => 21
// => 22
// => 23
// (500ms待機?)
// => ...
windowWhen
-
closingSelector
毎に本線が分岐する-
window()
を先に知っておいた方が良い - あまり良くわかってない。。。
-
window()
との違いもわからない
-
Rx.Observable.interval(100)
.windowWhen(() => Rx.Observable.interval(1000))
.map(w => w.take(2))
.mergeAll()
.subscribe(res => console.log(res));
// => 0
// => 1
// => 9
// => 10
// => 19
// => 20
// => ...
reduce
- Observable の値をアキュムレーターに蓄積し、最後の結果を発行する
-
scan()
とだいたい同じ
-
Rx.Observable.range(1, 10)
.reduce((acc, v) => acc + v, 0)
.subscribe(
res => console.log(res),
err => console.log('ここには来ない'),
() => console.log('complete')
);
// => 55
// => complete
Multicasting Operators
Method Type | Method Name |
---|---|
instance method | multicast |
instance method | publish |
instance method | publishBehavior |
instance method | publishLast |
instance method | publishReplay |
instance method | share |
instance method | shareReplay |
multicast
- 単一のサブスクリプションを共有する Observable を返す?
- あまり良くわかってない。。。
- 実行ログを見ると
do
1つに対してMulti(1), (2)
の出力が出てる
const multicast = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
.do(
res => console.log(`res(do): ${res}`),
err => console.log('err(do): ここには来ない'),
() => console.log('complete(do)'),
)
.multicast(() => new Rx.Subject());
multicast.subscribe(
res => console.log(`Multi(1): ${res}`),
err => console.log('Multi(1): ここには来ない'),
() => console.log('Multi(1): complete'),
);
multicast.subscribe(
res => console.log(`Multi(2): ${res}`),
err => console.log('Multi(2): ここには来ない'),
() => console.log('Multi(2): complete'),
);
multicast.connect();
// => res(do): A0
// => Multi(1): A0
// => Multi(2): A0
// => res(do): A1
// => Multi(1): A1
// => Multi(2): A1
// => res(do): A2
// => Multi(1): A2
// => Multi(2): A2
// => res(do): A3
// => Multi(1): A3
// => Multi(2): A3
// => res(do): A4
// => Multi(1): A4
// => Multi(2): A4
// => complete(do)
// => Multi(1): complete
// => Multi(2): complete
publish
- 単一のサブスクリプションを共有する Observable を返す?
- あまり良くわかってない。。。
-
multicast()
との違いもわからない - 実行ログを見ると
do
1つに対してPub(1), (2)
の出力が出てる
const publish = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
.do(
res => console.log(`res(do): ${res}`),
err => console.log('err(do): ここには来ない'),
() => console.log('complete(do)'),
)
.publish();
publish.subscribe(
res => console.log(`Pub(1): ${res}`),
err => console.log('Pub(1): ここには来ない'),
() => console.log('Pub(1): complete'),
);
publish.subscribe(
res => console.log(`Pub(2): ${res}`),
err => console.log('Pub(2): ここには来ない'),
() => console.log('Pub(2): complete'),
);
publish.connect();
// => res(do): A0
// => Pub(1): A0
// => Pub(2): A0
// => res(do): A1
// => Pub(1): A1
// => Pub(2): A1
// => res(do): A2
// => Pub(1): A2
// => Pub(2): A2
// => res(do): A3
// => Pub(1): A3
// => Pub(2): A3
// => res(do): A4
// => Pub(1): A4
// => Pub(2): A4
// => complete(do)
// => Pub(1): complete
// => Pub(2): complete
publishBehavior
- 先頭に値を追加する
publish()
?- あまり良くわかってない。。。
-
publish()
を先に知っておいた方が良い - 実行ログを見ると先頭に
hogehoge
が追加されている
const publish = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
.do(
res => console.log(`res(do): ${res}`),
err => console.log('err(do): ここには来ない'),
() => console.log('complete(do)'),
)
.publishBehavior('hogehoge');
publish.subscribe(
res => console.log(`Pub(1): ${res}`),
err => console.log('Pub(1): ここには来ない'),
() => console.log('Pub(1): complete'),
);
publish.subscribe(
res => console.log(`Pub(2): ${res}`),
err => console.log('Pub(2): ここには来ない'),
() => console.log('Pub(2): complete'),
);
publish.connect();
// => Pub(1): hogehoge
// => Pub(2): hogehoge
// => res(do): A0
// => Pub(1): A0
// => Pub(2): A0
// => res(do): A1
// => Pub(1): A1
// => Pub(2): A1
// => res(do): A2
// => Pub(1): A2
// => Pub(2): A2
// => res(do): A3
// => Pub(1): A3
// => Pub(2): A3
// => res(do): A4
// => Pub(1): A4
// => Pub(2): A4
// => complete(do)
// => Pub(1): complete
// => Pub(2): complete
publishLast
- 最後の値のみを発行する
publish()
?- あまり良くわかってない。。。
-
publish()
を先に知っておいた方が良い
const publish = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
.do(
res => console.log(`res(do): ${res}`),
err => console.log('err(do): ここには来ない'),
() => console.log('complete(do)'),
)
.publishLast();
publish.subscribe(
res => console.log(`Pub(1): ${res}`),
err => console.log('Pub(1): ここには来ない'),
() => console.log('Pub(1): complete'),
);
publish.subscribe(
res => console.log(`Pub(2): ${res}`),
err => console.log('Pub(2): ここには来ない'),
() => console.log('Pub(2): complete'),
);
publish.connect();
// => res(do): A0
// => res(do): A1
// => res(do): A2
// => res(do): A3
// => res(do): A4
// => complete(do)
// => Pub(1): A4
// => Pub(2): A4
// => Pub(1): complete
// => Pub(2): complete
publishReplay
- あまりよくわからず
- うまく動かせなかった。。。
-
publish()
は先に知っておいた方が良い
share
- 単一のサブスクリプションを共有する Observable を返す?
-
multicast()
とpublish()
の違いはわからなかった -
ConnectableObservable
ではないのでconnect()
が不要ということだけわかった
-
const share = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
.do(
res => console.log(`res(do): ${res}`),
err => console.log('err(do): ここには来ない'),
() => console.log('complete(do)'),
)
.share();
share.subscribe(
res => console.log(`Share(1): ${res}`),
err => console.log('Share(1): ここには来ない'),
() => console.log('Share(1): complete'),
);
share.subscribe(
res => console.log(`Share(2): ${res}`),
err => console.log('Share(2): ここには来ない'),
() => console.log('Share(2): complete'),
);
// => res(do): A0
// => Share(1): A0
// => Share(2): A0
// => res(do): A1
// => Share(1): A1
// => Share(2): A1
// => res(do): A2
// => Share(1): A2
// => Share(2): A2
// => res(do): A3
// => Share(1): A3
// => Share(2): A3
// => res(do): A4
// => Share(1): A4
// => Share(2): A4
// => complete(do)
// => Share(1): complete
// => Share(2): complete
shareReplay
- あまりよくわからず。。。
- うまく動かせなかった。。。
-
publishReplay()
とほぼ変わらないと思うのだが。。。
さいごに
Observable.subscribe()
と Observable.create()
しか使ったことなかったのに色々理解することができたが。。。
何故こんなものを書いてしまったのか!!
その他、Observable を触っていて Scheduler
やら Notification
やら Subject
やら登場してきましたが、全く理解できていません!