はじめに
主に(?) Angular の裏側で使われるRxJSはPromise
などと同様に非同期処理を解決するためにObservableパターンを採用したライブラリで、実際複雑な非同期処理を簡潔に書くことができる。
ただObservableパターンに不慣れだったり、oepratorの多さと複雑さに気圧されて、しばしば書いたコードが魔法にしか見えないことがある(結果は分かってもなぜそうなるのかさっぱり分からない・・・)。
いい加減裏側で何が行われているのかしっかりと理解したかったので、以下のようなシンプルなコードを元に、RxJSのソースコードを追いかけていく。
of(1, 2, 3).pipe(
map(value => value * 2)
).subscribe(value => console.log(value));
// output => 2, 4, 6
バージョンと注意
この記事では 2019/04/07 時点の RxJS 6.4.0 を参照している。
また記事中では理解しやすいように、今回動かすコードとは関係の薄い部分はかなり省いて記載する。
登場人物
いきなり処理の流れを追う前に、上のコードで登場する型と代表的なinterfaceをまとめる。
-
Observable<T>
(internal/Observable.ts)- subscribe()
- pipe()
- lift()
-
Observer<T>
(internal/types.ts#L84)- next()
- error()
- complete()
-
Subscriber<T>
(internal/Subscriber.ts)- next()
- error()
- complete()
- unsubscribe()
- Operator (internal/Operator.ts)
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
of(1, 2, 3)
of
の定義と実装は以下の通り、引数の違いはあれど最終的にObservable
型を返す関数となっている。
export function of<T, T2, T3>(a: T, b: T2, c: T3, scheduler?: SchedulerLike): Observable<T | T2 | T3>;
export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> {
// ~~省略~~
return fromArray(args as T[], scheduler);
}
今回(of(1, 2, 3)
)は引数を複数渡しており、scheduler
を渡していないためなんやかんやでfromArray(args as T[], scheduler)
が呼ばれる。
fromArray
では以下のようにObservable
が生成されている。
internal/observable/fromArray.ts
internal/util/subscribeToArray.ts
export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
return new Observable<T>(subscribeToArray(input));
}
export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {
for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
subscriber.next(array[i]);
}
if (!subscriber.closed) {
subscriber.complete();
}
};
Observable
はsubscribe()
に反応するSubscribable<T>
を実装している。Observable
のconstructorを見ると
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
なので、subscribeToArray
の結果がthis._subscribe
に渡される。このObservable
のsubscribe()
が呼ばれると、中身の配列(今回は [1, 2, 3]
)に従って subscriber.next(array[i]);
が呼び出されそうである。
pipe()
pipe
の定義は以下の通り。
internal/Observable.ts#L333
internal/util/pipe.ts#L23
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
if (operations.length === 0) {
return this as any;
}
return pipeFromArray(operations)(this);
}
/** @internal */
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (!fns) {
return noop as UnaryFunction<any, any>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
};
}
面白いのはpipe
でreturnするときにpipeFromArray
の戻り値に自分自身(Observable
)を渡して実行していること。
pipeFromArray
では、受け取ったoperator(今回はmap
)をreduce
で前の実行結果を入力にしながら順番に実行する関数を返している。
pipe
自身の戻り値もObservable
なので、pipe
周りの挙動をまとめると
- 自分自身(
Observable
)を入力として、受け取ったoperatorをreduce
で順番に実行して結果(Observable
)を返す - operatorとは
Observable
を受け取ってObservable
を返す関数
と言うことが分かった。
map(value => value * 2)
map operatorの実装は以下の通り。
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
return function mapOperation(source: Observable<T>): Observable<R> {
return source.lift(new MapOperator(project, thisArg));
};
}
一つ前で分かった「operatorとは Observable
を受け取ってObservable
を返す関数」と言うのがfunction mapOperation(source: Observable<T>): Observable<R>
に現れている。
ただそれだけでは不十分で、本質的(?)な実装はsource.lift()
の中に見える。
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
上記のようにsource
とoperator
が設定されたObservable
を新しく生成して返している。
ここのoperator
がどんなものかmap
の実装から見ると
export class MapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => R, private thisArg: any) {
}
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}
class MapSubscriber<T, R> extends Subscriber<T> {
count: number = 0;
private thisArg: any;
constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => R,
thisArg: any) {
super(destination);
this.thisArg = thisArg || this;
}
protected _next(value: T) {
let result: any;
try {
result = this.project.call(this.thisArg, value, this.count++);
} catch (err) {
this.destination.error(err);
return;
}
this.destination.next(result);
}
}
少しややこしいが、call()
の中でsubscribe()
を呼んでいること覚えておく。ただしこの時はまだObservable
が新規に生成されただけで、まだcall()
は呼ばれてないことに注意する。
MapSubscriber
についてはこの後見ていく。
subscribe(value => console.log(value))
いよいよsubscribe()
について見ていく。実装は以下のようになっている。
(本来はsink
にadd
するコードがあるが、unsubscribe()
の時に親をさかのぼるためのものなので、今は省略している。)
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
operator.call(sink, this.source);
} else {
this._subscribe(sink);
}
return sink;
}
ここから今までのソースコードを順番にさかのぼっていくことができる。
最初にsubscribe()
されるObservable
はmap operatorの戻り値なのでoperator
を持っている。なのでoperator.call(sink, this.source);
が実行される。
上で書いた通りcall()
の中身はsource.subscribe()
なので、source
であるof(1, 2, 3)
の戻り値のObservable
がsubscribe()
される。
こちらのObservable
はoperator
を持っていないので、今度はthis._subscribe()
が呼ばれる。
上で見た通りこれは
export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {
for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
subscriber.next(array[i]);
}
if (!subscriber.closed) {
subscriber.complete();
}
};
である。さらにここでいうsubscriber: Subscriber<T>
とはmapで生成されるMapSubscriber<T, R>
である。
MapSubscriber<T, R>
のnext()
を呼び出しているので、以下のコードが実行される。
protected _next(value: T) {
let result: any;
try {
result = this.project.call(this.thisArg, value, this.count++);
} catch (err) {
this.destination.error(err);
return;
}
this.destination.next(result);
}
ここで言うthis.project
とはvalue => value * 2
である。その結果(2, 4, 6
)がthis.destination.next()
に渡される。
このthis.destination
とはconst sink = toSubscriber(observerOrNext, error, complete);
のsink
である(ややこしい・・・)。
sink
のnext()
とはつまりvalue => console.log(value)
であるので、無事にoutput => 2, 4, 6
が得られるということだった。
終わりに
単純なObservable#of
と、基本的なoperatorであるmap
を題材に結果が得られるまでの流れをソースコードで追ってみた。
魔法にしか見えなかったコードも、裏側で何が起きているか実感できたような気がする。
今回でObservableの生成からpipeline処理、subscribeまで基礎となる流れを理解できたため、次はAngularのHttpClient
などでどのようにObservableが生成されているかや、各operatorがどのように実装されているか個別に追っていきたいと思う。
間違っている所等あれば是非ご指摘ください。