2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Goで多段のファイル変換処理をしてみた

Last updated at Posted at 2021-12-15

はじめに

Go言語で圧縮ファイル展開処理やフォーマット変換処理などをio.Reader, io.Writerを使いストリーミングで処理できるサンプルを作ってみました。
ストリーミングでの処理をすることで処理元のデータサイズに依存せずに処理を行うことができます。

処理内容

以下の一連の処理を行います。

  1. S3(Minio)からgzipファイルをダウンロード
  2. gzipファイルを展開しJSONファイルを出力
  3. ノーマライズ処理を行い、全角数字を半角に変換
  4. JSONからCSVに変換
  5. PostgreSQLに対しCOPYコマンドでバルクインサート

image.png

利用データ

以下の郵便番号データをJSONファイルに加工後、gzip圧縮をしたものをインプットデータとしました。
http://zipcloud.ibsnet.co.jp

動作コード

package main

import (
	"compress/gzip"
	"encoding/json"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/gocarina/gocsv"
	"github.com/jackc/pgx"
	"golang.org/x/text/transform"
	"golang.org/x/text/unicode/norm"
)

type ZipAddress struct {
	ZipCode  string `json csv:"zipcode"`
	Address1 string `json csv:"address1"`
	Address2 string `json csv:"address2"`
	Address3 string `json csv:"address3"`
}

func main() {
	// S3からファイル取得
	fr, err := readFromS3()
	if err != nil {
		panic(err)
	}
	defer fr.Close()

	// gzipファイルの展開
	gr, err := gzip.NewReader(fr)
	if err != nil {
		panic(err)
	}
	defer gr.Close()

	// ノーマライズ処理
	tr := transform.NewReader(gr, norm.NFKC)

	pr, pw := io.Pipe()
	defer pr.Close()

	// JSON ⇒ CSV変換処理
	go func() {
		defer pw.Close()
		if err := json2csv(tr, pw); err != nil {
			pw.CloseWithError(err)
		}
	}()

	// PostgreSQLにCOPYコマンド実施
	if err := pgCopy(pr); err != nil {
		panic(err)
	}
}

func readFromS3() (io.ReadCloser, error) {
	s, err := session.NewSession()
	if err != nil {
		return nil, err
	}

	cfg := &aws.Config{
		Credentials:      credentials.NewStaticCredentials("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY", ""),
		Region:           aws.String("ap-northeast-1"),
		Endpoint:         aws.String("http://127.0.0.1:9000"),
		S3ForcePathStyle: aws.Bool(true),
	}
	obj, err := s3.New(s, cfg).GetObject(&s3.GetObjectInput{
		Bucket: aws.String("test-bucket"),
		Key:    aws.String("ken_sample.jsonl.gz"),
	})
	if err != nil {
		return nil, err
	}
	return obj.Body, nil
}

func json2csv(r io.Reader, w io.Writer) error {
	jsonDecoder := json.NewDecoder(r)
	for {
		zs := []*ZipAddress{{}}
		err := jsonDecoder.Decode(zs[0])
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if err := gocsv.MarshalWithoutHeaders(zs, w); err != nil {
			return err
		}
	}
}

func pgCopy(r io.Reader) error {
	conn, err := pgx.Connect(pgx.ConnConfig{
		Host:     "localhost",
		Port:     5432,
		User:     "postgres",
		Password: "postgres",
	})
	defer conn.Close()
	_, err = conn.CopyFromReader(r, "COPY zip_address (zipcode, address1, address2, address3) FROM STDIN WITH CSV")
	return err
}

変換の遷移

  • gzipファイル
    (省略)

  • gzip展開後JSONファイル

{"zipcode":"0600000","address1":"北海道","address2":"札幌市中央区","address3":"以下に掲載がない場合"}
{"zipcode":"0640941","address1":"北海道","address2":"札幌市中央区","address3":"旭ケ丘"}
{"zipcode":"0600041","address1":"北海道","address2":"札幌市中央区","address3":"大通東"}
{"zipcode":"0600042","address1":"北海道","address2":"札幌市中央区","address3":"大通西(1〜19丁目)"}
{"zipcode":"0640820","address1":"北海道","address2":"札幌市中央区","address3":"大通西(20〜28丁目)"}
  • ノーマライズ後JSONファイル

数字が半角数字に変換されてます。

