LoginSignup
88
77

More than 5 years have passed since last update.

RxJS Observable まとめ

Posted at

はじめに

何故こんなものを書いてしまったのか・・・orz

いつも触っているフレームワークが Angular.js から Angular に変わり、
HTTPClient の戻り値が ng.IPromise から Observable になっていました。

ReactiveX 自体は AndroidiOS を開発していた頃に RxJavaRxSwift を通して、名前だけ知ってましたので、RxJS もその辺のものなんだなーという程度の認識で、Angularチュートリアルの知識で Observable を使ってました。

Angular を触っている限り Observable は付いてくるし、
RxJS を知っておけば他の RxSwift とかでも活かせるだろうなと思って、
軽い気持ちで Observable の機能をまとめてみようと思ったのがこの記事執筆の始まりです。

※途中までやって他の方々がもっと詳しくまとめている記事を見つけたりしましたが、
※途中までやっちゃったし、自分でまとめた方が理解できるし。って思って最後までまとめました。

Google 翻訳を駆使して、実際に動かしてみて、
自分なりの解釈でまとめてますので全然違うのもあるかと思います。

バージョンとか

  • rxjs 5.5.6

Observable

Category
Create Operators
Mathematical and Aggregate Operators
Conditional and Boolean Operators
Error Handling Operators
Utility Operators
Filtering Operators
Combination Operators
Transformation Operators
Multicasting Operators
  • カテゴリは こちら を参考に。
  • リファレンスは こちら を参考に。
  • メソッド名が太字なものは上記カテゴリに載ってなかったので勝手にカテゴリ分けしたもの
  • 赤文字はちゃんと理解できていないもの

Create Operators

Method Type Method Name
class method bindCallback
class method bindNodeCallback
class method create
class method defer
class method empty
class method from
class method fromEvent
class method fromEventPattern
class method fromPromise
class method interval
class method never
class method of
class method range
class method throw
class method timer
instance method repeat
instance method repeatWhen

bindCallback

  • コールバック付きの関数から Observable を作成する
function func(cb: () => void) {
  setTimeout(() => {
    cb();
  }, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable().subscribe(() => console.log('success'));

// => success
function func(cb: (s1) => void) {
  setTimeout(() => {
    cb('Hello');
  }, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable().subscribe(res => console.log(res));

// => Hello
function func(cb: (s1, s2) => void) {
  setTimeout(() => {
    cb('Hello', 'World!');
  }, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable().subscribe(res => console.log(res));

// => [ 'Hello', 'World' ]
function func(value: number, cb: (n1) => void) {
  setTimeout(() => {
    cb(value * 10);
  }, 1000);
}

const observable = Rx.Observable.bindCallback(func);
observable(5).subscribe(res => console.log(res));

// => 50

bindNodeCallback

  • (err, data) => void のような良くあるコールバック付きの関数から Observable を作成する
    • bindCallback() を先に知っておいた方が良い
    • bindCallback() とだいたい同じ
function func(cb: (err, data) => void) {
  setTimeout(() => {
    cb(null, 'Hello');
  }, 1000);
}

const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(res => console.log(res));

// => Hello
function func(cb: (err, data1, data2) => void) {
  setTimeout(() => {
    cb(null, 'Hello', 'World!');
  }, 1000);
}

const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(res => console.log(res));

// => [ 'Hello', 'World!' ]
function func(cb: (err, data?) => void) {
  setTimeout(() => {
    cb('Error');
  }, 1000);
}

const observable = Rx.Observable.bindNodeCallback(func);
observable().subscribe(
  res => console.log('ここには来ない'),
  err => console.log(err)
);

// => Error

create

  • 新しい Observable を作成する
const observable = Rx.Observable.create(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});
observable.subscribe(res => console.log(res));

// => 1
// => 2
// => 3
const observable = Rx.Observable.create(observer => {
  observer.next(1);
  observer.error('Error');
  observer.next(2);
  observer.complete();
});
observable.subscribe(
  res => console.log(res),
  err => console.log(err)
);

// => 1
// => Error

defer

  • 遅延して Observable を作成する
Rx.Observable.defer(() => Rx.Observable.of('a', 'b', 'c'))
  .subscribe(res => console.log(res));

// => a
// => b
// => c

empty

  • 何も発行せず complete する Observable を作成する
Rx.Observable.empty()
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => complete

from

  • 配列から Observable を作成する
Rx.Observable.from([10, 20, 30])
  .subscribe(res => console.log(res));

// => a
// => b
// => c

fromEvent

  • DOM Events や EventEmitter から Observable を作成する
Rx.Observable.fromEvent(document, 'click')
  .subscribe(evt => console.log(`x: ${evt.x}, y: ${evt.y}`));

// => x: 299, y: 527
// => x: 332, y: 547
// => x: 343, y: 328

fromEventPattern

  • addHandler , removeHandler に基づいて Observable を作成する
    • fromEvent() を先に知っておいた方が良い
    • あまり良くわかってないが、たぶん fromEvent() とだいたい同じ
Rx.Observable.fromEventPattern(
  (handler: any) => document.addEventListener('click', handler),
  (handler: any) => document.removeEventListener('click', handler)
).subscribe(evt => console.log(`x: ${evt.x}, y: ${evt.y}`));

// => x: 299, y: 527
// => x: 332, y: 547
// => x: 343, y: 328

fromPromise

  • Promise から Observable を作成する
function func(cb: () => void) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('success');
    }, 1000);
  });
}

Rx.Observable.fromPromise(func())
  .subscribe(res => console.log(res));

// => success
function func(cb: () => void) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      reject('error');
    }, 1000);
  });
}

Rx.Observable.fromPromise(func())
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log(err)
  );

// => error

interval

  • 定期的に発行する Observable を作成する
Rx.Observable.interval(1000)
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// ...

never

  • 何もしない Observable を作成する
Rx.Observable.never()
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log('ここには来ない'),
    () => console.log('ここには来ない')
  );

// => (何も起きない)

of

  • 指定した値を発行する Observable を作成する
Rx.Observable.of(1, 2, 3)
  .subscribe(res => console.log(res));

// => 1
// => 2
// => 3
Rx.Observable.of('a', 'b', 'c')
  .subscribe(res => console.log(res));

// => a
// => b
// => c
Rx.Observable.of<any>([1, 2, 3], ['a', 'b', 'c'])
  .subscribe(res => console.log(res));

// => [ 1, 2, 3 ]
// => [ 'a', 'b', 'c' ]

range

  • start から count 個の数値を発行する Observable を作成する
Rx.Observable.range(1, 10)
  .subscribe(res => console.log(res));

// => 1
// => 2
// => 3
// => ...
// => 9
// => 10
Rx.Observable.range(5, 10)
  .subscribe(res => console.log(res));

// => 5
// => 6
// => 7
// => ...
// => 13
// => 14

throw

  • エラーを発行する Observable を作成する
Rx.Observable.throw('Error')
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log(err)
  );

// => Error

timer

  • 開始時間を指定して定期的に発行する Observable を作成する
    • interval() とだいたい同じ
Rx.Observable.timer(3000, 1000)
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => ...

repeat

  • 本線を指定回数繰り返す Observable を作成する
Rx.Observable.of('a', 'b', 'c')
  .repeat(3)
  .subscribe(res => console.log(res));

// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c

repeatWhen

  • notifier が発行された時に本線を繰り返す Observable を作成する?
    • repeat() を先に知っておいた方が良い
    • あまり良くわかってないが、たぶん repeat() とだいたい同じ
Rx.Observable.of('a', 'b', 'c')
  .repeatWhen(notifier => notifier.take(3)) // +3回繰り返される
  .subscribe(res => console.log(res));

// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c
// => a
// => b
// => c

Mathematical and Aggregate Operators

Method Type Method Name
instance method count
instance method max
instance method min

count

  • Observable が発行する値の個数を発行する
Rx.Observable.of('a', 'bb', 'ccc')
  .count()
  .subscribe(res => console.log(res));

