32
32

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

逆引きRxJSの使い方パターン

Posted at

RxJSを覚えようとして調べたところ、mapやfilterなどRxJSが提供している機能の解説はすぐに見つかりましたが、実際にウェブページを作るときにありがちなパターンをRxJSでどのように実現するかという例があまり見つからないので苦労しました。
RxJSの勉強のためにちょっとしたサンプルアプリを作り、その中で見えてきたありがちなパターンを実現するRxJSのコードをまとめてみました。

この記事のRxJSは5系です。4から5で結構メソッド名などが変わっているようなので以前からRxJSを知っている方はご注意を。
また、動作確認はnode v8.1.3で行いました。

ダブルクリック

RxJSの入門として紹介されることが多いサンプルですが、まずはここから。
色々なところで紹介されている入門。

double_click.js
const Rx = require('rxjs/Rx')
const Emitter = require('event-emitter')

const emitter = new Emitter()

// 本来はDOMのクリックイベントなどを監視するが、CLIで動かすためにevent-emitterを使う
const click$ = Rx.Observable.fromEvent(emitter, 'click')
const doubleClick$ = click$
  .bufferTime(100) // 100ms以内のイベントを配列にまとめる
  .filter(arr => arr.length > 1) // 配列長が1より大きい = ダブルクリック
  .map(arr => arr.shift() ) // 最初のイベントだけ使いたいので先頭だけ取り出す

click$.subscribe(() => {
  console.log('single click')
})
doubleClick$.subscribe(() => {
  console.log('double click')
})

setTimeout(() => emitter.emit('click'),  50)
setTimeout(() => emitter.emit('click'), 100)
setTimeout(() => emitter.emit('click'), 150)
setTimeout(() => emitter.emit('click'), 300)

// 実行結果
// single click
// single click
// single click
// double click
// single click

bufferTimeの挙動を理解することがカギ。

連続するイベントを間引きたい

debounceTimeを使います。

debounce.js
const Rx = require('rxjs/Rx')
const _ = require('underscore')
const Emitter = require('event-emitter')

const emitter = new Emitter()

// Rxjs
const debounce$ = Rx.Observable.fromEvent(emitter, 'click')
  .debounceTime(200)
debounce$.subscribe(() => {
  console.log('observable click')
})

// Promise + underscorejs
new Promise(resolve => {
  emitter.on('click', _.debounce(resolve, 200))
}).then(() => {
  console.log('promise click')
})

setTimeout(() => emitter.emit('click'),  50)
setTimeout(() => emitter.emit('click'), 100)

// 実行結果
//
// promise click
// observable click

underscorejsなどのdebounceと同じ感覚で使うことができます。
比較のために、debounceした後に処理を続けるということを想定したPromise版も作ってみました。
Promiseに慣れていればそんなに難しくないのですが、慣れるとRxJSの方がresolveとか考えなくてよいのでシンプルだと思います。

Promiseを返したい

まさにそのためのtoPromiseというものがあります。

promise.js
const Rx = require('rxjs/Rx')

// resolved promise
const promise = Rx.Observable.of(1)
  .delay(300)
  .toPromise()

// rejected promise
const rejectedPromise = Rx.Observable
  .throw(new Error('rejected promise'))
  .toPromise()

promise.then(() => { console.log('promise then') })

rejectedPromise
  .then(() => { console.log('this message will be never show') })
  .catch((error) => { console.log(error) })

// 実行結果
//
// Error: rejected promise
//     at Object.<anonymous> (/***************/promise.js:14:10)
//     at Module._compile (module.js:569:30)
//     at Object.Module._extensions..js (module.js:580:10)
//     at Module.load (module.js:503:32)
//     at tryModuleLoad (module.js:466:12)
//     at Function.Module._load (module.js:458:3)
//     at Function.Module.runMain (module.js:605:10)
//     at startup (bootstrap_node.js:158:16)
//     at bootstrap_node.js:575:3
//
// promise then

throwされている場合は自動的にrejectのPromiseになります。

値の変換とかはしなくていいので何か処理だけしたい

Observableの中でconsole.logを仕込んだり、ajaxの前に通信中の表示を画面に出したい場合など。
Observableに流れる値は変えないで処理だけを行うdoが使えます。

do.js
const Rx = require('rxjs/Rx')

let sum = 0
const observable = Rx.Observable.from([1,2,3])
  .map((x) => x * 10)
  .do((x) => {
    sum += x
    console.log("sum: " + sum)
  })

