LoginSignup
13
8

More than 1 year has passed since last update.

[いよいよ年明けリリース!?]WebTransportでもechoがしたい!

Last updated at Posted at 2021-10-30

他のWebTransport関連記事はこちら

2021/11/13追記 Chromeの公式サンプルがM97以降で動くようになりました!

公式サンプルが普通に動くようになりました! やったね。

WebTransportがChromeM97でいよいよリリースです!

Webを愛する皆さんこんにちは。
去年、一昨年、WebsoketやWebRTCの新しい技術として注目を集めたWebTransportがいよいよ年明けにもリリースされるようです。
いやぁ、WebRTCが2014ごろにリリースされてから早7年、ようやくという感じです。

今回はCanaryがM97となり、Origin Trials不要になったので公式サンプルを修正して動かしてみます。
そもそもWebTransportとはなんぞやみたいな話はNTTコミュニケーションズさんの記事が詳しいのでそちらをご覧ください。

大きな変更点まとめ

  • Chrome Canary M97ではOrigin Trialsの登録が不要になった
  • プロトコルは quic-transport:// ではなく https:// に決定
  • H3_DATAGRAM は 0x276 ではなく 0xffd277
  • QUICがRDC9000になった
  • 公式サンプルはjsが古くて動かない

作ったもの

githubにローカルで動作するクライアントとサーバーのソースコードをアップしました。

手順

とりあえず動かしてみたい方は以下の手順で動作確認ができます。
今回はIntelMac(Python3.9)で動かしました。
Windowsなどでも動くかとは思います。

必要なソフトをインストールする

Chrome Canary をダウンロードします
Python3.xがない場合はインストールしてください。
今回のソースコードはgithubに置いてあります。
Macの場合はOpenSSLをインストールする必要があります。(デフォルトでインストールされているLiberaSSLではオプションが異なるため)

$ brew install openssl

pythonのライブラリをインストールして一部修正する

pythonでの通信にaioquicというライブラリを使用しますが、H3_DATAGRAMの値が古いので修正します。

$ pip3 install aioquic
# pipenvなどを使っていない場合は多分↓にあると思います。
$ vim /usr/local/lib/python3.9/site-packages/aioquic/h3/connection.p

# fix from
    H3_DATAGRAM = 0x276
# to
    H3_DATAGRAM = 0xffd277

自己証明書を準備する

通信は全てTLSになるので証明書が必要になります。
fingerprintをCanary起動時に指定します。

# 証明書を作成(certificate.pemと.keyができます)
$ openssl req -newkey rsa:2048 -nodes -keyout certificate.key \
  -x509 -out certificate.pem -subj '/CN=Test Certificate' \
  -addext "subjectAltName = DNS:localhost"

# fingerprintを作成(canary起動時に指定)
$ openssl x509 -pubkey -noout -in certificate.pem |
  openssl rsa -pubin -outform der |
  openssl dgst -sha256 -binary | base64

サーバーを起動する

# server
$ python3 py_server/server.py certificate.pem certificate.key

# server for client.html
$ python3 -m http.server

canaryを起動する

起動時に自己証明書のfingerprintを指定します。(しないとTLSエラーで接続できません)

/Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary \
    --origin-to-force-quic-on=localhost:4433 \
    --ignore-certificate-errors-spki-list=[fingerprint]

アクセスしてechoすることを確認する

http://localhost:8000/client.htmlにアクセスします。
するとこんなページが出てきます。

まず、HTTP/3はコネクションを張るので、URLのところはそのままで「Connect」ボタンを押して接続します。
すると、1RTTのハンドシェイクが行われ、接続できたことがEvent Logに表示さります。

さて、HTTP/3には三種類の通信方式があります。

    1. datagram : UDPっぽく使える
    1. Unidirectional Stream : 一方向通信のTCPのようなもの
    1. Bidirectional Stream : 双方向通信のTCPのようなもの

フォームのラジオボタンから方式を選び、すぐ上のテキストエリアに文字を入力して「Send Data」を押してみましょう。
するとデータが送信され、エコーされることが確認できます。

