21
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Google Cloud Platform(2) Advent Calendar 2016 9日目を担当する avvmoto です。よろしくお願いします。

システム開発をしていると、大きなサイズの Zip ファイルを取り扱いうときがあると思います。
GAE だとローカルストレージがないので、もし対象の Zip を全部メモリ上に一旦展開できれば、問題は楽です。しかしメモリに乗り切らない Zip を扱う場合には工夫が必要になります。

本日は GAE/Go でこの問題を解決する方法を取り扱います。

やりたいこと

  • GCS 上のメモリより多なサイズの解凍したい
  • GCS 上へメモリより多なサイズの Zip を出力して書き込みたい

前提

  • TaskQueue 等で実行するので、実行時間はさほど考慮しなくてもよい
  • 実行時のメモリ消費量を、GAE のメモリの範囲内に収めたい
  • Zip の読み書きには標準ライブラリの archive/zip (以下 archive/zip ) を用いる
  • GCS の読み書きには cloud.google.com/go/storage (以下 storage ) を用いる

必要になる工夫

zip.Reader のため、 GCS の io.ReaderAt を作成する

まず Zip の解凍のためのインターフェースが提供される zip.Readerを作成したいです。

そのためには io.ReaderAtが要求されるのですが、 storage にはこれは提供されていません。

ちなみに io.ReaderAt とは、なじみのある io.ReaderRead() がシーケンシャルな読み取りを提供するのに対し、 ランダムリードである ReaderAt()を提供します。

type Reader interface {
        Read(p []byte) (n int, err error)
}

type ReaderAt interface {
        ReadAt(p []byte, off int64) (n int, err error)
}

そこで、 storage にもランダムリードのインターフェース func (*ObjectHandle) NewRangeReader が提供されているので、これを io.ReaderAt のインターフェースに沿うようにラップします。

type GCSReaderAt struct {
	ctx context.Context
	obj *storage.ObjectHandle
}

func (g *GCSReaderAt) ReadAt(b []byte, off int64) (n int, err error) {
	rc, err := g.obj.NewRangeReader(g.ctx, off, int64(len(b)))
	if err != nil {
		return
	}
	defer rc.Close()

	return rc.Read(b)
}

これで、 zip.Reader が作成できるようになりました。
しかし、次の問題が発生するため、まだ巨大なファイルだと読み取りができません。

io.ReaderAt のバッファリング機能を実装する

上記 GCSReaderAt を用いて GCS 上の Zip ファイルを解凍しようとすると、当方の環境では Call error 11: Deadline exceeded (timeout) のエラーが頻出しました。

これは恐らく archive/zipRead() の実装上、小さすぎるサイズずつ io.ReadAt() 経由で GCS を読みに行っているためです。仮にリトライの処理を入れたとしても、ネットワークアクセスの回数が多すぎて非常に遅くなるでしょうし、負荷も大きくなります。

ちなみに、きちんとコード追えてはいないのですが、 archive/zipこのあたりの処理 が、一度の読み取るサイズが小さすぎて刺さってるのかなと踏んでいます。このサイズは外部から指定できず、困ったことになります。

そこで、zip.NewReader() に渡す io.ReaderAt についてですが、前述の GCSReaderAt を用いるのではなく、いったん間に io.ReaderAt のバッファリング機能を挟むことにしましょう。これでネットワークアクセスの回数が大幅に削減でき、高速化や安定動作が期待できます。

なお、おなじみの io.ReadAt でしたらバッファリング機能が bufio で提供されています。 io.ReaderAt のバッファリング機能は見当たらなかったので、私は自前で作ることにしました。avvmoto/buf-readerat として公開してありますので、今回はこれを利用することにしましょう。

なお、ランダムリードのバッファリングは、例えば巨大なファイルに対しすこぶるランダムなランダムリードを実施したら、バッファが殆ど機能しないというのは容易に予想がつきます。ただ、詳細は省きますが、Zip の解凍処理はアルゴリズム上、比較的連続して続けて読む処理が多いことが期待できます。

最終的に、 zip.NewReaderAt に渡す io.ReaderAt は以下のようにしましょう。前述の GCSReaderAtrowGCSReaderAt とし、 bufra でバッファリングして再度 GCSReaderAt を定義してあります。

import (
	"cloud.google.com/go/storage"
	bufra "github.com/avvmoto/buf-readerat"
	"golang.org/x/net/context"
)

var cacheSize = 10 * 1024 * 1024

type GCSReaderAt struct {
	cache *bufra.BufReaderAt
}

// wrap rowGCSReaderAt for buffering
func NewGCSReaderAt(ctx context.Context, obj *storage.ObjectHandle) *GCSReaderAt {
	rowReaderAt := newRowGCSReaderAt(ctx, obj)
	r := bufra.NewBufReaderAt(rowReaderAt, cacheSize)

	return &GCSReaderAt{
		cache: r,
	}
}

func (ra *GCSReaderAt) ReadAt(b []byte, off int64) (n int, err error) {
	return ra.cache.ReadAt(b, off)
}

// wrap storage.NewRangeReader to use as io.ReaderAt
type rowGCSReaderAt struct {
	ctx context.Context
	obj *storage.ObjectHandle
}

func newRowGCSReaderAt(ctx context.Context, obj *storage.ObjectHandle) *rowGCSReaderAt {
	return &rowGCSReaderAt{
		ctx: ctx,
		obj: obj,
	}
}

func (ra *rowGCSReaderAt) ReadAt(b []byte, off int64) (n int, err error) {
	rc, err := ra.obj.NewRangeReader(ra.ctx, off, int64(len(b)))
	if err != nil {
		return
	}
	defer rc.Close()

	n, err = rc.Read(b)
	return
}

これで巨大な Zip ファイルも読み取りできるようになりました。なお、今回は割愛していますが、ネットワーク越しのアクセスとなるので、リトライ処理を入れるのがよいです。

zip.Writer を用いて GCS上にzip を書き込む

今度は Zip ファイルを GCS に作成する方法を考えましょう。とはいえこちらは簡単で、 必要になる io.Writer が既に storage実装されているので、これをそのまま func NewWriter(w io.Writer) *Writer に渡せば良いです。

wc := obj.NewWriter(ctx)
zw := zip.NewWriter(wc)
defer wc.Close()
defer zw.Close()

ただし、 zip.Writer は内部で bufio を利用しています。つまり、 func (*Writer) Close が呼ばれるまで生成した Zip をメモリ上にバッファリングしており、引数に渡した wc へと書き込みを行いません。書き込む Zip ファイルの大きさがメモリより大きいときは、途中で func (*Writer) Flush を呼び、明示的に書き込みとメモリ解放をしてもらい、メモリの消費を押さえる必要があります。

w, err := zw.Create("newfile")
if err != nil {
	panic("some")
}

for i := 0; i < 1000000; i++ {
	w.Write(buf)

	err := zw.Flush() // ここで Flush
	if err != nil {
		panic("some")
	}
}

まとめ

GAE でメモリに乗り切らない Zip ファイルを読み書きする工夫を紹介し、必要となった io.ReadAt のバッファである弊作 avvmoto/buf-readerat を紹介しました。

不正確な点や改善点がありましたら、何卒ご教授のほどよろしくお願いします m(_ _)m

参考文献

io - The Go Programming Language:
zip - The Go Programming Language:
storage - GoDoc:
avvmoto/buf-readerat

21
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?