Help us understand the problem. What is going on with this article?

HTTP/2のNode.js実装node-http2を読む

More than 5 years have passed since last update.

この記事は、http2 Advent Calendar 2015 の21日目の記事です。

はじめに

HTTP/2のNode.jsでの実装node-http2を読んだので、ポイントだと思うところについて大まかに書きます。

  • node-http2ってどんな感じなの
  • HTTP/2はどういう風に実装できるの
  • Node.jsのStream APIの良い使用例ないかな

みたいなことを知りたい人の手助けになればと思います。
扱うnode-http2のバージョンは3.2です。

Stream API

node-http2ではNode.jsの主要なAPIであるStream APIがよく使われています。
全く知らない人のために概要だけまとめました

HTTP/2の重要な概念としてStreamがありますがそれとStream APIは別のものです。
今回は単にStreamと書いた場合はHTTP/2としてのStreamやその実装のStreamクラス・インスタンスを指し、Stream API、ReadableStream、Readableなどと書いた場合はNode.jsのStream APIを指すことにします。

node-http2

流れとして重要だと思った箇所をザックリ説明します。載せてるコードは流れをわかりやすくするためにメソッドや変数を展開したり説明に不要な部分は省略したりしたので実際のものとは少し異なります。
主にサーバ側の処理などを例にして説明します。

階層

index.js
http.js
protocol/
  index.js
  endpoint.js
  connection.js
  stream.js
  compressor.js
  flow.js
  framer.js

indexは主にモジュール管理のためのファイルなので特に大事な実装はありません。
protocol/内がHTTP/2の実装で、http.jsがこれを利用した汎用的なAPIという感じです。
主にhttp, endpoint, connection, stream, flowについて説明します。
詳細は省きますが、compressor, framerはそれぞれ圧縮・非圧縮とシリアライズ・デシリアライズの処理をします。

ファイルごとの説明

1 http

1.1 APIとしての使い方

ユーザ(node-http2を利用する開発者)向けのAPIの部分で、Server, Agent(クライアント), IncomingRequest, OutgoingResponseなどのクラスやrequest(options,callback)などのメソッドが公開されています。使い方としては、例えばサーバ側なら次のようになります。(TLSでなくTCPのパターンを例にします)

server = http2.createServer({
    plain: true //TLSではなくTCPを指定
  }, function onRequest(request, response) {
    if(request.url === "/first.js"){
      var push = response.push('/second.js');
      push.writeHead(200);
      fs.createReadStream(path.join(__dirname, '/second.js')).pipe(push);
      fs.createReadStream(path.join(__dirname, '/first.js')).pipe(response);
    }
  });
});

要求が来たときonRequestにWritableStream型のresponseが渡されてきます。
「first.jsが要求されたということはsecond.jsも必要かもしれない」というのがわかる場合は上のようにpush('/second.js')でpushのStreamを作り、データを書き込んで送信すればいいわけです。

1.2 Serverの実装

Serverクラスのコンストラクタ内で次のような処理があります。

if (options.plain) {
  this._mode = 'plain';

  //TCPのサーバを立ち上げる. TCPコネクションが確立された時 _start()が呼ばれる
  this._server = net.createServer(function _start(socket) {
    var endpoint = new Endpoint(this._log, 'SERVER', this._settings);

    endpoint.pipe(socket).pipe(endpoint); //(1)

    var self = this;
    endpoint.on('stream', function _onStream(stream) {  //(2)
      var response = new OutgoingResponse(stream);
      var request = new IncomingRequest(stream);
      request.socket = socket;
      request.once('ready', self.emit.bind(self, 'request', request, response));
    });

    this.emit('connection', socket, endpoint);
  };);
}

(1)のendpoint.pipe(socket).pipe(endpoint)socketendpoint(後述)のストリームをつなぎ合わせます。
endpoint, socketは共にDuplex(ReadableでWritable)です。Readable.pipe(Writable)は受け取った引数をそのまま返すので、二つ目のpipeはsocket.pipe(endpoint)ということになります。
endpointから流れてくるデータはsocketに渡して送信し、socketが受信したデータはendpointに流すということです。

(2)の'stream'イベントは新しいStream(後述)ができたときに発行されます。このStreamを元に作ったrequestresponseが、ユーザ側のhttp2.createServer(function onRequest(req, res){...})に渡されます。response:OutgoingResponseはWritable、request:IncomingRequestはReadableで、これらは基本的にはコンストラクタで渡されたstreamにそのままデータの読み書きをします。例えばresponse.write(data)を呼べば特定の条件を除きthis.stream.write(data)が実行されます。

2 endpoint.js

EndpointはDuplexで、TCP, TLS層とHTTP層とのつなぎ目になります。また、HTTP/2コネクションの初期設定の確立のためにしなければならないやり取り(コネクションプリフェイス'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'の送受信など)も担当しており、仕様通りこれが完了する前にデータを受け取った場合などはPROTOCOL_ERRORを出します。

