8
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[ビデオエコーのデモあり]GoでもWebTransportがしたい!

Last updated at Posted at 2022-03-20

WebTransportのビデオエコーデモの公開がしたい!

スクリーンショット 2022-03-20 19.04.21.png
(左がカメラ、右がエコーされたもの)

画面上で52ミリ秒か。
早い、確かに早い。
ブラウザからエンコードしたchunkデータにタイムスタンプを埋め込み、サーバーにエコーさせてchunkデータが戻ってくるまでわずか41ms
これなら確かによりリアルタイムな何かができるだろう。
しかしこれを実用化するには別の問題があった。安定性だ。

781, 772, 763 ...
694, 682, 678 ...
黒い画面に映し出される数字は数秒毎にじりじりと減っていく。
121, 110, 103, 99
メモリ残量を表す vmstatfree の値はとうとう残り100MBを切ってしまったことを示していた。

92, 106, 102, 98, 112, 108 ...
お? ちゃんとGCが仕事をしてる?? メモリを食い尽くさずに安定稼働できるのか??

そう一抹の期待を抱きながら、ふと視線をずらすと、そこには固まって動きのないウェブカメラの画像が表示されていた。
ああ、やはり10分は持たないか。。
ブラウザでエンコードしたウェブカメラの動画・音声データをサーバーに送信し、Pythonのaioquicで送り返しているだけなのに、どうしてメモリリークしてしまうのだろうか。。

ああ、Python以外の言語とライブラリでWebTransportがしたい!

というわけで安定稼働するようにしたビデオエコーのデモがこちらです。(とりあえず3/31まで)

しょーもない前置きはこれくらいにして本題です!
ビデオエコーを試せるデモを立ててみました。

デモページはこちら

使い方

  1. Chrome M97以降でここにアクセスし、画面右のconnectをクリック
  2. ビデオとマイクのアクセス許可を求められるので許可します。
  3. 画面上部でカメラの解像度やコーデックを選択できます。
    1. datagramが選択されていることを確認します。(バッファリングなどは行なっていません)
    2. 解像度はカメラによって対応していないものがあります。
    3. 動画のコーデックは vp8 vp9 h264 が選べます。(AV1はWebCodecs未実装だったと思います)
    4. 音声のコーデックは opus のみです。(WebCodecsは対応ずみなはずですが、デスクリプションの記載方法がよくわかっていません)
  4. ▶︎ボタンを押すとビデオエコーが始まります。
    1. ハウリングにご注意ください。
    2. 設定を変更するときは一度リロードしてください。
  5. 画面下部にレイテンシーやフレーム情報が出力されます。

動作環境

  • サーバー
    • AWS t4g.small メモリ2GB
    • Go1.17

ソースコード

プログラムはこちらです。

Pythonのqioquic以外でWebTransportをするには?

さて、サーバーが安定しないのはどうもaioquicの使い方がよくないようでした。
と言ってもライブラリをざっとみても、あるいはコネクションをdelしてみても改善されないようだったので他のライブラリを探すことにします。

WebTrasnportに対応しようとしているライブラリなどは quic-goneqo などあるのですが、
どうもマージされる気配がなかったり、かと言って自分でその辺りを実装するにはまだまだ不明点も多くあり、なんか良い方法ないかとgithubを漁っていました。

QuicライブラリでWebTransport通信を実装しているライブラリを発見

そこで見つけたのが quic-goを使いWebTransportを実装している二つのリポジトリです。

  • webtransport-go : 前述のquic-goの中々マージされないプルリクに投稿されたもの。マージされないならマージしなくて良いものを作ってしまえいうことのようです。
  • yomo-presense-background : こちらのソースコードに懇切丁寧に解説が入っているのでかなり参考になります。

上記はいずれもquic-goを使ってブラウザで動くサンプルもあるので、これを参考にすればWebTransportのサーバーサイドの開発の選択肢が広がるはずです。

プログラムのお話

ここからは具体的にソースコードを追っていきたいと思います。

処理の要点を理解するためにわざわざベタっと書くように切り出しています。
WebCodecs周りもそうですが、今後きれいにモジュール化していきたいところです。