observable.subscribe((x) => {
  console.log("subscribe: " + x)
})

// 実行結果
//
// sum: 10
// subscribe: 10
// sum: 30
// subscribe: 20
// sum: 60
// subscribe: 30

Observableに流れるもの以外に作用させるコードを書くためにdoは最適ですが、
個人的にはそもそも副作用がある処理をRxJSで書くことが思想にそぐわないと思いますので上のようなコードはむしろアンチパターンかなと。

複数の場所でsubscribeしたときに処理が重複して行われないようにしたい

突然ですが質問です。
あるObservableに対して2箇所でsubscribeした場合、Observableの処理は何回行われるでしょうか?
自分は最初1回だけだと思っていたのですが、実際はこのような挙動になります。

cold.js
const Rx = require('rxjs/Rx')

const timer$ = Rx.Observable
  .interval(1000)
  .do((x) => { console.log("timer$: " + x)})

timer$.subscribe((x) => {
  console.log("Subscriber A: " + x)
})

timer$.subscribe((x) => {
  console.log("Subscriber B: " + x)
})

// 実行結果
// timer$: 0
// Subscriber A: 0
// timer$: 0
// Subscriber B: 0
// timer$: 1
// Subscriber A: 1
// timer$: 1
// Subscriber B: 1
// ...

なんとObservableの処理は2回行われます。
その理由は、Observableはsubscribeのそれぞれに対して複製が割り当てられるからです。
つまりこんなイメージです。

Observable ---- subscribe1 ---->
Observable ---- subscribe2 ---->

Webアプリでよくありがちな非同期処理として、ajaxが終わったイベントを監視して別々の場所で別々のことを行いたい(通知とかModel書き換えたり)ことがあるのでこのようなフローになっていて欲しい場面があります。

                  subscribe1 ----->
                /
Observable -----
                \
                  subscribe2 ----->

このような振る舞いをするObservableはhotなObservableと呼び、前述の複製が作られる方をcoldと呼ぶようです。

そしてcoldからhotに変換するにはshareを使うのが簡単です。

hot.js
const Rx = require('rxjs/Rx')

const timer$ = Rx.Observable
  .interval(1000)
  .do((x) => { console.log("timer$: " + x)})
  .share()

timer$.subscribe((x) => {
  console.log("Subscriber A: " + x)
})

timer$.subscribe((x) => {
  console.log("Subscriber B: " + x)
})

// 実行結果
//
// timer$: 0
// Subscriber A: 0
// Subscriber B: 0
// timer$: 1
// Subscriber A: 1
// Subscriber B: 1

このcoldとhotの概念はちょっと複雑ですので、興味がある方は他に詳しい解説を探してみると良いでしょう。
自分は以下のサイトのお世話になりました。

ajaxしたい

Promiseを返してくれるajaxライブラリであれば、fromPromiseを使ってそのままRxJSの世界に持ってこれます。
以下のコードではajaxをするのためにaxiosという外部ライブラリを使いました。

ajax.js
const Rx = require('rxjs/Rx')
const axios = require('axios')

// 元々はPromiseを返す
// axios.get('http://weather.livedoor.com/forecast/webservice/json/v1?city=130010')
//   .then((response) => {console.log(response.data)})
//   .catch((error) => console.log(error))

const ajax$ = Rx.Observable.fromPromise(
    // 東京の天気
    axios.get('http://weather.livedoor.com/forecast/webservice/json/v1?city=130010')
  )
  .map((response) => response.data)

ajax$.subscribe((data) => {
  console.log(data.title)
  console.log(data.description.text)
}, (error) => {
  console.log(error)
})

// 実行結果
//
// 東京都 東京 の天気
// ...(その日の天気のテキストが表示される)

複数のObservableをマージしたい

mergeを使うと複数のObservableをマージすることができます。
Aの非同期処理かBの非同期処理のどちらかの処理が終わった後に一定時間経ったら何かをさせる、というありがちなパターンをmergeとdebounceTimeを使うことで簡単に実現できます。

merge.js
const Rx = require('rxjs/Rx')
const Emitter = require('event-emitter')

const emitter = new Emitter()

const event1$ = Rx.Observable.fromEvent(emitter, 'event1')
const event2$ = Rx.Observable.fromEvent(emitter, 'event2')
const mergedEvent$ = Rx.Observable.merge(event1$, event2$)
  .debounceTime(500)