解説

さて、ソースコードが何をしているのか確認してみましょう。
HTTP/3の詳細な話はまた別でやりたいと思いますが、
WebTransportを利用する上での要点としては三つくらいです。

  • コネクションの接続
  • データグラムの送信・受信待ち受け
  • ストリームの送信・受信待ち受け

今回はechoなので、それぞれの通信方式でデータを送り返しています。

  • datagramが送信されたらdatagramを送り返す
  • unidirectional streamを受け取ったらクライアント向きのunidirectional streamで送りかえす
  • bidirectional streamを受け取ったら、そのままデータを送りかえす

クライアントサイド

ファイル構造はシンプルで、普通のHTML/JS/CSSです。

  • client.html
  • client.js
  • client.css

接続する

まず、クライアントでは「Connect」ボタンが押されたら接続処理を行い、データグラムとストリームの待ち受け処理を作ります。
接続はとてもシンプルで、WebTransportのコンストラクタにURLを渡してreadyで同期させるだけです。

client.js
async function connect() {
  try {
    const url = document.getElementById("url").value;
    wt = new WebTransport(url);
    addToEventLog('Initiating connection...');
    await wt.ready;
    addToEventLog('Connection ready.');

    wt.closed
      .then(() => {setUIStart(); addToEventLog('Connection closed normally.'); })
      .catch(() => {setUIStart(); addToEventLog('Connection closed abruptly.', 'error')});

    streamNumber = 1;
    datagramWriter = wt.datagrams.writable.getWriter();

    // データグラムの待ち受け処理
    readDatagrams();
    // Unidirectional Stream の待ち受け処理
    acceptUnidirectionalStreams();

    setUIConnected();
  } catch (e) {
    addToEventLog(`Connection failed. ${e}`, 'error');
  }
}

待ち受け処理

データグラムとストリームの待ち受け処理を作ります。
データグラムはセッションが一つなので待ち受け処理も一つです。
ストリームは複数セッション張れるので、返信用のUnidirectionalを待ち受けます。(Bidirectionalはこちらから張るのでその時に受信処理を書きます)

以下はデータグラムの受信処理です。
受信したデータをログに出力しているだけです。

W3Cのサンプルを見る限りではコメントアウトしている for await ブロックのみで完結するようなのですが、残念ながら実装中のようなのです。
(というか、for await とか初めて知りました。。)

client.js
async function readDatagrams() {
  try {
    /*
    const decoder = new TextDecoderStream("utf-8");
    for await (const data of wt.datagrams.readable.pipeThrough(decoder)) {
      addToEventLog(`Datagram received: ${data}`);
    }
    */
    let decoder = new TextDecoder("utf-8");
    let reader = wt.datagrams.readable.getReader();
    while(true) {
      const {value, done} = await reader.read();
      if (done) {
        addToEventLog('Done reading datagrams!');
        return;
      }
      let data = decoder.decode(value);
      addToEventLog(`Datagram received: ${data}`);
    }
  } catch (e) {
    addToEventLog(`Error while reading datagrams: ${e}`, 'error');
  }
}

次にUnidirectional Stream の待ち受け処理です。
こちらからUnidirectionalを張ってデータを送信すると、サーバー側でechoとしてUnidirectionalをクライアントに向けて張るので受信処理を登録します。
このデータ受信処理(readFromIncomingStream)はBirectionalでも同じです。

client.js
async function acceptUnidirectionalStreams() {
  try {
    /*
    for await (const readable of wt.incomingUnidirectionalStreams) {
      const number = streamNumber++;
      addToEventLog(`New incoming unidirectional stream #${number}`);
      // 受信用の処理を登録する
      readFromIncomingStream(readable, number);
    }
    */
    let reader = wt.incomingUnidirectionalStreams.getReader();
    while (true) {
      const {value, done} = await reader.read();
      if (done) {
         addToEventLog('Done accepting unidirectional streams!');
        return;
      }
      let readable = value;
      let number = streamNumber++;
      addToEventLog(`New incoming unidirectional stream #${number}`);
      // 受信用の処理を登録する
      readFromIncomingStream(readable, number);
    }
  } catch (e) {
    addToEventLog(`Error while accepting streams ${e}`, 'error');
  }
}

