はじめに
Go言語で圧縮ファイル展開処理やフォーマット変換処理などをio.Reader
, io.Writer
を使いストリーミングで処理できるサンプルを作ってみました。
ストリーミングでの処理をすることで処理元のデータサイズに依存せずに処理を行うことができます。
処理内容
以下の一連の処理を行います。
- S3(Minio)からgzipファイルをダウンロード
- gzipファイルを展開しJSONファイルを出力
- ノーマライズ処理を行い、全角数字を半角に変換
- JSONからCSVに変換
- PostgreSQLに対しCOPYコマンドでバルクインサート
利用データ
以下の郵便番号データをJSONファイルに加工後、gzip圧縮をしたものをインプットデータとしました。
http://zipcloud.ibsnet.co.jp
動作コード
package main
import (
"compress/gzip"
"encoding/json"
"io"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/gocarina/gocsv"
"github.com/jackc/pgx"
"golang.org/x/text/transform"
"golang.org/x/text/unicode/norm"
)
type ZipAddress struct {
ZipCode string `json csv:"zipcode"`
Address1 string `json csv:"address1"`
Address2 string `json csv:"address2"`
Address3 string `json csv:"address3"`
}
func main() {
// S3からファイル取得
fr, err := readFromS3()
if err != nil {
panic(err)
}
defer fr.Close()
// gzipファイルの展開
gr, err := gzip.NewReader(fr)
if err != nil {
panic(err)
}
defer gr.Close()
// ノーマライズ処理
tr := transform.NewReader(gr, norm.NFKC)
pr, pw := io.Pipe()
defer pr.Close()
// JSON ⇒ CSV変換処理
go func() {
defer pw.Close()
if err := json2csv(tr, pw); err != nil {
pw.CloseWithError(err)
}
}()
// PostgreSQLにCOPYコマンド実施
if err := pgCopy(pr); err != nil {
panic(err)
}
}
func readFromS3() (io.ReadCloser, error) {
s, err := session.NewSession()
if err != nil {
return nil, err
}
cfg := &aws.Config{
Credentials: credentials.NewStaticCredentials("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY", ""),
Region: aws.String("ap-northeast-1"),
Endpoint: aws.String("http://127.0.0.1:9000"),
S3ForcePathStyle: aws.Bool(true),
}
obj, err := s3.New(s, cfg).GetObject(&s3.GetObjectInput{
Bucket: aws.String("test-bucket"),
Key: aws.String("ken_sample.jsonl.gz"),
})
if err != nil {
return nil, err
}
return obj.Body, nil
}
func json2csv(r io.Reader, w io.Writer) error {
jsonDecoder := json.NewDecoder(r)
for {
zs := []*ZipAddress{{}}
err := jsonDecoder.Decode(zs[0])
if err != nil {
if err == io.EOF {
return nil
}
return err
}
if err := gocsv.MarshalWithoutHeaders(zs, w); err != nil {
return err
}
}
}
func pgCopy(r io.Reader) error {
conn, err := pgx.Connect(pgx.ConnConfig{
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "postgres",
})
defer conn.Close()
_, err = conn.CopyFromReader(r, "COPY zip_address (zipcode, address1, address2, address3) FROM STDIN WITH CSV")
return err
}
変換の遷移
-
gzipファイル
(省略) -
gzip展開後JSONファイル
{"zipcode":"0600000","address1":"北海道","address2":"札幌市中央区","address3":"以下に掲載がない場合"}
{"zipcode":"0640941","address1":"北海道","address2":"札幌市中央区","address3":"旭ケ丘"}
{"zipcode":"0600041","address1":"北海道","address2":"札幌市中央区","address3":"大通東"}
{"zipcode":"0600042","address1":"北海道","address2":"札幌市中央区","address3":"大通西(1〜19丁目)"}
{"zipcode":"0640820","address1":"北海道","address2":"札幌市中央区","address3":"大通西(20〜28丁目)"}
- ノーマライズ後JSONファイル
数字が半角数字に変換されてます。
{"zipcode":"0600000","address1":"北海道","address2":"札幌市中央区","address3":"以下に掲載がない場合"}
{"zipcode":"0640941","address1":"北海道","address2":"札幌市中央区","address3":"旭ケ丘"}
{"zipcode":"0600041","address1":"北海道","address2":"札幌市中央区","address3":"大通東"}
{"zipcode":"0600042","address1":"北海道","address2":"札幌市中央区","address3":"大通西(1〜19丁目)"}
{"zipcode":"0640820","address1":"北海道","address2":"札幌市中央区","address3":"大通西(20〜28丁目)"}
- CSV変換後ファイル
0600000,北海道,札幌市中央区,以下に掲載がない場合
0640941,北海道,札幌市中央区,旭ケ丘
0600041,北海道,札幌市中央区,大通東
0600042,北海道,札幌市中央区,大通西(1〜19丁目)
0640820,北海道,札幌市中央区,大通西(20〜28丁目)
- PostgreSQLテーブル
各取得・変換処理の簡単な解説
1. S3(Minio)からgzipファイルをダウンロード
S3からのファイルのダウンロードとして (*S3).GetObjectの他に(Downloader).Downloadの方法があります。
(Downloader).Download
の方は引数に任意の地点に対し書き込み可能な io.WriterAt
が必要です。
今回はストリーム処理を行い過去データに遡っての書き込みはできないため (*S3).GetObject
での実装を選択しました。
また、検証ではminioというS3互換のサービスを利用しています。
2. gzipファイルを展開しJSONファイルを出力
gzipの展開は標準のgzipパッケージを利用しています。
gzipファイルではなくzipファイルをサンプルとして使いたかったのですが、io.Reader
ではなく任意の地点を読み込むことのできる io.ReaderAt
が引数に必要なためgzipをサンプルとして採用しました。
返り値には、展開後のファイルの io.ReadCloser
が返ります。(サンプルコードの変数名 gr
)
gzipに複数ファイルがある場合には実装方法が若干ことなるのでご注意ください。
3. ノーマライズ処理を行い、全角数字を半角に変換
go言語ではサブパッケージのtransformパッケージで簡単にストリーミングしつつノーマライズする処理が書けます。
今回は NFKC
の手法の正規化をしています。
transformパッケージは文字コードの変換処理でもよく使います。
記事では文字コードの変換は分かりにくいため省きましたが、以下のように書くと簡単に文字コードの変換が可能です。
sjisWriter := transform.NewWriter(utf8Writer, japanese.ShiftJIS.NewEncoder())
4. JSONからCSVに変換
JSONファイルの読み込みは標準のjsonパッケージを利用しています。
よくサンプルとして利用される json.Unmarshal
は引数がバイト配列のため io.Reader
を利用できる json.NewDecoder
を利用しています。
CSVの書き込みにはタグが利用できるCSVライブラリのgocsvを利用しています。
良い書き方が思いつかず、1行ずつJSONを読み込みヘッダ無しのCSVを io.Writer
に出力しています。
(もっと良い書き方があれば教えて下さい。)
後続のPostgreSQLのコピーコマンド実行時もストリーミングで変換処理を行う必要があるため、go routineにより動かしています。
※一行がJSONになっているため、実際はJSON Lineのファイルですが、分かりやすさのためにJSONとして説明してます。
5. PostgreSQLに対しCOPYコマンドでバルクインサート
標準のsqlパッケージはPostgreSQLのCOPYコマンドに対応しておらず、他のPostgreSQLクライアントのpgパッケージはメンテナンスモードのためpgxパッケージを利用しています。
pgxパッケージのv4ではCOPYコマンドの引数に直接 io.Reader
が指定できる関数が無かったため、シンプルに記載可能なv3のpgxパッケージを利用しています。
pgx v4での実装サンプルを見たい方はこちらの記事をご確認ください。
CSVの出力が Reader
ではなく Writer
であり、変換するため io.Pipe
を利用しています。
まとめ
本記事では io.Reader
を中心としたストリーミング処理のサンプルと簡単な解説を行いました。
ストリーミング処理を使うことで、S3からファイルを取得してからDBに書き込むまで、中間ファイルを作らず、全量をメモリに載せないように処理できます。
特に大量データをメモリに乗せてOOMエラーなどが発生した際などに参考にしていただければ幸いです。