13
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

HTTPでも双方向ソケット通信にチャレンジしてみた(けどやめた)

Last updated at Posted at 2023-12-03

フューチャーアドベントカレンダー2023の4日目のエントリーです。昨日はkaedemaluさんのかゆいところに手が届く、Terraformの書き方 (configuration_aliasesの使い方)でした。

Go 1.20で追加されたResponseControllerに対して、1.21でEnableFullDuplexというメソッドが増えていたので、これ使ったら双方向通信がHTTP/1.1でもできるのではないか?と思って触ってみました。

ResponseControllerは辻さんが技術ブログに書いてくれていました。

EnableFullDuplex()メソッドは、HTTP/1.1でも受信が終わらないうちに結果を返し始められるようになるというものです。これが使えるなら、双方向通信ができるということですよね?

HTTP/2や3ではバイナリプロトコルになり、それまででいうところのチャンク形式がデフォルトになりました。送信も受信も、細切れな小さいデータ片に分けて送れます。そのため、HTTP/2以降ではウェブソケットのような通信ができるようになりました。Chromeも、fetch()で細切れの送信が行えるようになります。ChromeだけだがallowHTTP1ForStreamingUpload: trueをつければHTTP/1.1でもいけるらしいといくつかのブログで説明もされています。

これが実現できるとうれしいことはいろいろあるな、と思っていました。考えていたことは次のとおりです

  • HTTP/1.1で双方向できると、ローカルの開発やVPC内部の通信はHTTP/1.1のままでいろいろできるようになる
  • WebSocketはプロキシ超えるのが大変だったりするので、それが簡単になるとうれしい
  • Server Sent EventsやWebSocketのAPIは追加のヘッダーを自由に送れないので、CORSで認証とかやらせにくくていまいちなのが解決できたらうれしい

これが全部解消できたら素敵だな、と思いましたが、そうではなかったよ、というお話です。

送信と受信をラップしたフロントのコード

streamを直接フロントのコンポーネントで扱うのは冗長になりすぎるので、読み込みは非同期ジェネレータでfor awaitで扱えて、書き込みは関数1つ呼べばOKな感じにしておけば使いやすいだろうと思い、まずはラッパーコードを書いてみました。

import { z } from "zod";

export type Option = {
    headers?: Headers,
    mode?: "cors" | "no-cors" | "same-origin",
    credentials?: "omit" | "same-origin" | "include",
    cache?: "default" | "no-store" | "reload" | "no-cache" | "force-cache" | "only-if-cached",
    referrer?: "about:client" | "" | string,
    referrerPolicy?: "no-referrer-when-downgrade" | "same-origin" | "origin" | "strict-origin" | "origin-when-cross-origin" | "strict-origin-when-cross-origin" | "unsafe-url",
    integrity?: string
}

export type ReadonlyOption = Option & {
    method?: "GET" | "HEAD" | "POST" | "PUT" | "DELETE" | "OPTIONS" | "PATCH" | string
    redirect?: "error" | "follow",
    
}

export type WriteonlyOption = Option & {
    method?: "POST" | "PUT" | "PATCH" | string
    redirect?: "error" | "follow" | "manual",
}

// 受信したものを行ごとにJSONにパース(zod利用)し、非同期ジェネレータとして読み込めるようにする
// 読み込みようソケット
export async function* readonlySocket<R>(url: string, schema: z.ZodType<R>, options?: ReadonlyOption): AsyncGenerator<R>  {
    const response = await fetch(url, options);
    if (response.body === null) {
        throw new Error('empty body')
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    while (true) {
        const { done, value } = await reader.read();
        if (done) {
            // Do something with last chunk of data then exit reader
            return;
        }
        yield schema.parse(JSON.parse(decoder.decode(value)));
    }
}

// close/writeメソッドを返す書き込み用ソケット
// writeに呼ばれたJSONを送信する
export async function writeonlySocket<T>(url: string, options?: WriteonlyOption): Promise<{close: () => void, write: (v: T) => void}> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let ctrl: ReadableStreamDefaultController<any>
    let unlock: (v: unknown) => void
    const wait = new Promise((resolve) => {
        unlock = resolve
    })
    const stream = new ReadableStream({
        start(controller) {
            ctrl = controller;
            unlock(null)
        }
    }).pipeThrough(new TextEncoderStream());

    await wait
    
    const newOpts = {
        ...options,
        body: stream,
        duplex: "half",
        allowHTTP1ForStreamingUpload: true,
    }
    if (!newOpts.method) {
        newOpts.method = "POST"
    }
    fetch(url, newOpts).then(res => {
        console.log(res.ok, res.statusText)
    })
    return {
        close() {
            ctrl.close()
        },
        write(v: T) {
            ctrl.enqueue(JSON.stringify(v) + "\n")
        }
    }
}