async function readFromIncomingStream(readable, number) {
  try {
    /*
    const decoder = new TextDecoderStream("utf-8");
    for await (const chunk of readable.pipeThrough(decoder)) {
      addToEventLog(`Received data on stream #${number}: ${chunk}`);
    }
    */
    let decoder = new TextDecoderStream("utf-8");
    let reader = readable.pipeThrough(decoder).getReader();
    while (true) {
      const {value, done} = await reader.read();
      if (done) {
        addToEventLog('Stream #' + number + ' closed');
        return;
      }
      let data = value;
      addToEventLog(`Received data on stream #${number}: ${data}`);
    }
  } catch (e) {
    addToEventLog(`Error while reading from stream #${number}: ${e}`, 'error');
    addToEventLog(`    ${e.message}`);
  }
}

送信処理

「Send Data」ボタンが押されたら種類に合わせてデータを送信します。
基本的にはストリームを作成してwriterを取得してwriteするだけです。

client.js
// 接続時に設定される
let wt, streamNumber, datagramWriter;

async function sendData() {
  const form = document.forms.sending.elements;
  const rawData = sending.data.value;
  const data = new TextEncoder("utf-8").encode(rawData);
  try {
    switch (form.sendtype.value) {
      case "datagram": {
        // データグラムを送信しているだけ
        await datagramWriter.write(data);
        addToEventLog(`Sent datagram: ${rawData}`);
        break;
      }
      case "unidi" : {
        const writable = await wt.createUnidirectionalStream();
        const writer = writable.getWriter();
        await writer.write(data);
        await writer.close();
        addToEventLog(`Sent a unidirectional stream with data: ${rawData}`);
        break;
      }
      case "bidi": {
        const duplexStream = await wt.createBidirectionalStream();
        const n = streamNumber++;
        // サーバーからの返信を受信する設定
        readFromIncomingStream(duplexStream.readable, n);

        const writer = duplexStream.writable.getWriter();
        await writer.write(data);
        await writer.close();
        addToEventLog(`Sent bidirectional stream #${n} with data: ${rawData}`);

        break;
      }
    }
  } catch (e) {
    addToEventLog(`Error while sending data: ${e}`, 'error');
  }
}

クライアント処理は以上です。
for await が使えるようになるとかなりスッキリしそうですね。

サーバーサイド

サーバーでは大きく分けて三つの処理が必要になります。

  • サーバーを初期化する
  • コネクションを処理する
  • イベントを処理する

サーバーを初期化する

まずはサーバーの初期化です。
TLS証明書やポート番号を指定して起動します。

py_server/server.py
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('certificate')
    parser.add_argument('key')
    args = parser.parse_args()

    configuration = QuicConfiguration(
        alpn_protocols=H3_ALPN,
        is_client=False,
        max_datagram_frame_size=65536,
    )
    configuration.load_cert_chain(args.certificate, args.key)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        serve(
            BIND_ADDRESS,
            BIND_PORT,
            configuration=configuration,
            create_protocol=WebTransportProtocol,
        ))
    try:
        logging.info(
            "Listening on https://{}:{}".format(BIND_ADDRESS, BIND_PORT))
        loop.run_forever()
    except KeyboardInterrupt:
        pass

コネクションを処理する

次にコネクション処理を書きます。
この辺りはほぼ公式のサンプル通りです。
ハンドシェイクやヘッダーやらOriginなどの処理があります。

