LoginSignup
7
3

More than 5 years have passed since last update.

はじめに

主に(?) Angular の裏側で使われるRxJSはPromiseなどと同様に非同期処理を解決するためにObservableパターンを採用したライブラリで、実際複雑な非同期処理を簡潔に書くことができる。

ただObservableパターンに不慣れだったり、oepratorの多さと複雑さに気圧されて、しばしば書いたコードが魔法にしか見えないことがある(結果は分かってもなぜそうなるのかさっぱり分からない・・・)。

いい加減裏側で何が行われているのかしっかりと理解したかったので、以下のようなシンプルなコードを元に、RxJSのソースコードを追いかけていく。

of(1, 2, 3).pipe(
  map(value => value * 2)
).subscribe(value => console.log(value));

// output => 2, 4, 6

バージョンと注意

この記事では 2019/04/07 時点の RxJS 6.4.0 を参照している。

また記事中では理解しやすいように、今回動かすコードとは関係の薄い部分はかなり省いて記載する。

登場人物

いきなり処理の流れを追う前に、上のコードで登場する型と代表的なinterfaceをまとめる。

of(1, 2, 3)

ofの定義と実装は以下の通り、引数の違いはあれど最終的にObservable型を返す関数となっている。

internal/observable/of.ts

export function of<T, T2, T3>(a: T, b: T2, c: T3, scheduler?: SchedulerLike): Observable<T | T2 | T3>;

export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> {
  // ~~省略~~
  return fromArray(args as T[], scheduler);
}

今回(of(1, 2, 3))は引数を複数渡しており、schedulerを渡していないためなんやかんやでfromArray(args as T[], scheduler)が呼ばれる。

fromArrayでは以下のようにObservableが生成されている。

internal/observable/fromArray.ts
internal/util/subscribeToArray.ts

export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
  return new Observable<T>(subscribeToArray(input));
}

export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {
  for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
    subscriber.next(array[i]);
  }
  if (!subscriber.closed) {
    subscriber.complete();
  }
};

Observablesubscribe()に反応するSubscribable<T>を実装している。Observableのconstructorを見ると

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

なので、subscribeToArrayの結果がthis._subscribeに渡される。このObservablesubscribe()が呼ばれると、中身の配列(今回は [1, 2, 3] )に従って subscriber.next(array[i]); が呼び出されそうである。

pipe()

pipeの定義は以下の通り。

internal/Observable.ts#L333
internal/util/pipe.ts#L23

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  if (operations.length === 0) {
    return this as any;
  }

  return pipeFromArray(operations)(this);
}

/** @internal */
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (!fns) {
    return noop as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

面白いのはpipeでreturnするときにpipeFromArrayの戻り値に自分自身(Observable)を渡して実行していること。

pipeFromArrayでは、受け取ったoperator(今回はmap)をreduce前の実行結果を入力にしながら順番に実行する関数を返している。

pipe自身の戻り値もObservableなので、pipe周りの挙動をまとめると

  • 自分自身(Observable)を入力として、受け取ったoperatorをreduceで順番に実行して結果(Observable)を返す
  • operatorとは Observableを受け取ってObservableを返す関数

と言うことが分かった。

map(value => value * 2)

map operatorの実装は以下の通り。

internal/operators/map.ts

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    return source.lift(new MapOperator(project, thisArg));
  };
}

一つ前で分かった「operatorとは Observableを受け取ってObservableを返す関数」と言うのがfunction mapOperation(source: Observable<T>): Observable<R>に現れている。

ただそれだけでは不十分で、本質的(?)な実装はsource.lift()の中に見える。

internal/Observable.ts#L66

lift<R>(operator: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

上記のようにsourceoperatorが設定されたObservableを新しく生成して返している。

ここのoperatorがどんなものかmapの実装から見ると

export class MapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => R, private thisArg: any) {
  }

  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
  }
}

class MapSubscriber<T, R> extends Subscriber<T> {
  count: number = 0;
  private thisArg: any;

  constructor(destination: Subscriber<R>,
              private project: (value: T, index: number) => R,
              thisArg: any) {
    super(destination);
    this.thisArg = thisArg || this;
  }

  protected _next(value: T) {
    let result: any;
    try {
      result = this.project.call(this.thisArg, value, this.count++);
    } catch (err) {
      this.destination.error(err);
      return;
    }
    this.destination.next(result);
  }
}

少しややこしいが、call()の中でsubscribe()を呼んでいること覚えておく。ただしこの時はまだObservableが新規に生成されただけで、まだcall()は呼ばれてないことに注意する。

MapSubscriberについてはこの後見ていく。

subscribe(value => console.log(value))

いよいよsubscribe()について見ていく。実装は以下のようになっている。
(本来はsinkaddするコードがあるが、unsubscribe()の時に親をさかのぼるためのものなので、今は省略している。)

internal/Observable.ts#L199

subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {

  const { operator } = this;
  const sink = toSubscriber(observerOrNext, error, complete);

  if (operator) {
    operator.call(sink, this.source);
  } else {
    this._subscribe(sink);
  }

  return sink;
}

ここから今までのソースコードを順番にさかのぼっていくことができる。

最初にsubscribe()されるObservableはmap operatorの戻り値なのでoperatorを持っている。なのでoperator.call(sink, this.source);が実行される。

上で書いた通りcall()の中身はsource.subscribe()なので、sourceであるof(1, 2, 3)の戻り値のObservablesubscribe()される。
こちらのObservableoperatorを持っていないので、今度はthis._subscribe()が呼ばれる。

上で見た通りこれは

export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {
  for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
    subscriber.next(array[i]);
  }
  if (!subscriber.closed) {
    subscriber.complete();
  }
};

である。さらにここでいうsubscriber: Subscriber<T>とはmapで生成されるMapSubscriber<T, R>である。

MapSubscriber<T, R>next()を呼び出しているので、以下のコードが実行される。

  protected _next(value: T) {
    let result: any;
    try {
      result = this.project.call(this.thisArg, value, this.count++);
    } catch (err) {
      this.destination.error(err);
      return;
    }
    this.destination.next(result);
  }

ここで言うthis.projectとはvalue => value * 2である。その結果(2, 4, 6)がthis.destination.next()に渡される。
このthis.destinationとはconst sink = toSubscriber(observerOrNext, error, complete);sinkである(ややこしい・・・)。
sinknext()とはつまりvalue => console.log(value)であるので、無事にoutput => 2, 4, 6が得られるということだった。

終わりに

単純なObservable#ofと、基本的なoperatorであるmapを題材に結果が得られるまでの流れをソースコードで追ってみた。

魔法にしか見えなかったコードも、裏側で何が起きているか実感できたような気がする。

今回でObservableの生成からpipeline処理、subscribeまで基礎となる流れを理解できたため、次はAngularのHttpClientなどでどのようにObservableが生成されているかや、各operatorがどのように実装されているか個別に追っていきたいと思う。

間違っている所等あれば是非ご指摘ください。

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3