WebTransportはQuicプロトコルを利用するので、基本的にはQuicライブラリを使えば実装はできそうです。
ただし、HTTP/3でのハンドシェイクを必要とするため、この辺りがどう実装して良いかイメージしづらいところとなっていました。

ともあれ、上記のリポジトリから要点をまとめると次のようになります。

  1. Quicサーバーを起動する
    1. TLSが必須なため、証明書を読み込ませる
  2. 接続を待ち受ける
    1. 端方向のCONTROLストリームを作成し、SETTINGSフレームを送信する。(WebTransportとh3_datagramに対応していることを通知する)
    2. 端方向のCONTROLストリームを受信し、SETTINGSフレームを確認する。
    3. 双方向ストリームを受信し、HTTP/3リクエストを処理する
      1. HTTP/3ヘッダーを取得し、:protocolを確認したり:pathをもとに処理したいハンドラを設定する。
      2. HTTP/3ヘッダーをエンコードし、200レスポンスを返す
    4. ハンドラの処理を開始する。
      1. データグラムの場合 : データグラムを受信し、直ちにデータを送信する。
      2. ストリームの場合 : 端方向ストリームを受信し、データを受信したら同じデータを送信する。

ライブラリや定数など

まずはQuicなどの依存ライブラリを読み込み、次にHTTP/3WebTransportで使う定数を定義します。
気になるのはHTTP/3のヘッダー圧縮に関わるQPACKくらいでしょうか。
ということは他の言語でもQuicQPACKに対応したライブラリがあれば実装できそうです。

server.go
package main

// 必要なライブラリ(全て)
import (
	"bufio"
	"bytes"
	"context"
	"crypto/tls"
	"errors"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
	"strconv"
	"strings"

	"github.com/lucas-clemente/quic-go"
	"github.com/lucas-clemente/quic-go/quicvarint"
	"github.com/marten-seemann/qpack"
)

// 必要な定数(ライブラリに定義済みのものもあります)
const STREAM_TYPE_CONTROL = 0x00
const STREAM_TYPE_WEBTRANSPORT_UNI = 0x54
const FRAME_TYPE_HEADER = 0x01
const FRAME_TYPE_SETTINGS = 0x04
const H3_DATAGRAM = 0xffd277
const ENABLE_WEBTRANSPORT = 0x2b603742

main関数

通常はコマンドラインパラメータで証明書のファイル名やポート番号を指定すると思いますが、今回は特に何も処理を入れていません。
go func() { wt.Serve("0.0.0.0:4433", certs)}() とさくっと非同期処理ができてしまうのがGoの良いとこだと思います。

server.go
func main() {
	cert, err := tls.LoadX509KeyPair("cert.pem", "cert.key")
	if err != nil {
		log.Println(err)
		return
	}
	certs := []tls.Certificate{cert}
	wt := NewWebTransportServer()
	wtErr := make(chan error)

	addr := "0.0.0.0:4433"
	go func() {
		// WebTransport Server. (UDP)
		wtErr <- wt.Serve(addr, certs)
	}()

	select {
	case err := <-wtErr:
		log.Println(err)
		return
	}
}

Quicサーバーを起動して待ち受ける

Quicサーバーを起動してコネクションを待ち受けます。
TCPやUDPのサーバーと大きく異なるところはなさそうです。

for {}でコネクションを待ち受け、接続があれば go wt.handleSession(sess)で処理を開始します。

server.go
func (wt *WebTransportServer) Serve(addr string, certs []tls.Certificate) error {
	tlsConfig := &tls.Config{
		Certificates: certs,
		NextProtos:   []string{"h3"},
	}
	quicConfig := &quic.Config{
		EnableDatagrams: true,
	}

	listener, err := quic.ListenAddr(addr, tlsConfig, quicConfig)
	if err != nil {
		return err
	}
	log.Printf("WebTransport server listening on %s", listener.Addr().String())

	for {
		sess, err := listener.Accept(context.Background())
		if err != nil {
			log.Println(err)
			continue
		}
		log.Printf("+Session: %s", sess.RemoteAddr().String())
		go wt.handleSession(sess)
	}
}