mergedEvent$.subscribe((x) => {
  console.log(x)
  // 実際は通知をGUIで出したり、ajaxでサーバーとデータの同期をしたりなど
  // notify()
  // axios.post('...')
})

emitter.emit('event1', 1)
emitter.emit('event2', 10)

// 実行結果
// (500ms待つ)
//
// 10

Observableの途中でajaxを挟みたい

mapで変換してからajax通信する、みたいなパターンです。
mergeMapはObservable(もしくはPromiseなど)を受け取り、mapで値を変換するように流れるObservableを変えることができます。

mergeMap1.js
const Rx = require('rxjs/Rx')

function fakeAjax(x) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      // 常に成功する場合
      // resolve(x)

      // エラーを返すことがある場合
      if (x === 10) resolve(x)
      return reject(new Error)
    }, 100)
  })
}

// 500ms間隔で0,1,2,3,4までを流す
const observable$ = Rx.Observable.interval(500).take(4)
  .map((x) => x * 10)
  .mergeMap((x) => {
    return fakeAjax(x)
  } )
  // ここで以下のようにcatchしたとしても一度エラーになるとそのObservableには流れなくなってしまう
  // .catch((error) => Rx.Observable.of('error'))

observable$.subscribe(
  (x) => { console.log(x) },
  (error) => { console.log(error)}
)

// 実行結果 常に成功する場合
// 0
// 10
// 20
// 30

// 実行結果 エラーを返すことがある場合
// Error
//     at Timeout.setTimeout [as _onTimeout] (/***/mergeMap.js:11:21)
//     at ontimeout (timers.js:488:11)
//     at tryOnTimeout (timers.js:323:5)
//     at Timer.listOnTimeout (timers.js:283:5)

コードは非常に簡潔なのですが、このままでは問題があります。
それはmergeMapで変換するObservable(もしくはPromise)がエラーになった場合、その後に元のObservableは処理をしなくなってしまうということです。

ajaxが何らかの理由で失敗した場合、その後何もできなくなってしまうのでは非常に困ります。
これを解決しようとするとこのようなコードになります。

mergeMap.js
const Rx = require('rxjs/Rx')

function fakeAjax(x) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      // エラーを返すことがある場合
      if (x === 10) resolve(x)
      return reject(new Error)
    }, 100)
  })
}

// 500ms間隔で0,1,2,3,4までを流す
const observable$ = Rx.Observable.interval(500).take(4)
  .map((x) => x * 10)
  .mergeMap((x) => {
    return fakeAjax(x)
      .catch((error) => error)
  } )

observable$.subscribe(
  (x) => {
    if (x instanceof Error) {
      console.log(x)
    }
    else {
      console.log('success: ' + x)
    }
  }
)

// 実行結果
// Error
//     at Timeout.setTimeout [as _onTimeout] (/***/mergeMap2.js:8:21)
//     ...省略
// success: 10
// Error
//     at Timeout.setTimeout [as _onTimeout] (/***/mergeMap2.js:8:21)
//     ...省略
// Error
//     at Timeout.setTimeout [as _onTimeout] (/***/mergeMap2.js:8:21)
//     ...省略

要は大本のObservableがエラーにならなければよいので、mergeMapの中でエラーハンドリングをしてやります。
ただし、この方法ではsubscribeでエラーのハンドリングはできなくなってしまいます。
そのため、上のコードではErrorオブジェクトかどうかを判定して擬似的にエラーハンドリングを実現しています。

今のところこの方法しか思いつかなかったのですが、subscribeの書き方がハックっぽいのでもっと良い書き方があればぜひ教えてほしいです・・・。

ちなみにmergeMapはRxJS4まではflatMapというメソッド名だったようですが、RxJS5からはmergeMapに改名されたようです。ドキュメントからもflatMapという名前を探しても見つからないので注意。
(ドキュメントにも書いていないですが、実はmergeMapのエイリアスとしてflatMapというメソッド名はまだ残っているので一応使うことは可能なようです)

サンプルコード

ここまで紹介したコードはgithubで公開しています。
1つ上の階層でnpm iして依存モジュールをインストールすれば実行することができます。

最後に

RxJSはObservableの概念に慣れるまでが大変ですが、ある程度慣れてきたら一周回って公式のドキュメントが一番分かりやすいことに気が付きました。
特に図が非常に分かりやすいので、オペレーターの挙動がよくわからない場合はググって使い方を探すよりも公式ドキュメントを見た方が早く理解できると思います。
http://reactivex.io/rxjs/

32
32
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
32

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?