Help us understand the problem. What is going on with this article?

RxJS の Observable で Promise を扱う

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 23 日目かつ RxJS Advent Calendar 2015 の 23 日目です。

はじめに

今日は Promise と Obervable とを相互変換する方法について書きます。

Observable と Promise

Promise は ES2015 で標準化され、各種 Web browser で実装されて、 JavaScript の非同期処理の代表的なものになっています。

RxJS の Observable は Promise からの変換や Promise への変換に対応しています。 Promise の扱いについて見ていきます。

Promise to Observable

Observable.fromPromise

Promise から Observable に変換します。

import { Observable } from 'rx';

Observable
  .fromPromise(Promise.resolve(42))
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: 42
// onCompleted

onNext のあと onCompleted が流れてきます。

import { Observable } from 'rx';

const promise = new Promise((resolve) => {
  setTimeout(() => resolve(100), 1000);
});

Observable
  .fromPromise(promise)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: 100
// onCompleted

setTimeout をはさんでも動きます。

import { Observable } from 'rx';

const promise = new Promise((_, reject) => {
  setTimeout(() => reject(new Error('ERROR!')), 1000);
});

Observable
  .fromPromise(promise)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onError: Error: ERROR!

Error も期待通りに動きます。

Observable.prototype.flatMap / Observable.prototype.mergeAll

flatMapmergeAll などの Observable を flat にする動作は Observable だけでなく Promise にも対応しています。Observable ではなく Promise が返された場合には、それを Observable.fromPromise で変換して Observable として処理してくれます。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  // .map(x => Observable.fromPromise(Promise.resolve(x * 10)))
  .map(x => Promise.resolve(x * 10)) 
  .mergeAll()
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: 10
// onNext: 20
// onNext: 30
// onCompleted

URL を fetch すると Promise を返すことが多いので、この挙動はよく使います。

import fetch from 'node-fetch';
import { Observable } from 'rx';

Observable
  .of('http://example.com')
  // fetch(url) returns Promise<Response>
  .flatMap(url => fetch(url))
  // response.text() returns Promise<String>
  .flatMap(response => response.text())
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );

// onNext: <!doctype html> ...
// onCompleted

Observable.prototype.toPromise

Observable から Promise に変換します。ObservableonCompleted の際に最後の onNext の値が流れてきます。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  .toPromise()
  .then(value => console.log(`fulfilled: ${value}`));
// fulfilled: 3

おわりに

RxJS の Observable は Promise からの変換や Promise への変換を確認しました。

こういう単純な情報は意外と書かれていない気がしたので、改めて書いてみました。

bouzuya
ぼく、ぼうずや。なさけはひとのためならず。たのしいはせいぎ。
http://bouzuya.net/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away