// => 3
Rx.Observable.of('a', 'bb', 'ccc')
  .count(v => v.length >= 2)
  .subscribe(res => console.log(res));

// => 2

max

  • Observable が発行する値の中で最大値を発行する
Rx.Observable.of(5, 4, -1, 8, 2)
  .max()
  .subscribe(res => console.log(res));

// => 8

min

  • Observable が発行する値の中で最小値を発行する
Rx.Observable.of(5, 4, -1, 8, 2)
  .min()
  .subscribe(res => console.log(res));

// => -1

Conditional and Boolean Operators

Method Type Method Name
instance method defaultIfEmpty
instance method every
instance method find
instance method findIndex
instance method isEmpty
instance method sequenceEqual
instance method single

defaultIfEmpty

  • Observable の発行が Empty だった場合にデフォルト値を発行する
Rx.Observable.empty()
  .defaultIfEmpty('defaultValue')
  .subscribe(res => console.log(res));

// => defaultValue
Rx.Observable.empty()
  .defaultIfEmpty({ data: 'defaultValue'})
  .subscribe(res => console.log(res));

// => d{ data: 'defaultValue' }
Rx.Observable.of('a', 'bb', 'ccc')
  .defaultIfEmpty('defaultValue')
  .subscribe(res => console.log(res));

// => a
// => bb
// => ccc

every

  • predicate で指定する条件を全ての値が満たしているかどうかを発行する
    • Array.every() と同じ
Rx.Observable.of(1, 2, 3, 4, 5)
  .every(v => v <= 5)
  .subscribe(res => console.log(res));

// => true
Rx.Observable.of(1, 2, 3, 4, 5)
  .every(v => v < 5)
  .subscribe(res => console.log(res));

// => false

find

  • predicate で指定する条件を満たす値を発行する
    • Array.find() と同じ
Rx.Observable.of(1, 2, 3, 4, 5)
  .find(v => v === 4)
  .subscribe(res => console.log(res));

// => 4
Rx.Observable.of(1, 2, 3, 4, 5)
  .find(v => v % 2 === 0)
  .subscribe(res => console.log(res));

// => 2
Rx.Observable.of(1, 2, 3, 4, 5)
  .find(v => v === 10)
  .subscribe(res => console.log(res));

// => undefined

findIndex

  • predicate で指定する条件を満たす値のインデックスを発行する
    • Array.findIndex() と同じ
Rx.Observable.of(1, 2, 3, 4, 5)
  .findIndex(v => v === 4)
  .subscribe(res => console.log(res));

// => 3
Rx.Observable.of(1, 2, 3, 4, 5)
  .findIndex(v => v % 2 === 0)
  .subscribe(res => console.log(res));

// => 1
Rx.Observable.of(1, 2, 3, 4, 5)
  .findIndex(v => v === 10)
  .subscribe(res => console.log(res));

// => -1

isEmpty

  • Observable の発行する値が Empty かどうかを発行する
Rx.Observable.empty()
  .isEmpty()
  .subscribe(res => console.log(res));

// => true
Rx.Observable.of('a', 'bb', 'ccc')
  .isEmpty()
  .subscribe(res => console.log(res));

// => false

sequenceEqual

  • 同じ発行のある Observable かどうかを発行する
const compareObservable = Rx.Observable.range(5, 10);
Rx.Observable.of(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
  .sequenceEqual(compareObservable)
  .subscribe(res => console.log(res));

// => true
const compareObservable = Rx.Observable.range(5, 10);
Rx.Observable.of(5, 6, 7, 8, 9, 10)
  .sequenceEqual(compareObservable)
  .subscribe(res => console.log(res));

// => false

single

  • predicate で指定する値が1つしかない場合は成功
Rx.Observable.of('a', 'b', 'c', 'b', 'e')
  .single(v => v === 'a')
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => a
// => complete
Rx.Observable.of('a', 'b', 'c', 'b', 'e')
  .single(v => v === 'b')
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log(err),
    () => console.log('ここには来ない')
  );

// => Sequence contains more than one element

Error Handling Operators

Method Type Method Name
instance method catch
instance method retry
instance method retryWhen
instance method onErrorResumeNext

catch

  • Observable がエラーを発行したら次の Observable を発行する
Rx.Observable.throw('Error')
  .catch(err => Rx.Observable.of('a', 'bb', 'ccc'))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => a
// => bb
// => ccc
// => complete
Rx.Observable.interval(1000)
  .map(v => {
    if (v === 4) {
      throw 'Error';
    }
    return v;
  })
  .catch(err => Rx.Observable.of('IV', 'V', 'VI'))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 2
// => 3
// => IV
// => V
// => VI
// => complete

retry

  • Observable がエラーを発行したら count だけリトライする
Rx.Observable.throw('Error')
  .retry(3)
  .subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('ここには来ない')
  );

// => Error
Rx.Observable.interval(1000)
  .map(v => {
    if (v === 4) {
      throw 'Error';
    }
    return v;
  })
  .retry(3)
  .subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('ここには来ない')
  );

// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
// => Error

retryWhen

  • Observable がエラーを発行したら notifier を基にリトライを行う
    • retry() を先に知っておいた方が良い
    • あまり良くわかってないが、たぶん retry() とだいたい同じ
Rx.Observable.interval(1000)
  .map(v => {
    if (v === 4) {
      throw 'Error';
    }
    return v;
  })
  .do(
    res => console.log(`do(res): ${res}`),
    err => console.log(`do(err): ${err}`),
    () => console.log(`do(complete)`)
  )
  .retryWhen(errors => {
    return errors.delay(200).do(
      res => console.log(`errors(res): ${res}`), // エラーが発生するとここに飛んでくるんだな?
      err => console.log(`errors(err): ${err}`),
      () => console.log(`errors(complete)`)
    );
  })
  .subscribe(
    res => console.log(`res: ${res}`),
    err => console.log(`err: ${err}`),
    () => console.log(`complete`),
  );

// => do(res): 0
// => res: 0
// => do(res): 1
// => res: 1
// => do(res): 2
// => res: 2
// => do(res): 3
// => res: 3
// => do(err): Error
// => errors(res): Error
// => do(res): 0
// => res: 0
// => do(res): 1
// => res: 1
// => do(res): 2
// => res: 2
// => do(res): 3
// => res: 3
// => do(err): Error
// => errors(res): Error
// ...

onErrorResumeNext

  • Observable がエラーを発行したら次の Observable を発行する
    • catch と同じような感じ
    • 但し、コールバックに err は飛んでこないのでエラーハンドリングしたい場合は catch を使うと良い?
Rx.Observable.interval(1000)
  .map(v => {
    if (v === 4) {
      throw 'Error';
    }
    return v;
  })
  .onErrorResumeNext(Rx.Observable.of('IV', 'V', 'VI'))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// => IV
// => V
// => VI
// => complete

Utility Operators

Method Type Method Name
instance method delay
instance method delayWhen
instance method dematerialize
instance method do
instance method materialize
instance method observeOn
instance method subscribeOn
instance method timeInterval
instance method timeout
instance method timeoutWith
instance method timestamp
instance method toArray
class method webSocket
instance method finally
instance method forEach
instance method let
instance method lift
instance method pipe
instance method toPromise

delay

  • Observable の発行を遅延させる
Rx.Observable.of('a', 'bb', 'ccc')
  .delay(5000)
  .subscribe(res => console.log(res));

// => a (5s after)
// => bb
// => ccc

delayWhen

  • Observable の発行を delayDurationSelector で返却する Observable が発行させる
    • delay() を先に知っておいた方が良い
Rx.Observable.of('a', 'bbbb', 'cccccccc')
  .delayWhen(v => {
    return Rx.Observable.timer(v.length * 1000)
  })
  .subscribe(res => console.log(res));

// => a (1s after)
// => bbbb (4s after)
// => cccccccc (8s after)

