ReactiveExtensions
RxJS
Rx
RxJSDay 23

RxJS の Observable で Promise を扱う

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 23 日目かつ RxJS Advent Calendar 2015 の 23 日目です。


はじめに

今日は Promise と Obervable とを相互変換する方法について書きます。


Observable と Promise

Promise は ES2015 で標準化され、各種 Web browser で実装されて、 JavaScript の非同期処理の代表的なものになっています。

RxJS の Observable は Promise からの変換や Promise への変換に対応しています。 Promise の扱いについて見ていきます。


Promise to Observable


Observable.fromPromise

Promise から Observable に変換します。

import { Observable } from 'rx';

Observable
.fromPromise(Promise.resolve(42))
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);

// onNext: 42
// onCompleted

onNext のあと onCompleted が流れてきます。

import { Observable } from 'rx';

const promise = new Promise((resolve) => {
setTimeout(() => resolve(100), 1000);
});

Observable
.fromPromise(promise)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);

// onNext: 100
// onCompleted

setTimeout をはさんでも動きます。

import { Observable } from 'rx';

const promise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('ERROR!')), 1000);
});

Observable
.fromPromise(promise)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);

// onError: Error: ERROR!

Error も期待通りに動きます。


Observable.prototype.flatMap / Observable.prototype.mergeAll

flatMapmergeAll などの Observable を flat にする動作は Observable だけでなく Promise にも対応しています。Observable ではなく Promise が返された場合には、それを Observable.fromPromise で変換して Observable として処理してくれます。

import { Observable } from 'rx';

Observable
.from([1, 2, 3])
// .map(x => Observable.fromPromise(Promise.resolve(x * 10)))
.map(x => Promise.resolve(x * 10))
.mergeAll()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);

// onNext: 10
// onNext: 20
// onNext: 30
// onCompleted

URL を fetch すると Promise を返すことが多いので、この挙動はよく使います。

import fetch from 'node-fetch';

import { Observable } from 'rx';

Observable
.of('http://example.com')
// fetch(url) returns Promise<Response>
.flatMap(url => fetch(url))
// response.text() returns Promise<String>
.flatMap(response => response.text())
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);

// onNext: <!doctype html> ...
// onCompleted


Observable.prototype.toPromise

Observable から Promise に変換します。ObservableonCompleted の際に最後の onNext の値が流れてきます。

import { Observable } from 'rx';

Observable
.from([1, 2, 3])
.toPromise()
.then(value => console.log(`fulfilled: ${value}`));
// fulfilled: 3


おわりに

RxJS の Observable は Promise からの変換や Promise への変換を確認しました。

こういう単純な情報は意外と書かれていない気がしたので、改めて書いてみました。