接続されたらハンドシェイクする

Quicレベルの処理はライブラリがわでよしなにやってくれるので、早速HTTP/3のハンドシェイクを行います。
まず、こちらからCONTROLストリームを作成してSETTINGSフレームを送信し、WebTransportやデータグラムに対応していることを示します。
次に、クライアントからのCONTROLストリームを受信し、SETTINGSフレームを確認します。

これが済むとブラウザではnew WebTransport()が通ります。
次にHTTP/3のリクエストストリーム(双方向)を受け付け、HTTPヘッダーを確認して200を返します。
ここまでいけるとブラウザが wt.ready()のプロミスを解決してくれて、晴れてWebTransport通信を開始することができます。

server.go
func (wt *WebTransportServer) handleSession(sess quic.Session) {

	// 1. send setting frame. ENABLE_WEBTRANSPORT
	wt.sendSettingFrame(sess)

	//  2. recv setting frame.
	wt.receiveSettingFrame(sess)

    // 3. HTTP/3のリクエスト処理
	conn, err := wt.receiveClientConnect(sess)
	if err != nil {
		log.Println("HTTP CONNECT failed", err)
		return
	}

	if conn != nil {
		log.Println("conn run")
		conn.run()
	}
	log.Println("Prepared! Start to work...")
}

SETTINGSフレームを送る

ここからは実際にQuicデータをバイナリとして書き込んでいきます。
ストリームやフレームといった概念が紛らわしいですが、データ構造をきちんと把握しておきたいところです。

まずは端方向ストリームを作成し、最初にストリーム種別がCONTROLであることを設定します。
次にフレームがSETTINGSであることを設定します。
長さパラメータを設定した後、実際にパラメータと値を入力します。

server.go
func (wt *WebTransportServer) sendSettingFrame(sess quic.Session) {
	log.Printf("[1] Send SETTINGS frame")

	respStream, err := sess.OpenUniStream()
	if err != nil {
		log.Println(err)
		return
	}

	buf := &bytes.Buffer{}
	quicvarint.Write(buf, STREAM_TYPE_CONTROL)

	// 7. HTTP Framing Layer
	// HTTP/3 Frame Format {
	// 	 Type (i),
	// 	 Length (i),
	// 	 Frame Payload (..),
	// }
	quicvarint.Write(buf, FRAME_TYPE_SETTINGS)

	var l uint64
	l += uint64(quicvarint.Len(ENABLE_WEBTRANSPORT) + quicvarint.Len(1))
	quicvarint.Write(buf, l)

	// Write value
	// Setting {
	//   Identifier (i),
	//   Value (i),
	// }

	// SETTINGS Frame {
	//   Type (i) = 0x04,
	//   Length (i),
	//   Setting (..) ...,
	// }
	//

	quicvarint.Write(buf, H3_DATAGRAM)
	quicvarint.Write(buf, 1)
	quicvarint.Write(buf, ENABLE_WEBTRANSPORT)
	quicvarint.Write(buf, 1)

	bbb := buf.Bytes()
	log.Printf("\t>>>bbb:[len=%d] %# x", len(bbb), bbb)

	n, err := respStream.Write(bbb)
	if err != nil {
		log.Println(err)
		return
	}

	log.Printf("\t>>>wrote n:%d", n)
	log.Printf("\tSettings frame sent !")
}

SETTINGSフレームを受け取る

次にブラウザからの設定を受け取ります。
汎用的なサーバーを開発するのであればきちんと設定値を確認しないといけませんが、今回は検証目的なので特にチェックはしていません。

