LoginSignup
13
15

More than 5 years have passed since last update.

大きなファイルのアップロードを省メモリで行いたい(io.Pipeを使う)

Last updated at Posted at 2017-04-09

tl;dr

io.Pipeを使えば解決

きっかけ

RaspberypiやOrangePi等のワンボードPCからタイムラプスを撮影していて
このデータをStreamableという動画サービスに自動投稿してました。しかしタイムラプスの撮影時間によってはメモリ不足で上手く動かないことが。

これを解決できないかと考えました。

元の実装

纏めるとこういうことになっていました

(1) bufferを準備(bytes.Buffer)
(2) bufferに対して動画等データなどの書き込み処理
(3) bufferをPOST処理に渡す。

動画のサイズが大きいとbufferが実機のメモリを超えてしまい上手く動かなくなったようです

実装変更内容

(1) のbufferの代わりにio.Pipeのwriter側を準備
(2)の処理をgo routineとして並列実行
(3)のPOST処理にio.PipeのReader側を渡す

つまりPOST処理でデータが必要なタイミングではじめてfileからデータを読み込む動作になるわけです。

ファイルを読み込みながらネットワークへの送信を行う挙動となるため ファイルサイズ分のバッファは不要となります
遅延評価というやつです

と、ここまで順調かと思われていましたがどうしてもサーバのステータスコードが403(Bad request)になってしまいます。


//既存
&http.Request{Method:"POST", URL:(*url.URL)(0xc420514080), Proto:"HTTP/1.1", ProtoMajor:1, ProtoMinor:1, Header:http.Header{"Content-Type":[]string{"multipart/form-data; boundary=7cdd6ce13e672ce350d9de8319029e6e786e822c4bf7a3b7090084ce7dc6"}}, Body:ioutil.nopCloser{Reader:(*bytes.Buffer)(0xc4200c0e70)}, GetBody:(func() (io.ReadCloser, error))(0x1234c50), ContentLength:1222787, TransferEncoding:[]string(nil), Close:false, Host:"api.streamable.com", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:"", RequestURI:"", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:context.Context(nil)}


//Pipe
&http.Request{Method:"POST", URL:(*url.URL)(0xc42001ec80), Proto:"HTTP/1.1", ProtoMajor:1, ProtoMinor:1, Header:http.Header{"Content-Type":[]string{"multipart/form-data; boundary=5ac1d8e294e1ca308ee67468fa0e3236a9d0671165bc1d0d29e60d9efd33"}}, Body:(*io.PipeReader)(0xc42000e0d0), GetBody:(func() (io.ReadCloser, error))(nil), ContentLength:0, TransferEncoding:[]string(nil), Close:false, Host:"api.streamable.com", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:"", RequestURI:"", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:context.Context(nil)}

比較すると、ContentLength:0になっています。

仕方がないのでContent-lengthを自前で計算する処理も作ります

以下変更内容になります

変更前

(1) bufferを準備(bytes.Buffer)
(2) bufferに対して動画等データなどの書き込み処理
(3) bufferをPOST処理に渡す。