dematerialize

  • Rx.NotificationObservable に変換する?
    • materialize() の逆(先に materialize() 見たほうがいいかも)
    • あまり良くわかってない。。。
const notifA = new Rx.Notification('N', 'a');
const notifB = new Rx.Notification('N', 'b');
const notifE = new Rx.Notification('E', undefined, new TypeError('x.toUpperCase() is not a function'));

Rx.Observable.of(notifA, notifB, notifE)
  .dematerialize()
  .subscribe(
    res => console.log(res),
    err => console.log(err.message),
    () => console.log('ここには来ない'));

// => a
// => b
// => x.toUpperCase() is not a function

do

  • Observable の発行を subscribe する前にフックする
Rx.Observable.of('a', 'bb', 'ccc')
  .do(
    res => console.log(`do(res): ${res}`),
    err => console.log(`ここには来ない`),
    () => console.log(`do(complete)`)
  )
  .subscribe(
    res => console.log(`res: ${res}`),
    err => console.log(`ここには来ない`),
    () => console.log(`complete`)
  );

// => do(res): a
// => res: a
// => do(res): bb
// => res: bb
// => do(res): ccc
// => res: ccc
// => do(complete)
// => complete

materialize

  • ObservableRx.Notification に変換する?
    • あまり良くわかってない。。。
Rx.Observable.of<any>('a', 'b', undefined, 'd').map(v => v.toUpperCase())
  .materialize()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
// => Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
// => Notification {
//    kind: 'E',
//    value: undefined,
//    error: TypeError: Cannot read property 'toUpperCase' of undefined
//    ...
// complete

observeOn

  • Observable の発行を「次のタイミング」で行う?
    • あまり良くわかってない。。。
const observable: Rx.Observable<any> = Rx.Observable.create(observer => {
  console.log('[1] before next()');
  observer.next(1);
  console.log('[1] after next()');

  console.log('[2] before next()');
  observer.next(2);
  console.log('[2] after next()');

  console.log('[3] before complete()');
  observer.complete();
  console.log('[3] after complete()');
});

console.log('[4] before subscribe');
observable.observeOn(Rx.Scheduler.async)
  .subscribe(
    next => console.log(`[emit] next: ${next}`),
    err => console.log(`[emit] error: ${err}`),
    () => console.log(`[emit] complete`)
  );
console.log('[4] after subscribe');

// => [4] before subscribe
// => [1] before next()
// => [1] after next()
// => [2] before next()
// => [2] after next()
// => [3] before complete()
// => [3] after complete()
// => [4] after subscribe
// => [emit] next: 1
// => [emit] next: 2
// => [emit] complete

subscribeOn

  • Observable の実行を「次のタイミング」で行う?
    • あまり良くわかってない。。。
const o1: Rx.Observable<any> = Rx.Observable.create(observer => {
  console.log('[1] before next()');
  observer.next(1);
  console.log('[1] after next()');

  console.log('[2] before next()');
  observer.next(2);
  console.log('[2] after next()');

  console.log('[3] before complete()');
  observer.complete();
  console.log('[3] after complete()');
});

console.log('[4] before subscribe');
o1.subscribeOn(Rx.Scheduler.async)
  .subscribe(
    next => console.log(`[emit] next: ${next}`),
    err => console.log(`[emit] error: ${err}`),
    () => console.log(`[emit] complete`)
  );
console.log('[4] after subscribe');

// => [4] before subscribe
// => [4] after subscribe
// => [1] before next()
// => [emit] next: 1
// => [1] after next()
// => [2] before next()
// => [emit] next: 2
// => [2] after next()
// => [3] before complete()
// => [emit] complete: 
// => [3] after complete()

timeInterval

  • 前の発行からどれくらい経ったかを発行する
Rx.Observable.of('a', 'bbbb', 'cccccccc')
  .delayWhen(v => Rx.Observable.timer(v.length * 1000))
  .timeInterval()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    res => console.log('complete'),
  );

// => TimeInterval { value: 'a', interval: 1005 }
// => TimeInterval { value: 'bbbb', interval: 3000 }
// => TimeInterval { value: 'cccccccc', interval: 4000 }
// => complete

timeout

  • タイムアウト時刻まで発行が無い場合はエラーを発行する
Rx.Observable.timer(5000, 1000)
  .timeout(1000)
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log(err.message),
    () => console.log('ここには来ない')
  );

// => Timeout has occurred

timeoutWith

  • タイムアウト時刻まで発行が無い場合は別の Observable を発行する
    • timeout() を先に知っておいた方が良い
    • timeout() + catch() な感じ?
Rx.Observable.timer(5000, 1000)
  .timeoutWith(1000, Rx.Observable.of('a', 'bb', 'ccc'))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => a
// => bb
// => ccc
// => complete

timestamp

  • 発行時刻が付加される
Rx.Observable.of('a', 'bbbb', 'cccccccc')
  .delayWhen(v => Rx.Observable.timer(v.length * 1000))
  .timestamp()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Timestamp { value: 'a', timestamp: 1521783521464 }
// => Timestamp { value: 'bbbb', timestamp: 1521783524459 }
// => Timestamp { value: 'cccccccc', timestamp: 1521783528462 }
// => complete

toArray

  • 発行された値を配列に変換する
Rx.Observable.of('a', 'b', 'c')
  .toArray()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 'a', 'b', 'c' ]
// => complete

webSocket

  • WebSocket を WebSocketSubject にラップする
    • あまり良くわかってない。。。
    • うまく動かせなかった。。。
const ws = Rx.Observable.webSocket('ws://localhost:8081');
ws.subscribe(
  res => console.log('message received: ' + res),
  err => console.log(err),
  () => console.log('complete')
);
ws.next(JSON.stringify({ op: 'hello' }));

finally

  • 一番最後に発火する関数を定義する
Rx.Observable.of('a', 'bb', 'ccc')
  .finally(() => {
    console.log('Finally!')
  })
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => a
// => bb
// => ccc
// => complete
// => Finally!
Rx.Observable.of('a', 'bb', 'ccc')
  .map(v => {
    if (v.length > 2) {
      throw new Error('Error!');
    }
    return v;
  })
  .finally(() => {
    console.log('Finally!')
  })
  .subscribe(
    res => console.log(res),
    err => console.log(err.message),
    () => console.log('ここには来ない')
  );

// => a
// => bb
// => Error!
// => Finally!

forEach

  • Subscribe ではなく Promise で購読する
Rx.Observable.of('a', 'bb', 'ccc')
  .forEach(v => console.log(`v: ${v}`))
  .then(res => console.log(`then: ${res}`))
  .catch(err => console.log('ここには来ない'));

// => v: a
// => v: bb
// => v: ccc
// => then: undefined
Rx.Observable.of('a', 'bb', 'ccc')
  .map(v => {
    if (v.length > 2) {
      throw new Error('Error!')
    }
    return v;
  })
  .forEach(v => console.log(`v: ${v}`))
  .then(res => console.log('ここには来ない'))
  .catch(err => console.log(`catch: ${err.message}`));

// => v: a
// => v: bb
// => catch: Error!

let

  • Observable が発行を Subscribeする前にフックできる?
    • do() の Observable 版?
    • あまり良くわかってない。。。
Rx.Observable.of('a', 'bbbb', 'cccccccc')
  .let(selector => {
    return selector.do(
      res => console.log(`let.do(res): ${res}`),
      err => console.log(`let.do(err): ${err}`),
      () => console.log(`let.do(complete)`)
    );
  })
  .do(
    res => console.log(`do(res): ${res}`),
    err => console.log(`do(err): ${err}`),
    () => console.log(`do(complete)`)
  )
  .subscribe(
    res => console.log(`res: ${res}`),
    err => console.log(`err: ${err}`),
    () => console.log(`complete`)
  );

// => let.do(res): a
// => do(res): a
// => res: a
// => let.do(res): bbbb
// => do(res): bbbb
// => res: bbbb
// => let.do(res): cccccccc
// => do(res): cccccccc
// => res: cccccccc
// => let.do(complete)
// => do(complete)
// => complete