server.go
func (wt *WebTransportServer) receiveSettingFrame(sess quic.Session) {
	recvSettingStream, _ := sess.AcceptUniStream(sess.Context())
	log.Printf("[2] receive client SETTINGS frame")

	sqr := quicvarint.NewReader(recvSettingStream)
	//  control stream type
	sty, err := quicvarint.Read(sqr)
	if err != nil {
		log.Println(err)
		return
	}
	log.Printf("\tStreamType: %# x\r\n", sty)

	//
	ftype, err := quicvarint.Read(sqr)
	if err != nil {
		log.Println(err)
		return
	}
	log.Printf("\tFrameType: %# x\r\n", ftype)

	// setting length
	flen, err := quicvarint.Read(sqr)
	if err != nil {
		log.Println(err)
		return
	}
	log.Printf("\tLength: %# x(oct=%d)\r\n", flen, flen)

	payload := make(map[uint64]uint64)
	payloadBuf := make([]byte, flen)
	if _, err := io.ReadFull(recvSettingStream, payloadBuf); err != nil {
		log.Println(err)
		return
	}
	bb := bytes.NewReader(payloadBuf)
	for bb.Len() > 0 {
		id, err := quicvarint.Read(bb)
		if err != nil {
			log.Println(err)
			return
		}
		value, err := quicvarint.Read(bb)
		if err != nil {
			log.Println(err)
			return
		}
		payload[id] = value
		log.Printf("\tidentifier:%# x, value: %d (%# x)\r\n", id, value, value)
	}
}

HTTP/3リクエスト処理

さて、通常のWebプログラム同様にHTTP/3のヘッダーのパスを見てロジックを切り替えることになります。
ただしヘッダーはQPACKでエンコード・デコードしないといけません。
また、ドラフト段階ではそれを示すためのフラグを設定する必要があります。
ここは双方向ストリームでのやりとりになります。(閉じてはいけません)

server.go
func (wt *WebTransportServer) receiveClientConnect(sess quic.Session) (EventHandler, error) {
	ctx := context.Background()
	reqStream, err := sess.AcceptStream(ctx)
	log.Printf("[3] Recieve HTTP CONNECT from client")

	if err != nil {
		return nil, err
	}

	log.Printf("\trequest stream accepted: %d", reqStream.StreamID())

	qr := quicvarint.NewReader(reqStream)
	t, err := quicvarint.Read(qr)
	if err != nil {
		log.Println(err)
		return nil, err
	}
	log.Printf("\tt: %# x", t)

	ll, err := quicvarint.Read(qr)
	if err != nil {
		log.Println(err)
		return nil, err
	}
	log.Printf("\tll: %# x", ll)

	if t != FRAME_TYPE_HEADER {
		// header frame
		log.Println("\tnot header frame, should force close connection!!!!!")
		return nil, errors.New("not header frame")
	}

	headerBlock := make([]byte, ll)
	var n int
	if n, err = io.ReadFull(reqStream, headerBlock); err != nil {
		return nil, err
	}
	log.Printf("\tn: %d", n)
	decoder := qpack.NewDecoder(nil)
	headers, err := decoder.DecodeFull(headerBlock)
	if err != nil {
		return nil, err
	}

	// check header.
	var path string
	for k, v := range headers {
		log.Printf("\t[header] %d: %s", k, v)

		if v.Name == ":path" {
			path = v.Value
		}
	}

	var conn EventHandler
	u, err := url.Parse(path)
	if err != nil {
		log.Println("parse :path failed: ", err)
	}

	if u.Path == "/audio/echo/stream" {
		conn = &StreamEcho{session: sess, ctx: ctx}
	}
	if u.Path == "/video/echo/stream" {
		conn = &StreamEcho{session: sess, ctx: ctx}
	}
	if u.Path == "/audio/echo/datagram" {
		conn = &DatagramEcho{session: sess}
	}
	if u.Path == "/video/echo/datagram" {
		conn = &DatagramEcho{session: sess}
	}

	// response
	{
		status := http.StatusOK
		header := http.Header{}
		header.Add(":status", strconv.Itoa(status))
		header.Add("Sec-Webtransport-Http3-Draft", "draft02")
		var qpackHeaders bytes.Buffer
		encoder := qpack.NewEncoder(&qpackHeaders)
		for k, v := range header {
			log.Printf("\t[response header] %s: %s", k, v)
			for index := range v {
				log.Printf("\t[range v] v=%s, index=%d, v[index]=%s", v, index, v[index])
				encoder.WriteField(qpack.HeaderField{
					Name:  strings.ToLower(k),
					Value: v[index],
				})
			}
		}

		buf := &bytes.Buffer{}
		quicvarint.Write(buf, FRAME_TYPE_HEADER)
		quicvarint.Write(buf, uint64(qpackHeaders.Len()))
		log.Printf("\tresponse 200: %# x %# x", buf.Bytes(), qpackHeaders.Bytes())

		writer := bufio.NewWriter(reqStream)
		if n, err = writer.Write(buf.Bytes()); err != nil {
			return nil, err
		}
		log.Printf("\tn1=%d", n)
		if n, err = writer.Write(qpackHeaders.Bytes()); err != nil {
			return nil, err
		}
		if err = writer.Flush(); err != nil {
			return nil, err
		}
		log.Printf("\tn1=%d", n)
	}

	return conn, nil
}