func uploadVideo(creds Credentials, filePath string) (VideoInfo, error) {
    if _, err := os.Stat(filePath); os.IsNotExist(err) {
        return VideoInfo{}, err
    }

    var buf bytes.Buffer // (1) bufferを準備

    // (2) bufferに対して動画等データなどの書き込み処理  --ここから --
    multipartWriter := multipart.NewWriter(&buf)

    fileHandle, err := os.Open(filePath)
    if err != nil {
        return VideoInfo{}, err
    }

    fileWriter, err := multipartWriter.CreateFormFile("file", filePath)
    if err != nil {
        return VideoInfo{}, err
    }

    _, err = io.Copy(fileWriter, fileHandle)
    if err != nil {
        return VideoInfo{}, err
    }

    multipartWriter.Close()
    //(2) bufferに対して動画等データなどの書き込み処理  --ここまで --

    //(3) bufferをPOST処理に渡す。ここから最後まで
    req, err := http.NewRequest("POST", uploadURL, &buf)
    if err != nil {
        return VideoInfo{}, err
    }

    authenticateHTTPRequest(req, creds)

    req.Header.Set("Content-Type", multipartWriter.FormDataContentType())

    client := http.DefaultClient
    res, err := client.Do(req)
    if err != nil {
        return VideoInfo{}, err
    }
    defer res.Body.Close()

    if res.StatusCode != http.StatusOK {
        return VideoInfo{}, fmt.Errorf("upload failed")
    }

    bodyBytes, err := ioutil.ReadAll(res.Body)
    if err != nil {
        return VideoInfo{}, err
    }

    body := bytesToString(bodyBytes)

    videoRes, err := videoResponseFromJSON(body)
    if err != nil {
        return VideoInfo{}, err
    }

    return videoRes, nil
}

変更後

(1) のbufferの代わりにio.Pipeのwriter側を準備
(2)の処理をgo routineとして並列実行
(3)のrequest処理にio.PipeのReader側を渡す



// 自前計算処理
func contentLength(fileSize int64, path string) int64 {
    var buf bytes.Buffer
    multipartWriter := multipart.NewWriter(&buf)
    multipartWriter.CreateFormFile("file", path)
    multipartWriter.Close()
    return int64(buf.Len()) + fileSize
}


func uploadVideoLite(creds Credentials, filePath string) (VideoInfo, error) {
    if _, err := os.Stat(filePath); os.IsNotExist(err) {
        return VideoInfo{}, err
    }

    fileHandle, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("error: %s", err.Error())
    }

    // (1) のbufferの代わりにio.Pipeのwriter側を準備
    pipeReader, pipeWriter := io.Pipe()
    multipartWriter := multipart.NewWriter(pipeWriter)
    stat, _ := fileHandle.Stat()


    // (2)の処理(ファイルをWriterに書き込む処理)をgo routineとして並列実行
    go func() {
        defer pipeWriter.Close()

        fileWriter, err := multipartWriter.CreateFormFile("file", filePath)
        if err != nil {
            fmt.Printf("error: %s", err.Error())
        }

        _, err = io.Copy(fileWriter, fileHandle)
        if err != nil {
            fmt.Printf("error: %s", err.Error())
        }

        if err := multipartWriter.Close(); err != nil {
            fmt.Printf("error: %s", err.Error())
        }

    }()

    //(3)のrequest処理にio.PipeのReader側を渡す
    req, err := http.NewRequest("POST", uploadURL, pipeReader)
    if err != nil {
        return VideoInfo{}, err
    }

    authenticateHTTPRequest(req, creds)

    req.Header.Set("Content-Type", multipartWriter.FormDataContentType())

    // 自前計算
    req.ContentLength = contentLength(stat.Size(), filePath)

    client := http.DefaultClient

    res, err := client.Do(req)
    if err != nil {
        return VideoInfo{}, err
    }
    defer res.Body.Close()

    if res.StatusCode != http.StatusOK {
        return VideoInfo{}, fmt.Errorf("upload failed StatusCode:%d", res.StatusCode)
    }

    bodyBytes, err := ioutil.ReadAll(res.Body)
    if err != nil {
        return VideoInfo{}, err
    }

    body := bytesToString(bodyBytes)
    videoRes, err := videoResponseFromJSON(body)
    if err != nil {
        return VideoInfo{}, err
    }

    return videoRes, nil

}

最後に

最初、GoでChannelを介してio.Readerとio.Writerをつなげれば
遅延評価な感じで処理できて省メモリにできるに違いない。と思ってio.Pipe()を一から作ろうとしていました。

だいたいそういうのは既にあるんですね。

それとメモリの使用状況も以下の通り516MBから1.4MBまで減少しました。
速度は測ってないです。

pprof001_svg_と_pprof001_svg.png

13
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
15