12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[Golang] aws Lambda で、s3へファイルをstreamingに出力する

Last updated at Posted at 2020-02-05

概要

大量のデータを処理するとき、メモリにすべてを展開せずにstream的に処理する必要があるときがある。
こういうstream的な処理は、大体どの言語でもあまりサンプルがなく、調べながらやるしかない。
ということでgoでやってみたのが以下。

Streamingの利点

  • メモリの使用が最小限であること
  • json linesフォーマットと親和性が高い

補足> json linesフォーマットとは?

jsonフォーマットはこんな感じ。

"list": [
  { "hoge": 1, "fuga": 2 },
  { "hoge": 1, "fuga": 2 },
  { "hoge": 1, "fuga": 2 },
  { "hoge": 1, "fuga": 2 },
  { "hoge": 1, "fuga": 2 }
]

json linesフォーマットはこんな感じ。

{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }

jsonファイルの拡張子は.jsonだが、json linesの拡張子は.jsonlだったり.jsonだったり固定ではない模様。

Streamingの欠点

  • 通常のjsonフォーマットが使えない事
  • どうしてもgoroutineを使った非同期処理が必要となる事

なので、データ量が多くないなら、Streamingに処理せず普通にメモリに展開したほうが良いかもしれない。

Lambda

以下のサンプルは、Dynamodbにデータが挿入・更新されると実行される Dynamodb streamsという仕組みで動くLamba。
出力先のS3ファイルは、一行が一つのdynamodbレコードに相当するjsonであるような、json linesフォーマット。

func handler(ctx context.Context, event events.DynamoDBEvent) error {
	r, w := io.Pipe()

	go func() {
		defer func() {
			if err := w.Close(); err != nil {
				log.Println("writer close error: ", err)
				return
			}
		}()
		for _, r := range event.Records {
			if err := UnmarshalDynamoEvent(r.Change.NewImage, &data); err != nil {
				log.Println("unmarshal dynamo event error: ", err)
				w.CloseWithError(err)
				return
			}
			b, err := json.Marshal(&data)
			if err != nil {
				log.Println("json marshal error: ", err)
				w.CloseWithError(err)
				return
			}
			if _, err := w.Write(b); err != nil {
				log.Println("data buffer write error: ", err)
				w.CloseWithError(err)
				return
			}
			if _, err := w.Write(lf); err != nil {
				log.Println("LF buffer write error: ", err)
				w.CloseWithError(err)
				return
			}
		}
	}()

	key := fmt.Sprintf("%s/%s/%s.json", keyPrefix, now.Format("20060102"), now.Format("150405"))
	result, err := s3uploader.Upload(&s3manager.UploadInput{
		Body:   r,
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
 	log.Println("result:", result)
	if err != nil {
		return err
	}
	return nil
}

こんな感じでStreamingにS3に出力することができた。
ちなみにPipeは以下のようなもの。

io.Pipe

func Pipe() (*PipeReader, *PipeWriter){

io.PipeWriter

func (w *PipeWriter) Write(data []byte) (n int, err error) {
func (w *PipeWriter) Close() error {
func (w *PipeWriter) CloseWithError(err error) error {

データをWriterに書き込むわけだが、書き込み中になんらかのエラーが発生してStreamをクローズしたいとき、CloseWithErrorを使って失敗したことをReader側に通知できる。
成功したら普通にCloseで閉じる。

io.PipeReader

func (r *PipeReader) Read(data []byte) (n int, err error)
func (r *PipeReader) Close() error {
func (w *PipeReader) CloseWithError(err error) error {

Writer側で何等かのエラーが発生(CloseWithError)すると、Readに失敗してerrが帰ってくる。なので、エラーが帰ってきたら処理を中断する。
Read側でエラーが発生してWriter側に通知したい場合はこちらのCloseWithErrorを使うことになるが、main threadをRead側にするのかWrite側にするのかで通知の方向が変わりそう。

結論

  1. io.Pipeを使う事で、データを書きこみながらio.Readerに流すことができる
  2. Write中に処理を中断する必要がある場合、CloseWithErrorでReaderに通知する
  3. json lines形式を入出力に使えば、streamingな処理を書きやすい
  4. writerのCloseを忘れずに!(closeを忘れるとdeadlockになる)

goroutineを使うコードを書くのは抵抗があるが、こういうコードを書く場合逆にgoroutineを使ったほうが綺麗に書けるきもする。
golangと仲良くなりたいが、悩みはつきない・・・

12
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?