{"zipcode":"0600000","address1":"北海道","address2":"札幌市中央区","address3":"以下に掲載がない場合"}
{"zipcode":"0640941","address1":"北海道","address2":"札幌市中央区","address3":"旭ケ丘"}
{"zipcode":"0600041","address1":"北海道","address2":"札幌市中央区","address3":"大通東"}
{"zipcode":"0600042","address1":"北海道","address2":"札幌市中央区","address3":"大通西(1〜19丁目)"}
{"zipcode":"0640820","address1":"北海道","address2":"札幌市中央区","address3":"大通西(20〜28丁目)"}
  • CSV変換後ファイル
0600000,北海道,札幌市中央区,以下に掲載がない場合
0640941,北海道,札幌市中央区,旭ケ丘
0600041,北海道,札幌市中央区,大通東
0600042,北海道,札幌市中央区,大通西(119丁目)
0640820,北海道,札幌市中央区,大通西(2028丁目)
  • PostgreSQLテーブル

image.png

各取得・変換処理の簡単な解説

1. S3(Minio)からgzipファイルをダウンロード

S3からのファイルのダウンロードとして (*S3).GetObjectの他に(Downloader).Downloadの方法があります。
(Downloader).Download の方は引数に任意の地点に対し書き込み可能な io.WriterAt が必要です。
今回はストリーム処理を行い過去データに遡っての書き込みはできないため (*S3).GetObject での実装を選択しました。
また、検証ではminioというS3互換のサービスを利用しています。

2. gzipファイルを展開しJSONファイルを出力

gzipの展開は標準のgzipパッケージを利用しています。
gzipファイルではなくzipファイルをサンプルとして使いたかったのですが、io.Reader ではなく任意の地点を読み込むことのできる io.ReaderAt引数に必要なためgzipをサンプルとして採用しました。
返り値には、展開後のファイルの io.ReadCloser が返ります。(サンプルコードの変数名 gr
gzipに複数ファイルがある場合には実装方法が若干ことなるのでご注意ください。

3. ノーマライズ処理を行い、全角数字を半角に変換

go言語ではサブパッケージのtransformパッケージで簡単にストリーミングしつつノーマライズする処理が書けます。
今回は NFKC の手法の正規化をしています。
transformパッケージは文字コードの変換処理でもよく使います。
記事では文字コードの変換は分かりにくいため省きましたが、以下のように書くと簡単に文字コードの変換が可能です。

sjisWriter := transform.NewWriter(utf8Writer, japanese.ShiftJIS.NewEncoder())

4. JSONからCSVに変換

JSONファイルの読み込みは標準のjsonパッケージを利用しています。
よくサンプルとして利用される json.Unmarshal は引数がバイト配列のため io.Reader を利用できる json.NewDecoder を利用しています。
CSVの書き込みにはタグが利用できるCSVライブラリのgocsvを利用しています。
良い書き方が思いつかず、1行ずつJSONを読み込みヘッダ無しのCSVを io.Writer に出力しています。
(もっと良い書き方があれば教えて下さい。)
後続のPostgreSQLのコピーコマンド実行時もストリーミングで変換処理を行う必要があるため、go routineにより動かしています。

※一行がJSONになっているため、実際はJSON Lineのファイルですが、分かりやすさのためにJSONとして説明してます。

5. PostgreSQLに対しCOPYコマンドでバルクインサート

標準のsqlパッケージはPostgreSQLのCOPYコマンドに対応しておらず、他のPostgreSQLクライアントのpgパッケージはメンテナンスモードのためpgxパッケージを利用しています。
pgxパッケージのv4ではCOPYコマンドの引数に直接 io.Reader が指定できる関数が無かったため、シンプルに記載可能なv3のpgxパッケージを利用しています。
pgx v4での実装サンプルを見たい方はこちらの記事をご確認ください。
CSVの出力が Reader ではなく Writer であり、変換するため io.Pipe を利用しています。

まとめ

本記事では io.Reader を中心としたストリーミング処理のサンプルと簡単な解説を行いました。
ストリーミング処理を使うことで、S3からファイルを取得してからDBに書き込むまで、中間ファイルを作らず、全量をメモリに載せないように処理できます。
特に大量データをメモリに乗せてOOMエラーなどが発生した際などに参考にしていただければ幸いです。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?