エコーする(データグラム)

さて、ここまできてようやくロジックを書くことができます。
先ほど、パスをチェックするところでハンドラを定義し、レスポンス呼び出し時に実行するように。

server.go
func (c *DatagramEcho) run() {
	go c.read()
}

func (c *DatagramEcho) read() {
	for {
		msg, err := c.session.ReceiveMessage()
		if err != nil {
			log.Println(err)
			break
		}

		// echo
		{
			buf := &bytes.Buffer{}
			buf.Write(msg)
			err = c.session.SendMessage(buf.Bytes())
			if err != nil {
				log.Println(err)
			}
		}
	}
}

エコーする(ストリーム)

さて、ストリームはデータグラムより少しややこしいです。
今回は1フレームを1ストリームとしているため、単方向ストリームを受信し、データを受信し終わったらすぐに送信する形にしました。

単方向ストリームは先頭にストリームタイプとセッションIDがあり、その後データが続きます。
ここのセッションIDは、作成したストリームのIDではなく、最初のHTP/3リクエストのストリームID(基本的には0)になります。

 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           0x54 (i)                          ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        Session ID (i)                       ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Stream Body                         ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

ioutil.ReadAll()で全部読み込まずに、stream.Read(buf)で読んだデータを即時送信する方法の方が良い気がしたのですが、
読み取り完了時にRead()がすぐに制御を返してくれない挙動にハマったので、ひとまずioutil.ReadAll()にしました。

server.go
func (c *StreamEcho) read() {
	for {
		stream, err := c.session.AcceptUniStream(c.ctx)
		if err != nil {
			log.Println(err)
			return
		}
		log.Println("accept stream.")

        // ここを非同期にしないとブロックして処理が進まない
		go func(stream quic.ReceiveStream) {
			// read heder
			qr := quicvarint.NewReader(stream)
			sty, err := quicvarint.Read(qr)
			if err != nil {
				log.Println(err)
				return
			}
			log.Printf("\tStreamType: %# x\r\n", sty)
			if sty != STREAM_TYPE_WEBTRANSPORT_UNI {
				return
			}
			sid, err := quicvarint.Read(qr)
			if err != nil {
				log.Println(err)
				return
			}

			log.Printf("accept stream id %d, session %d.", stream.StreamID(), sid)

			// echo by uni stream.
			res, err := c.session.OpenUniStream()
			if err != nil {
				log.Println(err)
				return
			}
			log.Printf("open %d stream.", res.StreamID())
			defer res.Close()

			h := &bytes.Buffer{}
			quicvarint.Write(h, STREAM_TYPE_WEBTRANSPORT_UNI)
			quicvarint.Write(h, uint64(sid))
			res.Write(h.Bytes())

			buf, err := ioutil.ReadAll(stream)
			if err != nil {
				log.Println(err)
			}
			res.Write(buf)
		}(stream)
	}
}

まとめ

何はともあれ、Goで安定してビデオエコーができるようになったのは大きいと思います。
エラー処理や非同期処理まわり、ブラウザ周りの処理などやることはまだまだたくさんありますが、
面白い分野ではあるのでもっと盛り上がってくれば良いなと思います。

8
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?