1
0

Go の ORM sqlcでBulk Insertを行う(MySQL)

Posted at

sqlcでは動的なSQLは書けません。WHEREについては方法を紹介しましたが、Bulk Insertも動的になってしまうので、困りますね。
SELECTと同様、Query Builderで作るのも手段ではありますが、一応公式に用意されています。ただ、ちょっと微妙かなと思っています。

copyfrom アノテーションを使う

下記のようにcopyfromを使うのが、公式のやり方です。

-- name: InsertValues :copyfrom
INSERT INTO
    user (id, Name)
VALUES
    (?, ?);

ただ、これ、実際は、INSERTではなくてLOAD DATAに変換されます。LOAD DATAに変換されると、懸念事項がいくつかあります。
ただ、LOAD DATAのパフォーマンスはかなり良いはずなので、使いどころや使うデータについて気をつけられるのであれば、選択肢としてはありだと思います。

以下、微妙なところ。

1. エラーに気づけない

LOCALを使うので、「LOCAL が指定されている場合、デフォルトの動作は IGNORE が指定されている場合と同じです。これは、操作の最中にファイルの転送を停止する方法がサーバーにはないためです」となってしまいます。

重複キーの処理
REPLACE および IGNORE 修飾子は、一意のキー値で既存の行を複製する新しい (入力) 行の処理を制御します:

REPLACE を指定すると、新しい行で既存の行が置き換えられます。 つまり、既存の行と同じ主キーまたは一意インデックスの値を持つ行が既存の行を置換します。 セクション13.2.9「REPLACE ステートメント」を参照してください。

IGNORE を指定すると、一意のキー値で既存の行を複製する新しい行は破棄されます。 詳細は、IGNORE がステートメントの実行に与える影響を参照してください。

いずれの修飾子も指定しない場合、動作は LOCAL 修飾子が指定されているかどうかによって異なります。 LOCAL が指定されていない場合は、重複キー値が見つかるとエラーが発生し、テキストファイルの残りは無視されます。 LOCAL が指定されている場合、デフォルトの動作は IGNORE が指定されている場合と同じです。これは、操作の最中にファイルの転送を停止する方法がサーバーにはないためです。
https://dev.mysql.com/doc/refman/8.0/ja/load-data.html#load-data-duplicate-key-handling

sqlcのドキュメントにも、以下のように書かれています。

Errors and duplicate keys are treated as warnings and insertion will continue, even without an error for some cases. Use this in a transaction and use SHOW WARNINGS to check for any problems and roll back if necessary.
https://docs.sqlc.dev/en/stable/howto/insert.html#mysql

トランザクションはって、SHOW WARNINGSしろとのことですね。

2. LOAD DATA は、ステートメントベースレプリケーションでは安全でない

MySQLのbinlog_formatの設定に気をつける必要があります。まぁ、多分、大丈夫(ステートメントベースにはなっていないであろう)ですが、一応確認しとくとよいのでは。

LOAD DATA は、ステートメントベースレプリケーションでは安全でないとみなされます。 binlog_format=STATEMENT が設定されているときに LOAD DATA を使用すると、データを含む一時ファイルが、変更が適用されるレプリケーションスレーブ上に作成されます。 バイナリログの暗号化がサーバー上でアクティブな場合、この一時ファイルは暗号化されないことに注意してください。 暗号化が必要な場合は、一時ファイルを作成しない行ベースまたは混合バイナリロギング形式を使用してください。 LOAD DATA とレプリケーションの相互作用の詳細は、セクション17.5.1.19「レプリケーションと LOAD DATA」 を参照してください。
https://dev.mysql.com/doc/refman/8.0/ja/replication-features-load-data.html

使う際の注意

sqlcのYAMLファイルに、sql_packagesql_driverの設定が必要です。また、github.com/hexon/mysqltsv も必要になります。

JSON_TABLEを使う

MySQL8から使える、JSON_TABLEという関数を使って Bulk Insertすることができます(参考記事)。

INSERT INTO `テーブル名` (`カラム名`,...)
SELECT * FROM JSON_TABLE('[{カラム名: 値},...]')

のようなSQLでまとめてデータを入れることが出来ます。sqlcであれば、以下のようにしてやれば、引数として、JSONフォーマットの文字列を渡すだけで使えそうです。

-- name: BulkInsertToUser :exec
INSERT INTO user (name, age) SELECT * FROM JSON_TABLE(sqlc.arg(json))

ただ、文字列としてJSONを渡す必要があるので、型の定義にtagを付けて、配列を作って、Marshal した文字列を渡す...ということになりそうですね。

ちなみに、sqlcのYAMLの設定ファイルで、emit_json_tags: true つけておけば、自動生成されたテーブルの型にJSONのタグをつけることは出来ます(ただ、idとかも入ってしまうので、コピーしてinsert用の型を作るということになるのかなと思います)。詳細はドキュメント参照