py_server/server.py
class WebTransportProtocol(QuicConnectionProtocol):

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._http: Optional[H3Connection] = None
        self._handler: Optional[CounterHandler] = None

    def quic_event_received(self, event: QuicEvent) -> None:
        if isinstance(event, ProtocolNegotiated):
            self._http = H3Connection(self._quic, enable_webtransport=True)
        elif isinstance(event, StreamReset) and self._handler is not None:
            # Streams in QUIC can be closed in two ways: normal (FIN) and
            # abnormal (resets).  FIN is handled by the handler; the code
            # below handles the resets.
            self._handler.stream_closed(event.stream_id)

        if self._http is not None:
            for h3_event in self._http.handle_event(event):
                self._h3_event_received(h3_event)

    def _h3_event_received(self, event: H3Event) -> None:
        if isinstance(event, HeadersReceived):
            headers = {}
            for header, value in event.headers:
                headers[header] = value
            if (headers.get(b":method") == b"CONNECT" and
                    headers.get(b":protocol") == b"webtransport"):
                self._handshake_webtransport(event.stream_id, headers)
            else:
                self._send_response(event.stream_id, 400, end_stream=True)

        if self._handler:
            self._handler.h3_event_received(event)

    def _handshake_webtransport(self,
                                stream_id: int,
                                request_headers: Dict[bytes, bytes]) -> None:
        authority = request_headers.get(b":authority")
        path = request_headers.get(b":path")
        if authority is None or path is None:
            # `:authority` and `:path` must be provided.
            self._send_response(stream_id, 400, end_stream=True)
            return
        if path == b"/counter":
            assert(self._handler is None)
            self._handler = CounterHandler(stream_id, self._http)
            self._send_response(stream_id, 200)
        else:
            self._send_response(stream_id, 404, end_stream=True)

    def _send_response(self,
                       stream_id: int,
                       status_code: int,
                       end_stream=False) -> None:
        headers = [(b":status", str(status_code).encode())]
        self._http.send_headers(
            stream_id=stream_id, headers=headers, end_stream=end_stream)

イベントを処理する

最後にイベントを処理します。
今回はechoサーバーなので、データグラムにはデータグラムで、ストリームにはストリームでデータを送信します。
なお、データグラムについては受信したデータをそのまま返しますが、
ストリームでは受信したデータを self._texts[stream_id]に溜め込み、受信が完了した時点でデータを返しています。

py_server/server.py
class CounterHandler:

    def __init__(self, session_id, http: H3Connection) -> None:
        self._session_id = session_id
        self._http = http
        self._texts = defaultdict(bytes) // ここにストリームの受信データを貯めておきます

    // これが今回のメインechoな処理
    def h3_event_received(self, event: H3Event) -> None:
         // データグラムを受信した場合そのままデータグラムで返す
        if isinstance(event, DatagramReceived):
            logging.info("receive and send datagram {}".format(event.data.decode(encoding='utf-8')))
            payload = event.data # str(len(event.data)).encode('ascii')
            self._http.send_datagram(self._session_id, payload)

        // ストリームの場合
        if isinstance(event, WebTransportStreamDataReceived):
            self._texts[event.stream_id] += event.data
            // 受信が完了するまでデータをバッファするclosedされた時のevent.dataは空になっているのでバッファが必要
            if event.stream_ended:
                payload = self._texts[event.stream_id]
                // Unidirectionalの場合クライアントに向けてUnidirectional Streamを作る
                if stream_is_unidirectional(event.stream_id):
                    logging.info("receive and send unidirectional stream {}".format(payload.decode('utf-8')))
                    response_id = self._http.create_webtransport_stream(
                        self._session_id, is_unidirectional=True)
                // Bidirectionalの場合は双方向通信なのでそれに対してデータを返す
                else:
                    logging.info("receive and send bidirectional stream {}".format(payload.decode('utf-8')))
                    response_id = event.stream_id
                self._http._quic.send_stream_data(
                    response_id, payload, end_stream=True)
                self.stream_closed(event.stream_id)

    def stream_closed(self, stream_id: int) -> None:
        try:
            del self._texts[stream_id]
        except KeyError:
            pass

まとめ

通信方式が三種類あるため、一見ややこしそうに見えますが、
クライアントもサーバーも処理自体はとてもシンプルです。
今後活用していくにあたってはサーバー側の作り込みやプロトコルの理解が必要になってくるのでその辺りを引き続き調べようと思います。

他のWebTransport関連記事はこちら

13
8
6

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
8