A Summary of RxJS Operators with Code

Posted at 2016-11-05

I've just started playing with RxJS, so I'm going to write code for the Operators as a way of studying \(^o^)/

Installation

npm install rx

Or:

yarn add rx

Creating Observables

Create

Creates an Observable from a function that drives the observer. The function returned from it runs on disposal.

import Rx from 'rx';

Rx.Observable
  .create(observer => {
      observer.onNext(9);
      observer.onCompleted();
      return () => console.log('disposed'); 
  })
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 9
Completed
disposed
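
To make the order of those three log lines easier to follow, here is a minimal sketch of what a `create`-style function does. `createObservable` is a hypothetical helper written for this note, not part of the `rx` package, and it assumes a producer that finishes synchronously.

```javascript
// Hypothetical stand-in for Rx.Observable.create (not the rx implementation).
// Assumption: the producer calls the observer synchronously.
function createObservable(subscribeFn) {
  return {
    subscribe(onNext, onError, onCompleted) {
      let stopped = false;
      const observer = {
        onNext(value) {
          if (!stopped) onNext(value);
        },
        onError(err) {
          if (!stopped) { stopped = true; onError(err); }
        },
        onCompleted() {
          if (!stopped) { stopped = true; onCompleted(); }
        },
      };
      // The producer returns its cleanup function.
      const teardown = subscribeFn(observer);
      let disposed = false;
      const dispose = () => {
        if (disposed) return;
        disposed = true;
        stopped = true;
        if (typeof teardown === 'function') teardown();
      };
      // The stream already ended during subscribeFn, so clean up right away.
      if (stopped) dispose();
      return { dispose };
    },
  };
}

const log = [];
createObservable(observer => {
  observer.onNext(9);
  observer.onCompleted();
  observer.onNext(10); // ignored: the stream has already completed
  return () => log.push('disposed');
}).subscribe(
  x => log.push(`Next: ${x}`),
  err => log.push(`Error: ${err}`),
  () => log.push('Completed')
);
console.log(log.join('\n'));
// Next: 9
// Completed
// disposed
```

The cleanup function only becomes available after the producer returns, which is why `disposed` is logged after `Completed`.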

Empty

Emits only onCompleted.

Rx.Observable.empty()
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Completed

Never

An empty Observable that never calls anything: no onNext, no onError, no onCompleted.

Rx.Observable.never()
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );

Throw

Emits onError.

Rx.Observable.return(9)
  .selectMany(Rx.Observable.throw(new Error('error!')))
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Error: Error: error!

From

Creates an Observable from an iterable object.

Rx.Observable
  .from(['apple', 'orange', 'peach'])
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: apple
Next: orange
Next: peach
Completed

Interval

Emits a value at a fixed time interval.

Rx.Observable
  .interval(1000)
  .timeInterval()
  .take(3)
  .subscribe(
    x => console.log(`Next: ${x.value}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Next: 2
Completed

Just

Creates an Observable from a value you specify directly.

Rx.Observable
  .just(88)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 88
Completed

Range

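Takes a start value and a count, and creates an Observable of that many sequential integers. Note that the second argument is a count, not an end value.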

Rx.Observable
  .range(0, 6)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

Repeat

Repeats a value the specified number of times.

Rx.Observable
  .repeat('Hello world', 3)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: Hello world
Next: Hello world
Next: Hello world
Completed

Start

Invokes the function asynchronously on a scheduler and emits its return value.

Rx.Observable
  .start(() => 'start', Rx.Scheduler.timeout)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: start
Completed

Timer

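A timer: emits a first value after an initial delay (100 ms here), then one value per period (1000 ms here). The example prints the measured interval between values.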

Rx.Observable
  .timer(100, 1000)
  .timeInterval()
  .pluck('interval')
  .take(3)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 222
Next: 879
Next: 999
Completed

Transforming Observables

Buffer

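Collects the Observable's items into arrays and emits each array as one item. Here a closing selector (a 200 ms timer) decides when each buffer ends, rather than a fixed count.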

Rx.Observable
  .timer(0, 100)
  .buffer(() => Rx.Observable.timer(200))
  .take(3)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 
Next: 0,1
Next: 2,3
Completed

FlatMap

Maps each incoming item to an Observable and merges the results into a single stream.

Rx.Observable
  .just('apple,orange,peach')
  .flatMap(x => Rx.Observable.from(x.split(',')))
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: apple
Next: orange
Next: peach
Completed

GroupBy

Groups items by a key. Each group is emitted as its own Observable.

Rx.Observable
  .range(0, 10)
  .groupBy(x => x % 2 == 0)
  .flatMap(group => group.toArray()) 
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0,2,4,6,8
Next: 1,3,5,7,9
Completed
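
As a rough mental model, the grouping step behaves like the array version below. `groupBy` here is a hypothetical plain-array helper, not the `rx` operator: the real operator emits each group as an Observable, while this sketch just collects arrays in a `Map`.

```javascript
// Hypothetical array-based sketch of groupBy semantics (not the rx operator).
function groupBy(items, keySelector) {
  const groups = new Map(); // a Map keeps keys in first-seen order
  for (const item of items) {
    const key = keySelector(item);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(item);
  }
  return groups;
}

const numbers = Array.from({ length: 10 }, (_, i) => i); // 0..9
const grouped = groupBy(numbers, x => x % 2 === 0);
for (const [key, members] of grouped) {
  console.log(`${key}: ${members}`);
}
// true: 0,2,4,6,8
// false: 1,3,5,7,9
```

The even group comes first simply because 0 is the first item seen.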

Map

Transforms each item.

Rx.Observable
  .from(['田中', '佐藤', '山田'])
  .map(x => `${x}さん`)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 田中さん
Next: 佐藤さん
Next: 山田さん
Completed

Scan

Accumulates items one after another and emits every intermediate result.

Rx.Observable
  .from(['Hello', 'World', '!!'])
  .scan((acc, x) => `${acc} ${x}`)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: Hello
Next: Hello World
Next: Hello World !!
Completed
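
The difference from a plain reduce is that every intermediate value is emitted. A small array-based sketch of that idea follows. `scan` is a hypothetical helper here, not the `rx` operator, and it assumes no seed, so the first item passes through unchanged.

```javascript
// Hypothetical array-based sketch of scan without a seed (not the rx operator).
function scan(items, accumulator) {
  const results = [];
  let acc;
  items.forEach((item, index) => {
    // With no seed, the first item becomes the initial accumulated value.
    acc = index === 0 ? item : accumulator(acc, item);
    results.push(acc);
  });
  return results;
}

const scanned = scan(['Hello', 'World', '!!'], (acc, x) => `${acc} ${x}`);
console.log(scanned);
// [ 'Hello', 'Hello World', 'Hello World !!' ]
```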

Window

Splits the stream into windows. It is like Buffer, except that each group is emitted as a new Observable instead of an array.

Rx.Observable
  .timer(1, 50)
  .window(() => Rx.Observable.timer(100))
  .take(3)
  .flatMap(x => x.toArray())
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 
Next: 0,1
Next: 2,3
Completed

Filtering Observables

Debounce

If no new data arrives within the specified time, the most recent item is emitted.

Rx.Observable
  .interval(1000)
  .timeInterval()
  .take(3)
  .debounce(1000)
  .subscribe(
    x => console.log(`Next: ${x.value}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 2
Completed
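
Debounce is easier to reason about on a timeline than with real timers, so here is a sketch that replays timestamped events in virtual time. `debounceTimeline` is a hypothetical helper, not an `rx` API. It assumes the events are sorted by time, that a pending value is flushed when the source completes, and that a value arriving exactly at the deadline replaces the pending one (an arbitrary choice for this sketch).

```javascript
// Hypothetical virtual-time sketch of debounce (not the rx operator).
// events: [{ time, value }] sorted by time; completedAt: when the source completes.
function debounceTimeline(events, dueTime, completedAt) {
  const emitted = [];
  events.forEach((event, i) => {
    const isLast = i === events.length - 1;
    const nextTime = isLast ? completedAt : events[i + 1].time;
    if (nextTime - event.time > dueTime) {
      // Quiet for longer than dueTime: the value fires when the timer expires.
      emitted.push({ time: event.time + dueTime, value: event.value });
    } else if (isLast) {
      // The source completed while a value was still pending: flush it.
      emitted.push({ time: completedAt, value: event.value });
    }
    // Otherwise a newer value arrived in time and this one is dropped.
  });
  return emitted;
}

const debounced = debounceTimeline(
  [
    { time: 0, value: 'a' },
    { time: 100, value: 'b' },
    { time: 500, value: 'c' },
    { time: 550, value: 'd' },
  ],
  200,
  600
);
console.log(debounced);
// [ { time: 300, value: 'b' }, { time: 600, value: 'd' } ]
```

`a` and `c` are dropped because a newer value arrives before their 200 ms quiet period ends.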

Distinct

Emits items without duplicates.

Rx.Observable
  .from(['apple', 'orange', 'apple', 'apple', 'りんご'])
  .distinct()
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: apple
Next: orange
Next: りんご
Completed
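
One way to picture this is a set of values seen so far. The sketch below uses a hypothetical array-based `distinct` helper (not the `rx` operator) and assumes items are compared by strict equality, as a `Set` does.

```javascript
// Hypothetical array-based sketch of distinct (not the rx operator).
function distinct(items) {
  const seen = new Set();
  return items.filter(item => {
    if (seen.has(item)) return false; // already emitted once, so skip it
    seen.add(item);
    return true;
  });
}

const unique = distinct(['apple', 'orange', 'apple', 'apple', 'りんご']);
console.log(unique);
// [ 'apple', 'orange', 'りんご' ]
```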

ElementAt

Emits only the element at the specified index.

Rx.Observable
  .from(['apple', 'orange', 'りんご'])
  .elementAt(1)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: orange
Completed

Filter

Emits only the items that pass the predicate.

Rx.Observable
  .range(0, 10)
  .filter(x => x % 2 == 0)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 2
Next: 4
Next: 6
Next: 8
Completed

First

Emits only the first item.

Rx.Observable
  .range(0, 10)
  .first()
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Completed

IgnoreElements

Ignores all elements. onNext is never called; only onError or onCompleted is.

Rx.Observable
  .range(0, 10)
  .ignoreElements()
  .subscribe(
    x => console.log(`onNext: ${x}`),
    err => console.log(`onError: ${err}`),
    () => console.log('onCompleted')
  );
onCompleted

Last

Emits only the last item.

Rx.Observable
  .range(0, 10)
  .last()
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 9
Completed

Sample

Emits the most recent value at each sampling interval (a fixed interval).

Rx.Observable
  .interval(500)
  .sample(2000)
  .take(3)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 3
Next: 7
Next: 11
Completed

Skip

Skips the first n items.

Rx.Observable
  .range(0, 10)
  .skip(8)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 8
Next: 9
Completed

SkipLast

Skips the last n items.

Rx.Observable
  .range(0, 10)
  .skipLast(3)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

Take

Takes only the specified number of items from the start.

Rx.Observable
  .range(0, 10)
  .take(2)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Completed

TakeLast

Takes only the specified number of items from the end.

Rx.Observable
  .range(0, 10)
  .takeLast(2)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 8
Next: 9
Completed

Combining Observables

And/Then/When

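Combines multiple Observables by pattern: `and` joins sources into a pattern, `thenDo` maps the matched values to a result, and `when` emits a result each time a pattern is matched.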

Rx.Observable
  .when(
    Rx.Observable.timer(200).and(Rx.Observable.timer(300)).thenDo((x, y) => 'first'),
    Rx.Observable.timer(400).and(Rx.Observable.timer(500)).thenDo((x, y) => 'second'))
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: first
Next: second
Completed

CombineLatest

Put simply, it combines Observables: whenever any source emits, the latest value from each source is combined into one value.

var source1 = Rx.Observable
  .interval(100)
  .map(x => `First: ${x}`);

var source2 = Rx.Observable
  .interval(150)
  .map(x => `Second: ${x}`);

source1
  .combineLatest(
    source2,
    (s1, s2) => `${s1} ${s2}`)
  .take(5)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: First: 0 Second: 0
Next: First: 1 Second: 0
Next: First: 2 Second: 0
Next: First: 2 Second: 1
Next: First: 3 Second: 1
Completed
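
The interleaving above is easier to see on a virtual timeline. The sketch below replays timestamped events from two sources. `combineLatestTimeline` is a hypothetical helper, not an `rx` API, and it assumes that when both sources fire at the same instant the first source is handled first (which matches the output above at 300 ms).

```javascript
// Hypothetical virtual-time sketch of combineLatest for two sources
// (not the rx operator). Each source is [{ time, value }] sorted by time.
function combineLatestTimeline(first, second, resultSelector) {
  const tagged = [
    ...first.map(e => ({ ...e, source: 0 })),
    ...second.map(e => ({ ...e, source: 1 })),
  ].sort((a, b) => a.time - b.time); // stable sort: ties keep `first` ahead

  const latest = [undefined, undefined];
  const seen = [false, false];
  const results = [];
  for (const e of tagged) {
    latest[e.source] = e.value;
    seen[e.source] = true;
    // Nothing is emitted until every source has produced at least one value.
    if (seen[0] && seen[1]) results.push(resultSelector(latest[0], latest[1]));
  }
  return results;
}

const first = [100, 200, 300, 400].map((time, i) => ({ time, value: `First: ${i}` }));
const second = [150, 300].map((time, i) => ({ time, value: `Second: ${i}` }));
const combined = combineLatestTimeline(first, second, (s1, s2) => `${s1} ${s2}`);
console.log(combined.join('\n'));
// First: 0 Second: 0
// First: 1 Second: 0
// First: 2 Second: 0
// First: 2 Second: 1
// First: 3 Second: 1
```

The first value of `first` at 100 ms produces no output because `second` has not emitted yet.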

Join

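Joins items from two Observables whose duration windows overlap. The two selector functions return an Observable that defines how long each item stays joinable.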

var source1 = Rx.Observable
  .interval(100)
  .map(x => `First: ${x}`);

var source2 = Rx.Observable
  .interval(100)
  .map(x => `Second: ${x}`);

source1
  .join(
    source2,
    () => Rx.Observable.timer(0), 
    () => Rx.Observable.timer(0), 
    (x, y) => `${x}_${y}`)
  .take(5)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: First: 0_Second: 0
Next: First: 1_Second: 1
Next: First: 2_Second: 2
Next: First: 3_Second: 3
Next: First: 4_Second: 4
Completed

Merge

Merges multiple Observables into one.

const source1 = Rx.Observable
  .range(0, 10)
  .filter(x => x % 2 == 0);

const source2 = Rx.Observable
  .range(0, 10)
  .filter(x => x % 2 == 1);

Rx.Observable
  .merge(
    source1,
    source2)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Completed

StartWith

Emits the given value first, then the values of the Observable.


Rx.Observable
  .from(['ん', 'ご'])
  .startWith('り')
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: り
Next: ん
Next: ご
Completed

Switch

Switches to each newly emitted inner Observable in turn, dropping the previous one.

Rx.Observable
  .from([
    Rx.Observable.from(['まぐろ', 'あじ', 'ひらめ']),
    Rx.Observable.from(['りんご', 'みかん', 'おれんじ']),
    Rx.Observable.from(['いぬ', 'ねこ', 'しか']),
  ])
  .switch()
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: まぐろ
Next: りんご
Next: いぬ
Next: ねこ
Next: しか
Completed

Zip

Combines the values emitted by two or more Observables, pairwise, into one new value.

const source1 = Rx.Observable.range(0, 3);
const source2 = Rx.Observable.range(5, 8);

Rx.Observable
  .zip(
    source1,
    source2,
    (x, y) => `${x} + ${y} = ${x + y}` 
  )
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0 + 5 = 5
Next: 1 + 6 = 7
Next: 2 + 7 = 9
Completed
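
Note that `range(5, 8)` yields eight values (5 through 12), yet only three results come out. Zip pairs items by position and stops with the shorter source. Here is an array-based sketch of that pairing. `zip` is a hypothetical helper, not the `rx` operator.

```javascript
// Hypothetical array-based sketch of zip for two sources (not the rx operator).
function zip(left, right, resultSelector) {
  // Pairing stops as soon as the shorter source runs out.
  const length = Math.min(left.length, right.length);
  const results = [];
  for (let i = 0; i < length; i++) {
    results.push(resultSelector(left[i], right[i]));
  }
  return results;
}

const zipped = zip(
  [0, 1, 2],
  [5, 6, 7, 8, 9, 10, 11, 12],
  (x, y) => `${x} + ${y} = ${x + y}`
);
console.log(zipped);
// [ '0 + 5 = 5', '1 + 6 = 7', '2 + 7 = 9' ]
```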

Error Handling Operators

Catch

Specifies a replacement Observable to emit from when onError occurs.

Rx.Observable
  .from([0, 1, 2])
  .map(x => {
    if(x > 0) return x;
    throw new Error('error');
  })
  .catch(e => {
    console.log(`caught: ${e}`);
    return Rx.Observable.just(2);
  })
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
caught: Error: error
Next: 2
Completed

Retry

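Resubscribes to the source when an exception occurs, up to the specified number of times. In this example `retry(2)` allows two attempts in total before the error is passed on.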

Rx.Observable
  .range(0, 5)
  .map(x => {
    if(x > 3) {
      throw new Error('error: x > 3');
    }
    return x;
  })
  .retry(2)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Next: 2
Next: 3
Next: 0
Next: 1
Next: 2
Next: 3
Error: Error: error: x > 3
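
The repeated 0 to 3 in that output comes from running the whole source again from the start. The synchronous sketch below mimics that. `runWithRetry` is a hypothetical helper, not an `rx` API, and it assumes, in line with the output above, that the count is the total number of attempts.

```javascript
// Hypothetical synchronous sketch of retry semantics (not the rx operator).
// Assumption: maxAttempts counts every attempt, including the first one.
function runWithRetry(attempt, maxAttempts, onNext) {
  let lastError;
  for (let i = 0; i < maxAttempts; i++) {
    try {
      attempt(onNext); // each attempt restarts the source from scratch
      return { completed: true };
    } catch (err) {
      lastError = err;
    }
  }
  // Every attempt failed, so the last error is surfaced.
  return { completed: false, error: lastError };
}

const retryLog = [];
const outcome = runWithRetry(
  emit => {
    for (let x = 0; x < 5; x++) {
      if (x > 3) throw new Error('error: x > 3');
      emit(x);
    }
  },
  2,
  x => retryLog.push(x)
);
console.log(retryLog.join(','));    // 0,1,2,3,0,1,2,3
console.log(outcome.error.message); // error: x > 3
```

Values emitted before the failure are not rolled back, so the subscriber sees them once per attempt.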

Observable Utility Operators

Delay

Delays each emission by the specified time.

Rx.Observable
  .range(0, 5)
  .delay(1000)
  .subscribe(
    x => console.log(`Next: ${x}`),
    err => console.log(`Error: ${err}`),
    () => console.log('Completed')
  );
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed

I'll write up the rest another time...