lift

  • カスタムオペレータを差し込めるらしい
    • あまり良くわかってない。。。
    • うまく動かせなかった。。。

pipe

  • メソッドチェーンではない書き方ができる
    • メリットがわからない
Rx.Observable.range(1, 10)
  .pipe(
    filter(v => v % 2 === 0),
    map(v => v * 10),
    scan((acc, v) => acc + v, 0)
  )
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 20
// => 60
// => 120
// => 200
// => 300
// => complete

toPromise

  • Observable ではなく Promise で最後の値を取得する
    • Promise で購読する場合は forEach() を使う?
Rx.Observable.of('a', 'bbbb', 'cccccccc')
  .toPromise()
  .then(res => console.log(res))
  .catch(err => console.log('ここには来ない'));

// => cccccccc
Rx.Observable.of('a', 'bbbb', 'cccccccc')
  .map(v => {
    if (v.length > 4) {
      // throw しても catch には飛ばない
      return new Error('Error!');
    }
    return v;
  })
  .toPromise()
  .then(res => console.log(res))
  .catch(err => console.log('ここには来ない'));

// => Error: Error!

Filtering Operators

Method Type Method Name
instance method audit
instance method auditTime
instance method debounce
instance method debounceTime
instance method distinct
instance method distinctUntilChanged
instance method distinctUntilKeyChanged
instance method elementAt
instance method filter
instance method first
instance method ignoreElements
instance method last
instance method sample
instance method sampleTime
instance method skip
instance method skipLast
instance method skipUntil
instance method skipWhile
instance method take
instance method takeLast
instance method takeUntil
instance method takeWhile
instance method throttle
instance method throttleTime

audit

  • Observable は durationSelector で指定した Observable が発行したら最新の値を発行する
Rx.Observable.interval(100)
  .audit(v => Rx.Observable.timer(10000))
  .subscribe(res => console.log(res));

// => 97
// => 195
// => 293
// ...
Rx.Observable.interval(100)
  .map(v => {
    if (v > 100) {
      throw 'Error!';
    }
    return Rx.Observable.timer(10000);
  })
  .subscribe(
    res => console.log(res),
    err => console.log(err.message),
    () => console.log('ここには来ない')
  );

// => 96
// => 195
// => Error!

auditTime

  • Observable は duration で指定された時刻が来たら最新の値を発行する
    • audit() を先に知っておいた方が良い
Rx.Observable.interval(100)
  .auditTime(10000)
  .subscribe(res => console.log(res));

// => 98
// => 198
// => 298
// ...

debounce

  • durationSelector で指定した Observable が発行するまで遅延する
    • durationSelector が発火する前に次の値が発行したら遅延した値は消える
    • あまり良くわかってないが、そんな感じと思われる・・・。
Rx.Observable.interval(1000)
  .debounce(v => Rx.Observable.timer(500))
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// ...

debounceTime

  • dueTime で指定した時間まで遅延する
    • debounce() を先に知っておいた方が良い
    • dueTime が経つ前に次の値が発行したら遅延した値は消える
    • あまり良くわかってないが、そんな感じと思われる・・・。
Rx.Observable.interval(1000)
  .debounceTime(2000)
  .subscribe(res => console.log(res));

distinct

  • Observable が発行する値の重複を排除する
Rx.Observable.of(1, 1, 2, 2, 2, 1, 2, 4, 3, 2, 3, 1)
  .distinct()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 2
// => 4
// => 3
// => complete

distinctUntilChanged

  • Observable が発行する値のうち、直前の値と同じ場合は排除する
    • distinct() を先に知っておいた方が良い
Rx.Observable.of(1, 1, 2, 2, 2, 1, 2, 4, 3, 2, 3, 1)
  .distinctUntilChanged()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 2
// => 1
// => 2
// => 4
// => 3
// => 2
// => 3
// => 1
// => complete

distinctUntilKeyChanged

  • Observable が発行する値のうち、特定の Key に対して直前の値と同じ場合は排除する
    • distinct() を先に知っておいた方が良い
Rx.Observable.of(
  { age: 10, name: 'Hoge' },
  { age: 20, name: 'Foo' },
  { age: 30, name: 'Foo' },
  { age: 10, name: 'Hoge' })
  .distinctUntilKeyChanged('name')
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => { age: 10, name: 'Hoge' }
// => { age: 20, name: 'Foo' }
// => { age: 10, name: 'Hoge' }
// => complete
Rx.Observable.of(
  { age: 10, name: 'Hoge' },
  { age: 20, name: 'Foo' },
  { age: 20, name: 'Hoge' },
  { age: 10, name: 'Foo' })
  .distinctUntilKeyChanged('age')
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => { age: 10, name: 'Hoge' }
// => { age: 20, name: 'Foo' }
// => { age: 10, name: 'Foo' }
// => complete

elementAt

  • Observable が発行する値のうち、特定の値を発行する
Rx.Observable.of('A', 'B', 'C')
  .elementAt(1)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => B
// => complete

filter

  • Observable が発行する値を predicate で指定した条件にフィルタする
    • Array.filter() と同じ
Rx.Observable.range(1, 5)
  .filter(v => v % 2 === 0)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 2
// => 4
// => complete

first

  • Observable が最初に発行する値のみにフィルタする
    • predicate を指定して条件を指定することも可能
Rx.Observable.range(1, 5)
  .first()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => complete
Rx.Observable.range(1, 5)
  .first(v => v % 2 === 0)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 2
// => complete

ignoreElements

  • Observable が発行する全ての値を無視する
