tl;dr
io.Pipeを使えば解決
きっかけ
RaspberypiやOrangePi等のワンボードPCからタイムラプスを撮影していて
このデータをStreamableという動画サービスに自動投稿してました。しかしタイムラプスの撮影時間によってはメモリ不足で上手く動かないことが。
これを解決できないかと考えました。
元の実装
纏めるとこういうことになっていました
(1) bufferを準備(bytes.Buffer)
(2) bufferに対して動画等データなどの書き込み処理
(3) bufferをPOST処理に渡す。
動画のサイズが大きいとbufferが実機のメモリを超えてしまい上手く動かなくなったようです
実装変更内容
(1) のbufferの代わりにio.Pipeのwriter側を準備
(2)の処理をgo routineとして並列実行
(3)のPOST処理にio.PipeのReader側を渡す
つまりPOST処理でデータが必要なタイミングではじめてfileからデータを読み込む動作になるわけです。
ファイルを読み込みながらネットワークへの送信を行う挙動となるため ファイルサイズ分のバッファは不要となります
遅延評価というやつです
と、ここまで順調かと思われていましたがどうしてもサーバのステータスコードが403(Bad request)になってしまいます。
//既存
&http.Request{Method:"POST", URL:(*url.URL)(0xc420514080), Proto:"HTTP/1.1", ProtoMajor:1, ProtoMinor:1, Header:http.Header{"Content-Type":[]string{"multipart/form-data; boundary=7cdd6ce13e672ce350d9de8319029e6e786e822c4bf7a3b7090084ce7dc6"}}, Body:ioutil.nopCloser{Reader:(*bytes.Buffer)(0xc4200c0e70)}, GetBody:(func() (io.ReadCloser, error))(0x1234c50), ContentLength:1222787, TransferEncoding:[]string(nil), Close:false, Host:"api.streamable.com", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:"", RequestURI:"", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:context.Context(nil)}
//Pipe
&http.Request{Method:"POST", URL:(*url.URL)(0xc42001ec80), Proto:"HTTP/1.1", ProtoMajor:1, ProtoMinor:1, Header:http.Header{"Content-Type":[]string{"multipart/form-data; boundary=5ac1d8e294e1ca308ee67468fa0e3236a9d0671165bc1d0d29e60d9efd33"}}, Body:(*io.PipeReader)(0xc42000e0d0), GetBody:(func() (io.ReadCloser, error))(nil), ContentLength:0, TransferEncoding:[]string(nil), Close:false, Host:"api.streamable.com", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:"", RequestURI:"", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:context.Context(nil)}
比較すると、ContentLength:0
になっています。
仕方がないのでContent-lengthを自前で計算する処理も作ります
以下変更内容になります
変更前
(1) bufferを準備(bytes.Buffer)
(2) bufferに対して動画等データなどの書き込み処理
(3) bufferをPOST処理に渡す。
func uploadVideo(creds Credentials, filePath string) (VideoInfo, error) {
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return VideoInfo{}, err
}
var buf bytes.Buffer // (1) bufferを準備
// (2) bufferに対して動画等データなどの書き込み処理 --ここから --
multipartWriter := multipart.NewWriter(&buf)
fileHandle, err := os.Open(filePath)
if err != nil {
return VideoInfo{}, err
}
fileWriter, err := multipartWriter.CreateFormFile("file", filePath)
if err != nil {
return VideoInfo{}, err
}
_, err = io.Copy(fileWriter, fileHandle)
if err != nil {
return VideoInfo{}, err
}
multipartWriter.Close()
//(2) bufferに対して動画等データなどの書き込み処理 --ここまで --
//(3) bufferをPOST処理に渡す。ここから最後まで
req, err := http.NewRequest("POST", uploadURL, &buf)
if err != nil {
return VideoInfo{}, err
}
authenticateHTTPRequest(req, creds)
req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
client := http.DefaultClient
res, err := client.Do(req)
if err != nil {
return VideoInfo{}, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return VideoInfo{}, fmt.Errorf("upload failed")
}
bodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return VideoInfo{}, err
}
body := bytesToString(bodyBytes)
videoRes, err := videoResponseFromJSON(body)
if err != nil {
return VideoInfo{}, err
}
return videoRes, nil
}
変更後
(1) のbufferの代わりにio.Pipeのwriter側を準備
(2)の処理をgo routineとして並列実行
(3)のrequest処理にio.PipeのReader側を渡す
// 自前計算処理
func contentLength(fileSize int64, path string) int64 {
var buf bytes.Buffer
multipartWriter := multipart.NewWriter(&buf)
multipartWriter.CreateFormFile("file", path)
multipartWriter.Close()
return int64(buf.Len()) + fileSize
}
func uploadVideoLite(creds Credentials, filePath string) (VideoInfo, error) {
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return VideoInfo{}, err
}
fileHandle, err := os.Open(filePath)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
// (1) のbufferの代わりにio.Pipeのwriter側を準備
pipeReader, pipeWriter := io.Pipe()
multipartWriter := multipart.NewWriter(pipeWriter)
stat, _ := fileHandle.Stat()
// (2)の処理(ファイルをWriterに書き込む処理)をgo routineとして並列実行
go func() {
defer pipeWriter.Close()
fileWriter, err := multipartWriter.CreateFormFile("file", filePath)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
_, err = io.Copy(fileWriter, fileHandle)
if err != nil {
fmt.Printf("error: %s", err.Error())
}
if err := multipartWriter.Close(); err != nil {
fmt.Printf("error: %s", err.Error())
}
}()
//(3)のrequest処理にio.PipeのReader側を渡す
req, err := http.NewRequest("POST", uploadURL, pipeReader)
if err != nil {
return VideoInfo{}, err
}
authenticateHTTPRequest(req, creds)
req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
// 自前計算
req.ContentLength = contentLength(stat.Size(), filePath)
client := http.DefaultClient
res, err := client.Do(req)
if err != nil {
return VideoInfo{}, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return VideoInfo{}, fmt.Errorf("upload failed StatusCode:%d", res.StatusCode)
}
bodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return VideoInfo{}, err
}
body := bytesToString(bodyBytes)
videoRes, err := videoResponseFromJSON(body)
if err != nil {
return VideoInfo{}, err
}
return videoRes, nil
}
最後に
最初、GoでChannelを介してio.Readerとio.Writerをつなげれば
遅延評価な感じで処理できて省メモリにできるに違いない。と思ってio.Pipe()を一から作ろうとしていました。
だいたいそういうのは既にあるんですね。
それとメモリの使用状況も以下の通り516MBから1.4MBまで減少しました。
速度は測ってないです。