圧縮、フロー制御、StreamIDの管理などは各モジュールが担当しているので、初期化メソッド内で次のようにそれぞれの処理を担うDuplexたちを作りつなぎ合わせます。

  this._serializer   = new Serializer(this._log);
  this._deserializer = new Deserializer(this._log);
  this._compressor   = new Compressor(this._log, compressorRole);
  this._decompressor = new Decompressor(this._log, decompressorRole);
  this._connection   = new Connection(this._log, firstStreamId, settings);

  this._deserializer.pipe(this._decompressor).pipe(this._connection)

  this._connection.pipe(this._compressor).pipe(this._serializer)
2.1 Writableとしての役割

TCP・TLSソケットから受け取ったデータを デシリアライズ->非圧縮->フロー制御 などをして(a)新しいストリームでのフレームであれば'stream'イベントを発行する(b)既存のストリームのフレームあればそのストリームにデータを送ります。

TCP層から来たデータを次のようにTransformである_deserializerに書き込むことで、上記のようにpipeで指定した通り_deserializer->_decompressor->_connection(後述)とデータが流れていきます。

Endpoint.prototype._write = function _write(chunk, encoding, done) {
  this._deserializer.write(chunk, encoding, done);
};
2.2 Readableとしての役割

相手に送信したいデータを優先度管理->フロー制御->圧縮->シリアライズをしてTCP, TLSのソケットに送ります。
次のように_serializerからデータを読み込んでpushをするので、ユーザがresponse.write(data)で渡したデータなどもstreamから_connection->_compressor->_serializer->socketとデータが渡るようになっています。

Endpoint.prototype._read = function _read() {
  this._readableState.sync = true;
  var moreNeeded = noread, chunk;
  while (moreNeeded && (chunk = this._serializer.read())) {
    moreNeeded = this.push(chunk);
  }
  if (moreNeeded === noread) {
    this._serializer.once('readable', this._read.bind(this));
  }
  this._readableState.sync = false;
};

3 flow.js

フロー制御をします。Connectionの親クラスで、StreamはFlowのインスタンスをプロパティupstreamとして持ちます。

ReadableStreamは内部にキューを持ちデータを管理しますが、Flowではこれをoutput queueと呼びwindowサイズを見て送信することが決まったデータのキューとして扱います。また、自身のプロパティでもflow control queueとして別にthis._queueを持ち、windowサイズが空くのを待っていたりoutput queueにまだ入り切っていないフレームを保持します。
例えばデータの送信時に_readなどの別メソッドから呼ばれるpush()_push()は次の通りです。

Flow.prototype.push = function push(frame) {
  var moreNeeded = null;
  if (this._queue.length === 0) {
    moreNeeded = this._push(frame); // output queueにプッシュ
  }

  if (moreNeeded === null) {
    this._queue.push(frame); // flow control queueにプッシュ
  }

  return moreNeeded;
};

// フレームの中身全部をoutput queueにプッシュできなかったらnullを返す
Flow.prototype._push = function _push(frame) {
  var data = frame && (frame.type === 'DATA') && frame.data;

  // (DATAフレーム以外 || windowサイズに入りきる)場合は_parentPushでoutput queueにプッシュ
  if (!data || (data.length <= this._window)) {
    return this._parentPush(frame);
  } else if (this._window <= 0) {
    return null;
  } else {

    frame.data = data.slice(this._window);//windowサイズからはみ出た分をframe.dataに残す
    this._parentPush({
      type: 'DATA',
      flags: {},
      stream: frame.stream,
      data: data.slice(0, this._window)//windowサイズの分だけ切り取って送る
    });
    return null;
  }
};

_parentPush()で送信するときや相手から'WINDOW_UPDATE'を受け取った時に_windowの増減をします。

また、_read(),_write()が呼ばれたときに仮想関数の_send(),_receive()を呼ぶので、子クラスであるConnectionは_read(), _write()ではなく_send(), _receive()を実装します。

4 connection.js

Connectionはサーバとクライアント間で1つだけ存在し、複数のストリームを管理します。streamの管理や優先度のアルゴリズムを実装しています。

Streamを二つの配列(map)で保持します。複数のストリームが同一の優先度の場合もあるので、_streamPrioritiesはpriorityに対してストリームの配列を持ちます。

  this._streamIds = []; // (id -> stream) 受信用のストリーム
  this._streamPriorities = []; // (priority -> [stream]) 送信用のストリーム

新しくリソースを送信するとき・クライアントからのリクエストの最初のフレームが来たとき・PUSH_PROMISEのフレームを送るときなど、Streamを作成して各mapに追加し、_allocateId()でidをセットします。
仕様通り、IDが0のストリーム_streamIds[0]SETTINGSフレームを送受信するための特別なストリームとして作成されます。

後述する通り、各streamはプロパティにFlowのインスタンスupstreamを持っており、Connectionからstreamのデータを読み書きする場合はstream.upstream.write()のようにupstreamを介して行います。(upstreamstream_send(),_receive()処理を委譲している)

例えば送信するデータを要求されるときに呼ばれる_send()では保持するストリームに対し優先度順にread()してフレームを読み込み、自身(Flow)のoutput queueとflow control queueがいっぱいになるまでプッシュします。長いですが次の通りです。