Reactで呼ぶコードを書いてみたのが次のとおりです。書き込みは適当にタイマーでやっていますが、writeをuseRefか何かで保持したりcontextとかを通じて子コンポーネントに渡す想定でいます

  // zodスキーマ
  const counterSchema = z.object({
    count: z.number()
  })

  useEffect(() => {
    // devでも2回呼ばないように
    if (!initialized.current) {
      initialized.current = true;

      // 読み込み用ストリーム
      (async () => {
        for await (const count of readonlySocket("/api/counter", counterSchema)) {
          // 画面に表示する
          setCount(count.count);
        }
      })();

      // 書き込み用ストリーム
      (async () => {
        const { write, close } = await writeonlySocket("/api/upload-stream");
        for (let i = 0; i < 20; i++) {
          write({timestamp: Date.now()})

          await new Promise((resolve) => setTimeout(resolve, 1000))
        }
        close();
      })();
    }
  }, [])

サーバーのコード

サーバーコードは次のとおりです。読み込みのストリームはhttp.ResponseControllerFlush()で実現できるかな、と。送信は改行区切りテキストを想定し、改行単位でよみこんでJSONパースするようにしています。

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

type Counter struct {
	Count int `json:"count"`
}

type Timestamp struct {
	Timestamp int `json:"timestamp"`
}

func main() {
	http.HandleFunc("/api/counter", func(w http.ResponseWriter, r *http.Request) {
		rc := http.NewResponseController(w)
		encoder := json.NewEncoder(w)
		for i := 0; i < 10; i++ {
			encoder.Encode(&Counter{Count: i})
			time.Sleep(500 * time.Millisecond)
			rc.Flush()
		}
	})

	http.HandleFunc("/api/upload-stream", func(w http.ResponseWriter, r *http.Request) {
		rc := http.NewResponseController(w)
		rc.EnableFullDuplex()
		reader := bufio.NewReader(r.Body)
		for {
			content, err := reader.ReadBytes('\n')
			if err != nil {
				fmt.Printf("err: %v\n", err)
				break
			}
			var timestamp Timestamp
			json.Unmarshal(content, &timestamp)
			fmt.Printf("timestamp: content: %v\n", timestamp.Timestamp)
		}
		w.WriteHeader(http.StatusOK)
		io.WriteString(w, "finished")
	})

	fmt.Println("start server at :8000")
	http.ListenAndServe(":8000", nil)
}

トラブル発生

ここで実行してみると、受信の方は成功したのですが、送信の方はうまくいきません。HTTP/2でないとダメだよ、というエラーが出ます。どうやら、allowHTTP1ForStreamingUploadフラグはなくなってしまったようです。これがないと、HTTP/1.1だけで双方向通信ができるという夢が潰えてしまう

いちおう、技術的にできるかどうかだけはやっておこうとHTTPS化してためしました。Viteの開発サーバーでHTTPサーバーを起動してフロントを立ちあげ、同じポートでGoのサーバーはプロキシして通していたのでCORSとかは考えずに済むというのを考えていましたが、APIサーバーだけHTTPSにするとCORS化しないとだめですよね。go-chiの力を借りました。

func main() {
	r := chi.NewRouter()
	r.Use(cors.Handler(cors.Options{
		AllowedOrigins: []string{"https://*", "http://*"},
		AllowedMethods: []string{"GET", "POST", "OPTIONS"},
		AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-CSRF-Token"},
		Debug:          true,
	}))

	r.Get("/api/counter", func(w http.ResponseWriter, r *http.Request) {
      // 中身はさきほどと同じ
	})

	r.Post("/api/upload-stream", func(w http.ResponseWriter, r *http.Request) {
      // 中身はさきほどと同じ
	})

	fmt.Println("start server at :8000")
	http.ListenAndServeTLS(":8000", "localhost.pem", "localhost-key.pem", r)
}

フロントのコードも、{mode: "cors"}をつけたり、ホスト名を明示する必要があります。

      // 読み込み用ストリーム
      (async () => {
        for await (const count of readonlySocket("https://localhost:8000/api/counter", counterSchema, {mode: "cors"})) {
          :
        }
      })();

      // 書き込み用ストリーム
      (async () => {
        const { write, close } = await writeonlySocket("https://localhost:8000/api/upload-stream", {mode: "cors"});
        :
      })();

いちおうこれで送受信とも通るようになりました。あとはこの送受信を合わせたコードを作れば送受信いけるね、というところの手前の確認はできましたが、まあHTTP/1.1できないならいいかな。

まとめ

allowHTTP1ForStreamingUploadフラグさえ効いていれば、開発時も簡単セットアップで、デプロイ時もTLSをハンドリングするのは最前面のCDNとかロードバランサーだけで済み、プロキシ突破もWebSocketよりも楽てきて最高!、という思いを持って取り組み始めましたが、まさかのChrome側が非対応に変わってしまうという結果になりました。EnableFullDuplex()メソッドは誰がどのタイミングで使えるのだろうか・・・この手のプログラムは理屈の上では「できるはず!」と思っても手を動かさないと全部動きます!となかなかならないあたりは試し甲斐がありますね。

もちろん、まったく無駄か、というとそういうわけではなく、WebSocketやSSEでは送信できないヘッダーを送ったり、プロキシは通り抜けやすいというメリットはあります。

明日は@yu1Ro5さんの記事です。お楽しみに。

13
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?