概要
大量のデータを処理するとき、メモリにすべてを展開せずにstream的に処理する必要があるときがある。
こういうstream的な処理は、大体どの言語でもあまりサンプルがなく、調べながらやるしかない。
ということでgoでやってみたのが以下。
Streamingの利点
- メモリの使用が最小限であること
- json linesフォーマットと親和性が高い
補足> json linesフォーマットとは?
jsonフォーマットはこんな感じ。
"list": [
{ "hoge": 1, "fuga": 2 },
{ "hoge": 1, "fuga": 2 },
{ "hoge": 1, "fuga": 2 },
{ "hoge": 1, "fuga": 2 },
{ "hoge": 1, "fuga": 2 }
]
json linesフォーマットはこんな感じ。
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
{ "hoge": 1, "fuga": 2 }
jsonファイルの拡張子は.json
だが、json linesの拡張子は.jsonl
だったり.json
だったり固定ではない模様。
Streamingの欠点
- 通常のjsonフォーマットが使えない事
- どうしてもgoroutineを使った非同期処理が必要となる事
なので、データ量が多くないなら、Streamingに処理せず普通にメモリに展開したほうが良いかもしれない。
Lambda
以下のサンプルは、Dynamodbにデータが挿入・更新されると実行される Dynamodb streamsという仕組みで動くLamba。
出力先のS3ファイルは、一行が一つのdynamodbレコードに相当するjsonであるような、json linesフォーマット。
func handler(ctx context.Context, event events.DynamoDBEvent) error {
r, w := io.Pipe()
go func() {
defer func() {
if err := w.Close(); err != nil {
log.Println("writer close error: ", err)
return
}
}()
for _, r := range event.Records {
if err := UnmarshalDynamoEvent(r.Change.NewImage, &data); err != nil {
log.Println("unmarshal dynamo event error: ", err)
w.CloseWithError(err)
return
}
b, err := json.Marshal(&data)
if err != nil {
log.Println("json marshal error: ", err)
w.CloseWithError(err)
return
}
if _, err := w.Write(b); err != nil {
log.Println("data buffer write error: ", err)
w.CloseWithError(err)
return
}
if _, err := w.Write(lf); err != nil {
log.Println("LF buffer write error: ", err)
w.CloseWithError(err)
return
}
}
}()
key := fmt.Sprintf("%s/%s/%s.json", keyPrefix, now.Format("20060102"), now.Format("150405"))
result, err := s3uploader.Upload(&s3manager.UploadInput{
Body: r,
Bucket: aws.String(bucket),
Key: aws.String(key),
})
log.Println("result:", result)
if err != nil {
return err
}
return nil
}
こんな感じでStreamingにS3に出力することができた。
ちなみにPipeは以下のようなもの。
io.Pipe
func Pipe() (*PipeReader, *PipeWriter){
io.PipeWriter
func (w *PipeWriter) Write(data []byte) (n int, err error) {
func (w *PipeWriter) Close() error {
func (w *PipeWriter) CloseWithError(err error) error {
データをWriterに書き込むわけだが、書き込み中になんらかのエラーが発生してStreamをクローズしたいとき、CloseWithErrorを使って失敗したことをReader側に通知できる。
成功したら普通にCloseで閉じる。
io.PipeReader
func (r *PipeReader) Read(data []byte) (n int, err error)
func (r *PipeReader) Close() error {
func (w *PipeReader) CloseWithError(err error) error {
Writer側で何等かのエラーが発生(CloseWithError)すると、Readに失敗してerrが帰ってくる。なので、エラーが帰ってきたら処理を中断する。
Read側でエラーが発生してWriter側に通知したい場合はこちらのCloseWithErrorを使うことになるが、main threadをRead側にするのかWrite側にするのかで通知の方向が変わりそう。
結論
-
io.Pipe
を使う事で、データを書きこみながらio.Readerに流すことができる - Write中に処理を中断する必要がある場合、
CloseWithError
でReaderに通知する - json lines形式を入出力に使えば、streamingな処理を書きやすい
- writerのCloseを忘れずに!(closeを忘れるとdeadlockになる)
goroutineを使うコードを書くのは抵抗があるが、こういうコードを書く場合逆にgoroutineを使ったほうが綺麗に書けるきもする。
golangと仲良くなりたいが、悩みはつきない・・・