search
LoginSignup
35

More than 5 years have passed since last update.

posted at

updated at

RxJS の Observable で Promise を扱う

この記事は bouzuya's RxJS Advent Calendar 2015 23 日目かつ RxJS Advent Calendar 2015 の 23 日目です。

はじめに

今日は Promise と Obervable とを相互変換する方法について書きます。

Observable と Promise

Promise は ES2015 で標準化され、各種 Web browser で実装されて、 JavaScript の非同期処理の代表的なものになっています。

RxJS の Observable は Promise からの変換や Promise への変換に対応しています。 Promise の扱いについて見ていきます。

Promise to Observable

Observable.fromPromise

Promise から Observable に変換します。

import { Observable } from 'rx';

Observable
  .fromPromise(Promise.resolve(42))
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: 42
// onCompleted

onNext のあと onCompleted が流れてきます。

import { Observable } from 'rx';

const promise = new Promise((resolve) => {
  setTimeout(() => resolve(100), 1000);
});

Observable
  .fromPromise(promise)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: 100
// onCompleted

setTimeout をはさんでも動きます。

import { Observable } from 'rx';

const promise = new Promise((_, reject) => {
  setTimeout(() => reject(new Error('ERROR!')), 1000);
});

Observable
  .fromPromise(promise)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onError: Error: ERROR!

Error も期待通りに動きます。

Observable.prototype.flatMap / Observable.prototype.mergeAll

flatMapmergeAll などの Observable を flat にする動作は Observable だけでなく Promise にも対応しています。Observable ではなく Promise が返された場合には、それを Observable.fromPromise で変換して Observable として処理してくれます。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  // .map(x => Observable.fromPromise(Promise.resolve(x * 10)))
  .map(x => Promise.resolve(x * 10)) 
  .mergeAll()
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: 10
// onNext: 20
// onNext: 30
// onCompleted

URL を fetch すると Promise を返すことが多いので、この挙動はよく使います。

import fetch from 'node-fetch';
import { Observable } from 'rx';

Observable
  .of('http://example.com')
  // fetch(url) returns Promise<Response>
  .flatMap(url => fetch(url))
  // response.text() returns Promise<String>
  .flatMap(response => response.text())
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: <!doctype html> ...
// onCompleted

Observable.prototype.toPromise

Observable から Promise に変換します。ObservableonCompleted の際に最後の onNext の値が流れてきます。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  .toPromise()
  .then(value => console.log(`fulfilled: ${value}`));
// fulfilled: 3

おわりに

RxJS の Observable は Promise からの変換や Promise への変換を確認しました。

こういう単純な情報は意外と書かれていない気がしたので、改めて書いてみました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
35