自前でBulk Insertを作る

JSON_TABLEで概ね問題ないのですが、jsonのstringを作るのが、ちょっと面倒なので、自前でBULK INSERTを作っても良さそうです。

自動生成される型を使う

sqlcが生成するテーブルごとの型を利用するのもありだと思います。以下のように、型名=テーブル名構造体のメンバ=カラム名なので、これを利用します。

例として、以下のような userテーブルに対応する型が生成されたとしましょう。

type User struct {
	ID         int32
	Name       int32
	CreatedAt  time.Time
	UpdatedAt  sql.NullTime
}

以下のコードで、valuesの最初の値からテーブル名やカラム名をreflectで取得しています。

func MakeBulkInsertQuery[T any](tableName string, values []T, ignoreColumns bool) (string, []any, error) {
	typeOfTable := reflect.TypeOf(values[0])
	if typeOfTable.Kind() == reflect.Ptr {
		typeOfTable = typeOfTable.Elem()
	}

	fields := make([]string, 0)
	targetFields := make([]int, 0)
	if typeOfTable.Kind() != reflect.Struct {
		return "", nil, fmt.Errorf("%s is not struct: %s", typeOfTable, typeOfTable.Kind())
	}
	for i := 0; i < typeOfTable.NumField(); i++ {
		name := typeOfTable.Field(i).Name
        // ignoreColumns が false でなければ、id や updated_at は無視する
		if !ignoreColumns || (name != "ID" && name != "UpdatedAt") {
			targetFields = append(targetFields, i)
			fields = append(fields, strcase.ToSnake(name))
		}
	}
	args := make([]any, 0)
	for _, row := range values {
		valueOfTable := reflect.ValueOf(row)
		if valueOfTable.Kind() == reflect.Ptr {
			valueOfTable = valueOfTable.Elem()
		}
		for _, i := range targetFields {
			args = append(args, valueOfTable.Field(i).Interface())
		}
	}
	if tableName == "" {
		tableName = typeOfTable.Name()
	}
	sql, err := makeInsertSQL(tableName, fields, len(values))

	return sql, args, err
}

呼び出しは、以下のようにしてやります。

values := make([]User, 0)

for _, row := range rows {
   append(values, User{
        Name: row.Name,
        CreatedAt: time.Now(),
   })
}

MakeBulkInsertQuery[User](values, true)

テーブル名と []map[string]any を渡す

これは、もっと単純ですね。テーブル名と、[]map[string]anyを渡します。map[string]anyのキーがカラム名で、値がそれに対応する値ですね。

func MakeBulkInsertQueryWithMap(table string, values []map[string]any) (string, []any) {
	args := make([]any, 0)
	keys := make([]string, 0)
    for _, key := range values[0] {
        keys = append(keys, key)
    }
	sort.Strings(keys)
	for _, value := range values {
		for _, key := range keys {
			args = append(args, value[key])
		}
	}

	return makeInsertSQL(table, keys, len(values)), args
}

呼び出しは以下のようになります。

values := make([]map[string]any, 0)

for _, row := range rows {
   append(values, map[string]any{
        "name": row.Name,
        "created_at": time.Now(),
   })
}

MakeBulkInsertQueryWithMap("user", values)

全体のコード

全体のコードは下記のようになります。

package dbutils

import (
	"errors"
	"fmt"
	"reflect"
	"sort"
	"strings"

	"github.com/iancoleman/strcase"
)

type Map = map[string]any
type Maps = []Map

func MakeBulkInsertQuery[T any](tableName string, values []T, ignoreColumns bool) (string, []any, error) {
	if len(values) == 0 {
		return "", nil, errors.New("empty values")
	}
	typeOfTable := reflect.TypeOf(values[0])
	if typeOfTable.Kind() == reflect.Ptr {
		typeOfTable = typeOfTable.Elem()
	}

	fields := make([]string, 0)
	targetFields := make([]int, 0)
	if typeOfTable.Kind() != reflect.Struct {
		return "", nil, fmt.Errorf("%s is not struct: %s", typeOfTable, typeOfTable.Kind())
	}
	for i := 0; i < typeOfTable.NumField(); i++ {
		name := typeOfTable.Field(i).Name
        // ID や UpdatedAt は無視する
		if !ignoreColumns || (name != "ID" && name != "UpdatedAt") {
			targetFields = append(targetFields, i)
			fields = append(fields, strcase.ToSnake(name))
		}
	}
	args := make([]any, 0)
	for _, row := range values {
		valueOfTable := reflect.ValueOf(row)
		if valueOfTable.Kind() == reflect.Ptr {
			valueOfTable = valueOfTable.Elem()
		}
		for _, i := range targetFields {
			args = append(args, valueOfTable.Field(i).Interface())
		}
	}
	if tableName == "" {
		tableName = typeOfTable.Name()
	}
	sql, err := makeInsertSQL(tableName, fields, len(values))

	return sql, args, err
}