Connection.prototype._send = function _send(immediate) {
    ...
priority_loop:
  // 優先度順にストリームを処理していく
  for (var priority in this._streamPriorities) {
    var bucket = this._streamPriorities[priority];
    var nextBucket = [];

    while (bucket.length > 0) {
      for (var index = 0; index < bucket.length; index++) {
        var stream = bucket[index];
        var frame = stream.upstream.read((this._window > 0) ? this._window : -1);

        if (!frame) {
          continue;
        } else if (frame.count_change > this._streamSlotsFree) {
          //frame.count_changeはこのフレームの送受信によって増減するストリームの数(-1か0か1)
          //IDLE->OPENにするフレームなら+1だし, HALF_CLOSED->CLOSEDにするフレームなら-1
          //_streamSlotsFreeは(SETTINGS_MAX_CONCURRENT_STREAMS - 使用中のストリーム数)
          stream.upstream.unshift(frame); //このフレームを処理するとSETTINGS_MAX_CONCURRENT_STREAMSを超えてしまうのでstreamに戻してスキップする
          continue;
        }

        nextBucket.push(stream);

        if (frame.stream === undefined) {
          frame.stream = stream.id || this._allocateId(stream);
        }

        if (frame.type === 'PUSH_PROMISE') {
          this._allocatePriority(frame.promised_stream);
          frame.promised_stream = this._allocateId(frame.promised_stream);
        }

        var moreNeeded = this.push(frame);//output queueかflow control queueにプッシュ
        this._changeStreamCount(frame.count_change);

        if (moreNeeded === false) {
          break priority_loop;
        }
      }

      bucket = nextBucket;
      nextBucket = [];
    }
  }

  // フレームを送信できなかったらwindow updateなどqueueが空くタイミングを待つ
  if (moreNeeded === undefined) {
    this.once('wakeup', this._send.bind(this));
  }
};

stream.js

1組のリクエストとレスポンスの交換に使われるストリームです。idle, closedなどの状態管理の処理やpromise(),headers()などユーザ側から呼ばれるメソッドがあります。

ユーザ側からwrite()で渡したデータはconnectionからはstream.upstream.read()で取得し、connectionからstream.upstream.write()で渡したデータはユーザ側からはread()で取得できます。

_read()_write()はユーザ側とのやりとりで、_receive()_send()はconnection、つまりupstream側とのやりとりで呼ばれるメソッドです。_write()_receive()は引数に関数readyが渡されるので、次のデータを受け取れる状態になったときにこれを呼ぶ必要があります。
_read(), _write(), _receive(), _send()は次の通りです。

Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
  // 初期化
  this.upstream = new Flow();
  this.upstream._send = this._send.bind(this);
  this.upstream._receive = this._receive.bind(this);
};

// データを受信したとき呼ばれる
Stream.prototype._receive = function _receive(frame, ready) {
  // 次のデータを受け取れる状態になったときにready()を呼ぶ

  if (!this._ended && (frame.type === 'DATA')) {
    var moreNeeded = this.push(frame.data);
    if (!moreNeeded) {
      this._receiveMore = ready; // queueがいっぱいなのでreadyは別のタイミングで呼ぶ
    }
  }

  if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
    this.push(null);
    this._ended = true;
  }

  if (this._receiveMore !== ready) {
    ready();
  }
};

// ユーザ側がデータを読み込みたいときに呼ばれる
Stream.prototype._read = function _read() {
  if (this._receiveMore) {
    var receiveMore = this._receiveMore;
    delete this._receiveMore;
    receiveMore(); // 上述のreadyを呼んだので次のデータがまた_receiveに流れてくる
  }
};

// ユーザ側から書き込まれたときに呼ばれる
Stream.prototype._write = function _write(buffer, encoding, ready) {
  var moreNeeded = this._pushUpstream({
    type: 'DATA',
    flags: {},
    stream: this.id,
    data: buffer
  });

  if (moreNeeded) {
    ready(); 
  } else {
    this._sendMore = ready; // ready()はまだ呼ばず、queueが空いて送信できるようになったときに呼ぶ
  }
};

// 次のデータを送信したいときに上流から呼ばれる
Stream.prototype._send = function _send() {
  if (this._sendMore) {
    var sendMore = this._sendMore;
    delete this._sendMore;
    sendMore(); 
  }
};

Stream.prototype._pushUpstream = function _pushUpstream(frame) {
  this.upstream.push(frame);
  this._transition(true, frame); // 状態遷移
};

まとめと感想

HTTP/2はステートフルで実装が複雑になりすぎるという批判がありましたが確かに少し難しい感じがします。streamの多重化(ID管理)、状態管理、優先度、フロー制御など機能が多く、保持する情報・変数も多くなるのでゴチャゴチャにならないようしっかり切り分けて実装するのが大事だと思いました。

Stream APIはかなり便利でよしなにバッファリングをしてくれることはもちろん、push()pipe()'data'イベントなど簡単なインターフェースでデータの入力元、出力先の実装や利用ができるのもいい点だと思います。

takaaki7
https://plaid.co.jp/
https://plaid.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away