Rx.Observable.range(1, 5)
  .ignoreElements()
  .subscribe(
    res => console.log('ここには来ない'),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => complete

last

  • Observable が最初に発行する値のみにフィルタする
    • predicate を指定して条件を指定することも可能
Rx.Observable.range(1, 5)
  .last()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 5
// => complete
Rx.Observable.range(1, 5)
  .last(v => v % 2 === 0)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 4
// => complete

sample

  • notifier で指定された Observable による通知があった時に最新の値を発行する
    • 最新の値に変化がない場合は何も起きない
Rx.Observable.interval(1000)
  .sample(Rx.Observable.interval(5000))
  .subscribe(res => console.log(res));

// => 3
// => 8
// => 13
// => ...
Rx.Observable.interval(5000)
  .sample(Rx.Observable.interval(1000))
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => ...

sampleTime

  • period で指定された時間が経った時に最新の値を発行する
    • sample() を先に知っておいた方が良い
    • 最新の値に変化がない場合は何も起きない
Rx.Observable.interval(1000)
  .sampleTime(5000)
  .subscribe(res => console.log(res));

// => 3
// => 8
// => 13
// => ...
Rx.Observable.interval(5000)
  .sampleTime(1000)
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => ...

skip

  • Observable が発行する値を count だけスキップする
Rx.Observable.range(1, 5)
  .skip(3)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 4
// => 5
// => complete

skipLast

  • Observable が発行する値を count だけ後ろからスキップする
    • skip() を先に知っておいた方が良い
Rx.Observable.range(1, 5)
  .skipLast(3)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 2
// => complete

skipUntil

  • notifier で指定された Observable による通知があるまでスキップする
    • skip() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .skipUntil(Rx.Observable.timer(5000))
  .subscribe(res => console.log(res));

// => 4
// => 5
// => ...

skipWhile

  • predicate で指定する条件が true の間はスキップする
    • skip() を先に知っておいた方が良い
    • 一度条件から外れると発行が行われる
    • (途中から再びスキップされる。とかは無い)
Rx.Observable.interval(1000)
  .skipWhile(v => v < 5)
  .subscribe(res => console.log(res));

// => 5
// => 6
// => ...
Rx.Observable.interval(1000)
  .skipWhile(v => v > 5)
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => ...

take

  • Observable が発行する値のうち count 回だけ発行する
Rx.Observable.interval(1000)
  .take(3)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => complete

takeLast

  • Observable が発行する値のうち、ラスト count 回だけ発行する
    • take() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .take(5)
  .takeLast(2)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 3
// => 4
// => complete

takeUntil

  • notifier に指定された Observable による通知があるまで発行する
    • take() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .takeUntil(Rx.Observable.timer(5000))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// => complete

takeWhile

  • predicate が true の間は発行する
    • take() を先に知っておいた方が良い
    • 一度条件から外れたら終了
    • (途中から再び発行し始める。とかは無い)
Rx.Observable.interval(1000)
  .takeWhile(v => v < 5)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// => 4
// => complete

throttle

  • だいたい sample() と同じ動きをする?
    • durationSelector が発行されるまで Observable の発行は無視される
Rx.Observable.interval(1000)
  .throttle(v => Rx.Observable.timer(5000))
  .subscribe(res => console.log(res));

// => 0
// => 5
// => 10
// => ...

throttleTime

  • duration で指定するバージョン
    • throttle() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .throttleTime(5000)
  .subscribe(res => console.log(res));

// => 0
// => 5
// => 10
// => ...

Combination Operators

Method Type Method Name
class method combineLatest
class method concat
class method forkJoin
class method merge
class method zip
instance method combineAll
instance method combineLatest
instance method concat
instance method concatAll
instance method exhaust
instance method merge
instance method mergeAll
instance method race
instance method startWith
instance method switch
instance method withLatestFrom
instance method zip
instance method zipAll

combineLatest

  • 複数の Observable を組み合わせる
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`);
const observable2 = Rx.Observable.timer(0, 3000).map(v => `B${v}`);
Rx.Observable.combineLatest(observable1, observable2)
  .subscribe(res => console.log(res));

// => [ 'A0', 'B0' ]
// (1s 待機)
// => [ 'A1', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B1' ]
// => [ 'A3', 'B1' ]
// (1s 待機)
// => [ 'A4', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B2' ]
// => [ 'A6', 'B2' ]
// (1s 待機)
// => [ 'A7', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B3' ]
// => [ 'A9', 'B3' ]
// (1s 待機)
// ...

concat

  • 複数の Observable を順次実行する
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);
const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.concat(observable1, observable2, observable3)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A0
// => A1
// => A2
// => A3
// => A4
// => B0
// => B1
// => B2
// => B3
// => B4
// => C0
// => C1
// => C2
// => C3
// => C4
// => complete

forkJoin

  • 複数の Observable を並列実行し、最後の値のみ発行する
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);
const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.forkJoin(observable1, observable2, observable3)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

merge

  • 複数の Observable を並列実行し、それぞれ値を発行する
const observable1 = Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5);
const observable2 = Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5);
const observable3 = Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5);
Rx.Observable.merge(observable1, observable2, observable3)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A0
// => B0
// => C0
// => B1
// => A1
// => B2
// => B3
// => C1
// => A2
// => B4
// => A3
// => C2
// => A4
// => C3
// => C4
// => complete

zip

  • 複数の Observable を合成して値を発行する
const age = Rx.Observable.of(10, 20, 30);
const name = Rx.Observable.of('Hoge', 'Foo', 'Bar');
const developer = Rx.Observable.of(true, true, false);
Rx.Observable.zip(age, name, developer)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 10, 'Hoge', true ]
// => [ 20, 'Foo', true ]
// => [ 30, 'Bar', false ]
// => complete

combineAll

  • 外側 Observable の発行が完了したら、内側の複数の Observable が組み合わされて発行される
Rx.Observable.timer(0, 1000).map(v => `Outer(${v})`).take(5)
  .do(
    res => console.log(`do(res): ${res}`),
    err => console.log('do(err): ここには来ない'),
    () => console.log('do(complete): complete')
  )
  .map(v => Rx.Observable.of(`Inner: ${v}`))
  .combineAll()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => do(res): Outer(0)
// => do(res): Outer(1)
// => do(res): Outer(2)
// => do(res): Outer(3)
// => do(res): Outer(4)
// => do(complete): complete
// => [ 'Inner: Outer(0)', 'Inner: Outer(1)', 'Inner: Outer(2)', 'Inner: Outer(3)', 'Inner: Outer(4)' ]
// => complete

combineLatest

  • 複数の Observable を組み合わせる
    • instance method 版
Rx.Observable.timer(0, 1000).map(v => `A${v}`)
  .combineLatest(Rx.Observable.timer(0, 3000).map(v => `B${v}`))
  .subscribe(res => console.log(res));

// => [ 'A0', 'B0' ]
// (1s 待機)
// => [ 'A1', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B0' ]
// (1s 待機)
// => [ 'A2', 'B1' ]
// => [ 'A3', 'B1' ]
// (1s 待機)
// => [ 'A4', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B1' ]
// (1s 待機)
// => [ 'A5', 'B2' ]
// => [ 'A6', 'B2' ]
// (1s 待機)
// => [ 'A7', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B2' ]
// (1s 待機)
// => [ 'A8', 'B3' ]
// => [ 'A9', 'B3' ]

concat

  • 複数の Observable を順次実行する
    • instance method 版
Rx.Observable.timer(0, 1000).map(v => `A${v}`).take(5)
  .concat(Rx.Observable.timer(0, 500).map(v => `B${v}`).take(5))
  .concat(Rx.Observable.timer(0, 2000).map(v => `C${v}`).take(5))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A0
// => A1
// => A2
// => A3
// => A4
// => B0
// => B1
// => B2
// => B3
// => B4
// => C0
// => C1
// => C2
// => C3
// => C4
// => complete

concatAll

  • 外側の Observable が発行されたら内側の Observable の発行が完了するまで待つ
    • concat() を先に知っておいた方が良い
Rx.Observable.of('A', 'B', 'C', 'D', 'E').map(v => `Outer(${v})`)
  .map(v => Rx.Observable.timer(0, 1000).take(3).map(vv => `Inner(${vv}): ${v}`))
  .concatAll()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Inner(0): Outer(A)
// => Inner(1): Outer(A)
// => Inner(2): Outer(A)
// => Inner(0): Outer(B)
// => Inner(1): Outer(B)
// => Inner(2): Outer(B)
// => Inner(0): Outer(C)
// => Inner(1): Outer(C)
// => Inner(2): Outer(C)
// => Inner(0): Outer(D)
// => Inner(1): Outer(D)
// => Inner(2): Outer(D)
// => Inner(0): Outer(E)
// => Inner(1): Outer(E)
// => Inner(2): Outer(E)
// => complete

exhaust

  • 外側の Observable が発行されたら内側の Observable が発行し始める
  • 内側の Observable の発行完了前に次の外側 の Observable が発行したら無視される
// concatAll と同じ動き?
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
  .map(v => Rx.Observable.of(`Inner: ${v}`))
  .exhaust()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Inner: Outer(0)
// => Inner: Outer(1)
// => Inner: Outer(2)
// => complete
// concatAll と同じ動き?
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
  .map(v => Rx.Observable.timer(0, 1000).take(4).map(vv => `Inner(${vv}): ${v}`))
  .exhaust()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(0): Outer(1)
// => Inner(1): Outer(1)
// => Inner(2): Outer(1)
// => Inner(3): Outer(1)
// => Inner(0): Outer(2)
// => Inner(1): Outer(2)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => complete
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
  .map(v => Rx.Observable.timer(0, 1000).take(7).map(vv => `Inner(${vv}): ${v}`))
  .exhaust()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(4): Outer(0)
// => Inner(5): Outer(0)
// => Inner(6): Outer(0)
// => Inner(0): Outer(2)
// => Inner(1): Outer(2)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => Inner(4): Outer(2)
// => Inner(5): Outer(2)
// => Inner(6): Outer(2)
// => complete

merge

  • 複数の Observable を並列実行し、それぞれ値を発行する
    • instance method 版
Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
  .merge(Rx.Observable.timer(0, 500).take(5).map(v => `B${v}`))
  .merge(Rx.Observable.timer(0, 2000).take(5).map(v => `C${v}`))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A0
// => B0
// => C0
// => B1
// => A1
// => B2
// => B3
// => C1
// => A2
// => B4
// => A3
// => C2
// => A4
// => C3
// => C4
// => complete

mergeAll

  • 外側の Observable が発行されたら内側の Observable が発行し始める
  • 内側の Observable の発行完了前に次の外側の Observable が発行したらそれぞれの値を発行する
    • merge() を先に知っておいた方が良い
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
  .map(v => Rx.Observable.timer(0, 1000).take(7).map(vv => `Inner(${vv}): ${v}`))
  .mergeAll()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(4): Outer(0)
// => Inner(0): Outer(1)
// => Inner(5): Outer(0)
// => Inner(1): Outer(1)
// => Inner(6): Outer(0)
// => Inner(2): Outer(1)
// => Inner(3): Outer(1)
// => Inner(4): Outer(1)
// => Inner(0): Outer(2)
// => Inner(5): Outer(1)
// => Inner(1): Outer(2)
// => Inner(6): Outer(1)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => Inner(4): Outer(2)
// => Inner(5): Outer(2)
// => Inner(6): Outer(2)
// => complete

race

  • 一番発行の速い Observable が採用される
Rx.Observable.interval(1000).take(2).map(v => `A${v}`)
  .race(Rx.Observable.interval(500).take(5).map(v => `B${v}`))
  .race(Rx.Observable.interval(2000).take(1).map(v => `C${v}`))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => B0
// => B1
// => B2
// => B3
// => B4
// => complete

startWith

  • Observable の先頭に値を追加する
Rx.Observable.range(1, 5)
  .startWith('A' as any)
  .subscribe(...callbacks);

// => A
// => 1
// => 2
// => 3
// => 4
// => 5
// => complete

switch

  • 外側の Observable が発行する度に内側の Observable に切り替わる
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
  .map(v => Rx.Observable.timer(0, 1000).map(vv => `Inner(${vv}): ${v}`))
  .switch()
  .subscribe(res => console.log(res));

// => Inner(0): Outer(0)
// => Inner(1): Outer(0)
// => Inner(2): Outer(0)
// => Inner(3): Outer(0)
// => Inner(4): Outer(0)
// => Inner(0): Outer(1)
// => Inner(1): Outer(1)
// => Inner(2): Outer(1)
// => Inner(3): Outer(1)
// => Inner(4): Outer(1)
// => Inner(0): Outer(2)
// => Inner(1): Outer(2)
// => Inner(2): Outer(2)
// => Inner(3): Outer(2)
// => Inner(4): Outer(2)
// => Inner(5): Outer(2)
// => Inner(6): Outer(2)
// => Inner(7): Outer(2)

withLatestFrom

  • Observable が発行するときに、別の Observable の最新の値と合成して発行する
Rx.Observable.timer(0, 5000).take(3).map(v => `A${v}`)
  .withLatestFrom(Rx.Observable.timer(0, 1000).map(v => `B${v}`))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 'A(0)', 'B0' ]
// => [ 'A(1)', 'B4' ]
// => [ 'A(2)', 'B9' ]
// => complete

zip

  • 複数の Observable を合成して値を発行する
    • instance method 版
const age = Rx.Observable.of(10, 20, 30);
const name = Rx.Observable.of('Hoge', 'Foo', 'Bar');
const developer = Rx.Observable.of(true, true, false);
age.zip(name, developer)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 10, 'Hoge', true ]
// => [ 20, 'Foo', true ]
// => [ 30, 'Bar', false ]
// => complete
const age = Rx.Observable.of(10, 20, 30);
const name = Rx.Observable.of('Hoge', 'Foo', 'Bar');
const developer = Rx.Observable.of(true, true, false);
age.zip(name)
  .zip(developer)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ [ 10, 'Hoge' ], true ]
// => [ [ 20, 'Foo' ], true ]
// => [ [ 30, 'Bar' ], false ]
// => complete

zipAll

  • 外側の Observable を内側の Observable に合成して発行する
    • zip() を先に知っておいた方が良い
Rx.Observable.timer(0, 5000).take(3).map(v => `Outer(${v})`)
  .map(v => Rx.Observable.timer(0, 1000).take(7).map(vv => `Inner(${vv}): ${v}`))
  .zipAll()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 'Inner(0): Outer(0)', 'Inner(0): Outer(1)', 'Inner(0): Outer(2)' ]
// => [ 'Inner(1): Outer(0)', 'Inner(1): Outer(1)', 'Inner(1): Outer(2)' ]
// => [ 'Inner(2): Outer(0)', 'Inner(2): Outer(1)', 'Inner(2): Outer(2)' ]
// => [ 'Inner(3): Outer(0)', 'Inner(3): Outer(1)', 'Inner(3): Outer(2)' ]
// => [ 'Inner(4): Outer(0)', 'Inner(4): Outer(1)', 'Inner(4): Outer(2)' ]
// => [ 'Inner(5): Outer(0)', 'Inner(5): Outer(1)', 'Inner(5): Outer(2)' ]
// => [ 'Inner(6): Outer(0)', 'Inner(6): Outer(1)', 'Inner(6): Outer(2)' ]

Transformation Operators

Method Type Method Name
instance method buffer
instance method bufferCount
instance method bufferTime
instance method bufferToggle
instance method bufferWhen
instance method concatMap
instance method concatMapTo
instance method exhaustMap
instance method expand
instance method groupBy
instance method map
instance method mapTo
instance method mergeMap
instance method mergeMapTo
instance method mergeMapScan
instance method pairwise
instance method partition
instance method pluck
instance method scan
instance method switchMap
instance method switchMapTo
instance method window
instance method windowCount
instance method windowToggle
instance method windowWhen
instance method reduce

buffer

  • Observable が発行する値をバッファしておく
    • closingNotifier による通知がある度にバッファを発行する
Rx.Observable.interval(1000)
  .buffer(Rx.Observable.interval(5000))
  .subscribe(res => console.log(res));

// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...
Rx.Observable.interval(1000)
  .buffer(Rx.Observable.interval(5000).take(3))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => complete

bufferCount

  • Obaservable が発行する値を bufferSize だけバッファしておき、溜まったら発行する
    • buffer() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .bufferCount(5)
  .subscribe(res => console.log(res));

// => [ 0, 1, 2, 3, 4 ]
// => [ 5, 6, 7, 8, 9 ]
// => [ 10, 11, 12, 13, 14 ]
// => ...

bufferTime

  • Observable が発行する値を bufferTimeSpan が経過するまでバッファしておく
    • buffer() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .bufferTime(5000)
  .subscribe(res => console.log(res));

// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...

bufferToggle

  • Observable が発行する値を openings による通知があった時から closingSelector による通知があるまでバッファしておく
    • buffer() を先に知っておいた方が良い
Rx.Observable.interval(1000)
  .bufferToggle(Rx.Observable.interval(10000), v => Rx.Observable.interval(5000))
  .subscribe(res => console.log(res));

// => [ 9, 10, 11, 12, 13 ]
// => [ 19, 20, 21, 22, 23 ]
// => [ 29, 30, 31, 32, 33 ]
// => ...

bufferWhen

  • Observable が発行する値を closingSelector による通知があるまでバッファしてとく
    • buffer() を先に知っておいた方が良い
    • buffer() との違いは closingSelector の Observable に .take(3) とか付けても効き目がない?
Rx.Observable.interval(1000)
  .bufferWhen(() => Rx.Observable.interval(5000))
  .subscribe(res => console.log(res));

// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...
Rx.Observable.interval(1000)
  .bufferWhen(() => Rx.Observable.interval(5000).take(3))
  .subscribe(res => console.log(res));

// => [ 0, 1, 2, 3 ]
// => [ 4, 5, 6, 7, 8 ]
// => [ 9, 10, 11, 12, 13 ]
// => ...

concatMap

  • Observable が発行した値を project で指定した Observable に使うことができる
    • project が発行中のときは待つ
  • concat(), map() を先に知っておいた方が良い
Rx.Observable.interval(5000)
  .take(3)
  .concatMap(v => Rx.Observable.of(v * 10, v * 100, v * 1000))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 0
// => 0
// => 10
// => 100
// => 1000
// => 20
// => 200
// => 2000
// => complete
Rx.Observable.interval(5000)
  .take(3)
  .concatMap(v => Rx.Observable.interval(1000).take(4))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// (1秒待つ)
// => 0
// => 1
// => 2
// => 3
// (1秒待つ)
// => 0
// => 1
// => 2
// => 3
// => complete
Rx.Observable.interval(5000)
  .take(3)
  .concatMap(v => Rx.Observable.interval(1000).take(10))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => complete

concatMapTo

  • Observable が発行した値を observable として発行する
  • concat(), mapTo() を先に知っておいた方が良い
Rx.Observable.interval(5000)
  .take(3)
  .concatMapTo(Rx.Observable.of('A', 'B', 'C'))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => complete

exhaustMap

  • concatMap() とだいたい同じ?
    • project が発行中の時に Observable が発行すると無視される
  • exhaust(), map() を先に知っておいた方が良い
// `concatMap()` と同じ動き
Rx.Observable.interval(5000)
  .take(3)
  .exhaustMap(v => Rx.Observable.of(v * 10, v * 100, v * 1000))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 0
// => 0
// => 10
// => 100
// => 1000
// => 20
// => 200
// => 2000
// => complete
// `concatMap()` と同じ動き
Rx.Observable.interval(5000)
  .take(3)
  .exhaustMap(v => Rx.Observable.interval(1000).take(4))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => complete
Rx.Observable.interval(5000)
  .take(3)
  .exhaustMap(v => Rx.Observable.interval(1000).take(10))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 0
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => 7
// => 8
// => 9
// => complete

expand

  • Observalue が発行する値を再帰的に処理する
    • Observale の出力は1つ目のみを使用?
Rx.Observable.of(1, 2, 3)
  .expand(v => Rx.Observable.of(2 * v))
  .take(5)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 2
// => 4
// => 8
// => 16
// => complete

groupBy

  • Observable が発行する値をグループ化する
Rx.Observable.of(
  {id: 1, name: "A1"},
  {id: 1, name: "B1"},
  {id: 2, name: "B2"},
  {id: 1, name: "C1"},
  {id: 2, name: "C2"},
  {id: 3, name: "C3"},
  {id: 1, name: "D1"},
  {id: 2, name: "D2"},
  {id: 3, name: "D3"},
  {id: 4, name: "D4"}
).groupBy(v => v.id)
  .flatMap(v => v.reduce((acc, v2) => [...acc, v2], []))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ { id: 1, name: 'A1' }, { id: 1, name: 'B1' }, { id: 1, name: 'C1' }, { id: 1, name: 'D1' } ]
// => [ { id: 2, name: 'B2' }, { id: 2, name: 'C2' }, { id: 2, name: 'D2' } ]
// => [ { id: 3, name: 'C3' }, { id: 3, name: 'D3' } ]
// => [ { id: 4, name: 'D4' } ]
// => complete

map

  • Observable の値を変換して発行する
    • Array.map() とだいたい同じ
Rx.Observable.range(1, 5)
  .map(v => v * 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 10
// => 20
// => 30
// => 40
// => 50
// => complete

mapTo

  • Observable の値を特定の値に変換して発行する
    • map() を先に知っておいた方が良い
Rx.Observable.range(1, 5)
  .mapTo("A")
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A
// => A
// => A
// => A
// => A
// => complete

mergeMap

  • Observable の値を変換して、別の Observable と合成する
    • merge(), map() を先に知っておいた方が良い
Rx.Observable.range(1, 5)
  .mergeMap((v) => Rx.Observable.of(`${v}A`, `${v}B`, `${v}C`))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1A
// => 1B
// => 1C
// => 2A
// => 2B
// => 2C
// => 3A
// => 3B
// => 3C
// => 4A
// => 4B
// => 4C
// => 5A
// => 5B
// => 5C
// => complete

mergeMapTo

  • Observable の値を特定の Observable に変換して発行する
    • merge(), mapTo() を先に知っておいた方が良い
Rx.Observable.range(1, 5)
  .mergeMapTo(Rx.Observable.of('A', 'B', 'C'))
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => complete

mergeMapScan

  • Observable の値を別の Observable に変換してアキュムレーターで蓄積する
    • mergeMap(), scan() を先に知っておいた方が良い
Rx.Observable.range(1, 5)
  .mergeScan((acc, v) => Rx.Observable.of(acc * v), 1)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 2
// => 6
// => 24
// => 120
// => complete

pairwise

  • Observable の値のうち2つをペアにする
Rx.Observable.of("a", "b", "c", "d", "e")
  .pairwise()
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => [ 'a', 'b' ]
// => [ 'b', 'c' ]
// => [ 'c', 'd' ]
// => [ 'd', 'e' ]
// => complete

partition

  • Observable 自体を predicate の条件で分離する
Rx.Observable.range(1, 10)
  .partition(v => v % 2 === 0)
  .forEach(p => {
    p.subscribe(
      res => console.log(res),
      err => console.log('ここには来ない'),
      () => console.log('complete')
    );
  });

// => 2
// => 4
// => 6
// => 8
// => 10
// => complete
// => 1
// => 3
// => 5
// => 7
// => 9
// => complete

pluck

  • Observable の値のうち特定のキーのみを発行する
Rx.Observable.of(
  {name: 'AAAAA', age: 10},
  {name: 'BBBBB', age: 20},
  {name: 'CCCCC', age: 30},
).pluck('name')
 .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
 );

// => AAAAA
// => BBBBB
// => CCCCC
// => complete

scan

  • Observable の値をアキュムレーターに蓄積しつつ毎回発行する
Rx.Observable.range(1, 10)
  .scan((acc, v) => acc + v, 0)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 1
// => 3
// => 6
// => 10
// => 15
// => 21
// => 28
// => 36
// => 45
// => 55
// => complete

switchMap

  • Observable が発行する度に project で指定された Observable が発行する
  • switch(). map() を先に知っておいた方が良い
Rx.Observable.interval(5000)
  .switchMap(v => Rx.Observable.of(`${v}A`))
  .subscribe(res => console.log(res));

// => 0A
// => 1A
// => 2A
// => 3A
// => ...
Rx.Observable.interval(5000)
  .switchMap(v => Rx.Observable.interval(1000))
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => ...

switchMapTo

  • Observable が発行する度に observable で指定された Observable が発行する
  • switch(), mapTo() を先に知っておいた方が良い
Rx.Observable.interval(5000)
  .switchMapTo(Rx.Observable.of('A', 'B', 'C'))
  .subscribe(res => console.log(res));

// => A
// => B
// => C
// => A
// => B
// => C
// => A
// => B
// => C
// => ...
Rx.Observable.interval(5000)
  .switchMapTo(Rx.Observable.interval(1000))
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => 0
// => 1
// => 2
// => 3
// => ...

window

  • windowBoundaries で指定された Observable が発行する度に本線が分岐する
    • あまり良くわかってない。。。
Rx.Observable.interval(100)
  .window(Rx.Observable.interval(1000))
  .map(w => w.take(2))
  .mergeAll()
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 9
// => 10
// => 19
// => 20
// => 29
// => 30
// => ...

windowCount

  • windowSize 毎に本線が分岐する
    • window() を先に知っておいた方が良い
    • あまり良くわかってない。。。
Rx.Observable.interval(100)
  .windowCount(10)
  .map(w => w.take(2))
  .mergeAll()
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 10
// => 11
// => ...

windowToggle

  • Observable が発行する値を openings 毎に本線が分岐し closingSelector されるまで発行される
    • window() を先に知っておいた方が良い
    • あまり良くわかってない。。。
Rx.Observable.interval(100)
  .windowToggle(Rx.Observable.interval(1000), v => Rx.Observable.interval(500))
  .mergeAll()
  .subscribe(res => console.log(res));

// => 9
// => 10
// => 11
// => 12
// => 13
// (500ms待機?)
// => 19
// => 20
// => 21
// => 22
// => 23
// (500ms待機?)
// => ...

windowWhen

  • closingSelector毎に本線が分岐する
    • window() を先に知っておいた方が良い
    • あまり良くわかってない。。。
    • window() との違いもわからない
Rx.Observable.interval(100)
  .windowWhen(() => Rx.Observable.interval(1000))
  .map(w => w.take(2))
  .mergeAll()
  .subscribe(res => console.log(res));

// => 0
// => 1
// => 9
// => 10
// => 19
// => 20
// => ...

reduce

  • Observable の値をアキュムレーターに蓄積し、最後の結果を発行する
    • scan() とだいたい同じ
Rx.Observable.range(1, 10)
  .reduce((acc, v) => acc + v, 0)
  .subscribe(
    res => console.log(res),
    err => console.log('ここには来ない'),
    () => console.log('complete')
  );

// => 55
// => complete

Multicasting Operators

Method Type Method Name
instance method multicast
instance method publish
instance method publishBehavior
instance method publishLast
instance method publishReplay
instance method share
instance method shareReplay

multicast

  • 単一のサブスクリプションを共有する Observable を返す?
    • あまり良くわかってない。。。
    • 実行ログを見ると do 1つに対して Multi(1), (2) の出力が出てる
const multicast = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
  .do(
    res => console.log(`res(do): ${res}`),
    err => console.log('err(do): ここには来ない'),
    () => console.log('complete(do)'),
  )
  .multicast(() => new Rx.Subject());

multicast.subscribe(
  res => console.log(`Multi(1): ${res}`),
  err => console.log('Multi(1): ここには来ない'),
  () => console.log('Multi(1): complete'),
);
multicast.subscribe(
  res => console.log(`Multi(2): ${res}`),
  err => console.log('Multi(2): ここには来ない'),
  () => console.log('Multi(2): complete'),
);
multicast.connect();

// => res(do): A0
// => Multi(1): A0
// => Multi(2): A0
// => res(do): A1
// => Multi(1): A1
// => Multi(2): A1
// => res(do): A2
// => Multi(1): A2
// => Multi(2): A2
// => res(do): A3
// => Multi(1): A3
// => Multi(2): A3
// => res(do): A4
// => Multi(1): A4
// => Multi(2): A4
// => complete(do)
// => Multi(1): complete
// => Multi(2): complete

publish

  • 単一のサブスクリプションを共有する Observable を返す?
    • あまり良くわかってない。。。
    • multicast() との違いもわからない
    • 実行ログを見ると do 1つに対して Pub(1), (2) の出力が出てる
const publish = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
  .do(
    res => console.log(`res(do): ${res}`),
    err => console.log('err(do): ここには来ない'),
    () => console.log('complete(do)'),
  )
  .publish();

publish.subscribe(
  res => console.log(`Pub(1): ${res}`),
  err => console.log('Pub(1): ここには来ない'),
  () => console.log('Pub(1): complete'),
);
publish.subscribe(
  res => console.log(`Pub(2): ${res}`),
  err => console.log('Pub(2): ここには来ない'),
  () => console.log('Pub(2): complete'),
);
publish.connect();

// => res(do): A0
// => Pub(1): A0
// => Pub(2): A0
// => res(do): A1
// => Pub(1): A1
// => Pub(2): A1
// => res(do): A2
// => Pub(1): A2
// => Pub(2): A2
// => res(do): A3
// => Pub(1): A3
// => Pub(2): A3
// => res(do): A4
// => Pub(1): A4
// => Pub(2): A4
// => complete(do)
// => Pub(1): complete
// => Pub(2): complete

publishBehavior

  • 先頭に値を追加する publish()
    • あまり良くわかってない。。。
    • publish() を先に知っておいた方が良い
    • 実行ログを見ると先頭に hogehoge が追加されている
const publish = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
  .do(
    res => console.log(`res(do): ${res}`),
    err => console.log('err(do): ここには来ない'),
    () => console.log('complete(do)'),
  )
  .publishBehavior('hogehoge');

publish.subscribe(
  res => console.log(`Pub(1): ${res}`),
  err => console.log('Pub(1): ここには来ない'),
  () => console.log('Pub(1): complete'),
);
publish.subscribe(
  res => console.log(`Pub(2): ${res}`),
  err => console.log('Pub(2): ここには来ない'),
  () => console.log('Pub(2): complete'),
);
publish.connect();

// => Pub(1): hogehoge
// => Pub(2): hogehoge
// => res(do): A0
// => Pub(1): A0
// => Pub(2): A0
// => res(do): A1
// => Pub(1): A1
// => Pub(2): A1
// => res(do): A2
// => Pub(1): A2
// => Pub(2): A2
// => res(do): A3
// => Pub(1): A3
// => Pub(2): A3
// => res(do): A4
// => Pub(1): A4
// => Pub(2): A4
// => complete(do)
// => Pub(1): complete
// => Pub(2): complete

publishLast

  • 最後の値のみを発行する publish()
    • あまり良くわかってない。。。
    • publish() を先に知っておいた方が良い
const publish = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
  .do(
    res => console.log(`res(do): ${res}`),
    err => console.log('err(do): ここには来ない'),
    () => console.log('complete(do)'),
  )
  .publishLast();

publish.subscribe(
  res => console.log(`Pub(1): ${res}`),
  err => console.log('Pub(1): ここには来ない'),
  () => console.log('Pub(1): complete'),
);
publish.subscribe(
  res => console.log(`Pub(2): ${res}`),
  err => console.log('Pub(2): ここには来ない'),
  () => console.log('Pub(2): complete'),
);
publish.connect();

// => res(do): A0
// => res(do): A1
// => res(do): A2
// => res(do): A3
// => res(do): A4
// => complete(do)
// => Pub(1): A4
// => Pub(2): A4
// => Pub(1): complete
// => Pub(2): complete

publishReplay

  • あまりよくわからず
  • うまく動かせなかった。。。
  • publish() は先に知っておいた方が良い

share

  • 単一のサブスクリプションを共有する Observable を返す?
    • multicast()publish() の違いはわからなかった
    • ConnectableObservable ではないので connect() が不要ということだけわかった
const share = Rx.Observable.timer(0, 1000).take(5).map(v => `A${v}`)
  .do(
    res => console.log(`res(do): ${res}`),
    err => console.log('err(do): ここには来ない'),
    () => console.log('complete(do)'),
  )
  .share();

share.subscribe(
  res => console.log(`Share(1): ${res}`),
  err => console.log('Share(1): ここには来ない'),
  () => console.log('Share(1): complete'),
);
share.subscribe(
  res => console.log(`Share(2): ${res}`),
  err => console.log('Share(2): ここには来ない'),
  () => console.log('Share(2): complete'),
);

// => res(do): A0
// => Share(1): A0
// => Share(2): A0
// => res(do): A1
// => Share(1): A1
// => Share(2): A1
// => res(do): A2
// => Share(1): A2
// => Share(2): A2
// => res(do): A3
// => Share(1): A3
// => Share(2): A3
// => res(do): A4
// => Share(1): A4
// => Share(2): A4
// => complete(do)
// => Share(1): complete
// => Share(2): complete

shareReplay

  • あまりよくわからず。。。
  • うまく動かせなかった。。。
  • publishReplay() とほぼ変わらないと思うのだが。。。

さいごに

Observable.subscribe()Observable.create() しか使ったことなかったのに色々理解することができたが。。。
何故こんなものを書いてしまったのか!!

その他、Observable を触っていて Scheduler やら Notification やら Subject やら登場してきましたが、全く理解できていません!

88
77
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
88
77