func makeInsertSQL(table string, fields []string, numOfValues int) (string, error) {
	if len(fields) == 0 {
		return "", errors.New("empty fields")
	}
	sql := fmt.Sprintf("INSERT INTO\n    %s (%s)\nVALUES\n", strcase.ToSnake(table), strings.Join(fields, ", "))
	placeHolder := "    (?"
	for i := 0; i < len(fields)-1; i++ {
		placeHolder += ",?"
	}
	placeHolder += ")"
	for i := 0; i < numOfValues-1; i++ {
		sql += placeHolder + ",\n"
	}
	sql += placeHolder + ";\n"
	return sql, nil
}

func MakeBulkInsertQueryWithMap(table string, values Maps) (string, []any, error) {
	args := make([]any, 0)
	if len(values) == 0 {
		return "", nil, errors.New("empty values")
	}
    for _, key := range values[0] {
        keys = append(keys, key)
    }
	sort.Strings(keys)
	for _, value := range values {
		for _, key := range keys {
			args = append(args, value[key])
		}
	}
	sql, err := makeInsertSQL(table, keys, len(values))
	return sql, args, err
}

sqlcから使う

次のようなメソッドを用意してやると良いのではないでしょうか。

package 自動生成されたものと同じ名前

import (
	"context"
	"database/sql"
	"errors"
	"your-project/dbutils"
)

type BulkInsert[T any] struct {
	q         *Queries
	tableName string
}

func NewBulkInsert[T any](q *Queries, tableName ...string) *BulkInsert[T] {
	name := ""
	if len(tableName) > 0 {
		name = tableName[0]
	}

	return &BulkInsert[T]{
		q,
		name,
	}
}

func (b *BulkInsert[T]) BulkInsert(ctx context.Context, values []T) (sql.Result, error) {
	if len(values) == 0 {
		return nil, errors.New("empty values")
	}
	sql, args, err := dbutils.MakeBulkInsertQuery(b.tableName, values, true)
	if err != nil {
		return nil, err
	}
	return b.q.db.ExecContext(ctx, sql, args...)
}

func (b *BulkInsert[T]) BulkInsertWithAllColumns(ctx context.Context, values []T) (sql.Result, error) {
	if len(values) == 0 {
		return nil, errors.New("empty values")
	}
	sql, args, err := dbutils.MakeBulkInsertQuery(b.tableName, values, false)
	if err != nil {
		return nil, err
	}
	return b.q.db.ExecContext(ctx, sql, args...)
}

func (q *Queries) BulkInsertWithMap(ctx context.Context, table string, values dbutils.Maps) (sql.Result, error) {
	if len(values) == 0 {
		return nil, errors.New("empty values")
	}
	sql, args, err := dbutils.MakeBulkInsertQueryWithMap(table, values)
	if err != nil {
		return nil, err
	}
	return q.db.ExecContext(ctx, sql, args...)
}

実際、BulkInsertしたいコードからは、

users := []yourdb.User{
  // 省略
}
b := yourdb.NewBulkInsert[User](q) // テーブル名と、型名が違うなら、第2引数にテーブル名を渡す
b.BulkInsert(ctx, users)
users := dbutils.Maps {
  // 省略
}

b := q.BulkInsertWithMap(ctx, "user", users)

注意事項

MySQLのプレースホルダは無限に指定できるわけではありませんので、65536個以上のデータを入れたいなら、分割しなければなりません。もしくは、途中で紹介したようにJSONを使えばよいでしょう。あと、いずれも、max_allowed_packetの値は調整する必要があるかもしれません。

メソッド側でエラーにするか、自動的に分割しても良いかもしれませんが、まぁ、そんなに大きい数をまとめて入れると、対象のテーブルの別プロセスからの読み書きにも問題出たりすると思いますので、やめといたほうが良いような気もします。取り扱ってるシステムの性格次第ではありますが。

また、微妙と言った、copyflomですが、LOAD DATAなので、爆速であることは間違いないかと思います(MySQL5系での経験上の話)。「○千万件超のデータを一気に突っ込みたい!」みたいな要件でしたら、かなり有用だと思います。

終わり

という感じで、sqlcでのBulk Insertについてでした。当初もっと短いコードのつもりだったのですが、実際実装してみると色々ありまして、長くなってしまいました。

BulkInsertWithMapで、テーブル名やカラム名のハードコーディングがちょっと...という方は、以前も紹介しましたが、下記をどうぞ。タイトルにgoquと書いてますが、別にgoqu専用というわけでもなんでもないです。

というわけで、長々続いたsqlcの記事は一旦終わりかなと思います。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0