sqlcでは動的なSQLは書けません。WHEREについては方法を紹介しましたが、Bulk Insertも動的になってしまうので、困りますね。
SELECTと同様、Query Builderで作るのも手段ではありますが、一応公式に用意されています。ただ、ちょっと微妙かなと思っています。
copyfrom アノテーションを使う
下記のようにcopyfrom
を使うのが、公式のやり方です。
-- name: InsertValues :copyfrom
INSERT INTO
user (id, Name)
VALUES
(?, ?);
ただ、これ、実際は、INSERT
ではなくてLOAD DATA
に変換されます。LOAD DATA
に変換されると、懸念事項がいくつかあります。
ただ、LOAD DATA
のパフォーマンスはかなり良いはずなので、使いどころや使うデータについて気をつけられるのであれば、選択肢としてはありだと思います。
以下、微妙なところ。
1. エラーに気づけない
LOCAL
を使うので、「LOCAL が指定されている場合、デフォルトの動作は IGNORE が指定されている場合と同じです。これは、操作の最中にファイルの転送を停止する方法がサーバーにはないためです」となってしまいます。
重複キーの処理
REPLACE および IGNORE 修飾子は、一意のキー値で既存の行を複製する新しい (入力) 行の処理を制御します:REPLACE を指定すると、新しい行で既存の行が置き換えられます。 つまり、既存の行と同じ主キーまたは一意インデックスの値を持つ行が既存の行を置換します。 セクション13.2.9「REPLACE ステートメント」を参照してください。
IGNORE を指定すると、一意のキー値で既存の行を複製する新しい行は破棄されます。 詳細は、IGNORE がステートメントの実行に与える影響を参照してください。
いずれの修飾子も指定しない場合、動作は LOCAL 修飾子が指定されているかどうかによって異なります。 LOCAL が指定されていない場合は、重複キー値が見つかるとエラーが発生し、テキストファイルの残りは無視されます。 LOCAL が指定されている場合、デフォルトの動作は IGNORE が指定されている場合と同じです。これは、操作の最中にファイルの転送を停止する方法がサーバーにはないためです。
https://dev.mysql.com/doc/refman/8.0/ja/load-data.html#load-data-duplicate-key-handling
sqlcのドキュメントにも、以下のように書かれています。
Errors and duplicate keys are treated as warnings and insertion will continue, even without an error for some cases. Use this in a transaction and use SHOW WARNINGS to check for any problems and roll back if necessary.
https://docs.sqlc.dev/en/stable/howto/insert.html#mysql
トランザクションはって、SHOW WARNINGS
しろとのことですね。
2. LOAD DATA は、ステートメントベースレプリケーションでは安全でない
MySQLのbinlog_format
の設定に気をつける必要があります。まぁ、多分、大丈夫(ステートメントベースにはなっていないであろう)ですが、一応確認しとくとよいのでは。
LOAD DATA は、ステートメントベースレプリケーションでは安全でないとみなされます。 binlog_format=STATEMENT が設定されているときに LOAD DATA を使用すると、データを含む一時ファイルが、変更が適用されるレプリケーションスレーブ上に作成されます。 バイナリログの暗号化がサーバー上でアクティブな場合、この一時ファイルは暗号化されないことに注意してください。 暗号化が必要な場合は、一時ファイルを作成しない行ベースまたは混合バイナリロギング形式を使用してください。 LOAD DATA とレプリケーションの相互作用の詳細は、セクション17.5.1.19「レプリケーションと LOAD DATA」 を参照してください。
https://dev.mysql.com/doc/refman/8.0/ja/replication-features-load-data.html
使う際の注意
sqlcのYAMLファイルに、sql_package
と sql_driver
の設定が必要です。また、github.com/hexon/mysqltsv
も必要になります。
JSON_TABLE
を使う
MySQL8から使える、JSON_TABLE
という関数を使って Bulk Insertすることができます(参考記事)。
INSERT INTO `テーブル名` (`カラム名`,...)
SELECT * FROM JSON_TABLE('[{カラム名: 値},...]')
のようなSQLでまとめてデータを入れることが出来ます。sqlcであれば、以下のようにしてやれば、引数として、JSONフォーマットの文字列を渡すだけで使えそうです。
-- name: BulkInsertToUser :exec
INSERT INTO user (name, age) SELECT * FROM JSON_TABLE(sqlc.arg(json))
ただ、文字列としてJSONを渡す必要があるので、型の定義にtagを付けて、配列を作って、Marshal
した文字列を渡す...ということになりそうですね。
ちなみに、sqlcのYAMLの設定ファイルで、emit_json_tags: true
つけておけば、自動生成されたテーブルの型にJSONのタグをつけることは出来ます(ただ、idとかも入ってしまうので、コピーしてinsert用の型を作るということになるのかなと思います)。詳細はドキュメント参照。
自前でBulk Insertを作る
JSON_TABLEで概ね問題ないのですが、jsonのstringを作るのが、ちょっと面倒なので、自前でBULK INSERTを作っても良さそうです。
自動生成される型を使う
sqlcが生成するテーブルごとの型を利用するのもありだと思います。以下のように、型名=テーブル名
、構造体のメンバ=カラム名
なので、これを利用します。
例として、以下のような user
テーブルに対応する型が生成されたとしましょう。
type User struct {
ID int32
Name int32
CreatedAt time.Time
UpdatedAt sql.NullTime
}
以下のコードで、values
の最初の値からテーブル名やカラム名をreflect
で取得しています。
func MakeBulkInsertQuery[T any](tableName string, values []T, ignoreColumns bool) (string, []any, error) {
typeOfTable := reflect.TypeOf(values[0])
if typeOfTable.Kind() == reflect.Ptr {
typeOfTable = typeOfTable.Elem()
}
fields := make([]string, 0)
targetFields := make([]int, 0)
if typeOfTable.Kind() != reflect.Struct {
return "", nil, fmt.Errorf("%s is not struct: %s", typeOfTable, typeOfTable.Kind())
}
for i := 0; i < typeOfTable.NumField(); i++ {
name := typeOfTable.Field(i).Name
// ignoreColumns が false でなければ、id や updated_at は無視する
if !ignoreColumns || (name != "ID" && name != "UpdatedAt") {
targetFields = append(targetFields, i)
fields = append(fields, strcase.ToSnake(name))
}
}
args := make([]any, 0)
for _, row := range values {
valueOfTable := reflect.ValueOf(row)
if valueOfTable.Kind() == reflect.Ptr {
valueOfTable = valueOfTable.Elem()
}
for _, i := range targetFields {
args = append(args, valueOfTable.Field(i).Interface())
}
}
if tableName == "" {
tableName = typeOfTable.Name()
}
sql, err := makeInsertSQL(tableName, fields, len(values))
return sql, args, err
}
呼び出しは、以下のようにしてやります。
values := make([]User, 0)
for _, row := range rows {
append(values, User{
Name: row.Name,
CreatedAt: time.Now(),
})
}
MakeBulkInsertQuery[User](values, true)
テーブル名と []map[string]any
を渡す
これは、もっと単純ですね。テーブル名と、[]map[string]any
を渡します。map[string]any
のキーがカラム名で、値がそれに対応する値ですね。
func MakeBulkInsertQueryWithMap(table string, values []map[string]any) (string, []any) {
args := make([]any, 0)
keys := make([]string, 0)
for _, key := range values[0] {
keys = append(keys, key)
}
sort.Strings(keys)
for _, value := range values {
for _, key := range keys {
args = append(args, value[key])
}
}
return makeInsertSQL(table, keys, len(values)), args
}
呼び出しは以下のようになります。
values := make([]map[string]any, 0)
for _, row := range rows {
append(values, map[string]any{
"name": row.Name,
"created_at": time.Now(),
})
}
MakeBulkInsertQueryWithMap("user", values)
全体のコード
全体のコードは下記のようになります。
package dbutils
import (
"errors"
"fmt"
"reflect"
"sort"
"strings"
"github.com/iancoleman/strcase"
)
type Map = map[string]any
type Maps = []Map
func MakeBulkInsertQuery[T any](tableName string, values []T, ignoreColumns bool) (string, []any, error) {
if len(values) == 0 {
return "", nil, errors.New("empty values")
}
typeOfTable := reflect.TypeOf(values[0])
if typeOfTable.Kind() == reflect.Ptr {
typeOfTable = typeOfTable.Elem()
}
fields := make([]string, 0)
targetFields := make([]int, 0)
if typeOfTable.Kind() != reflect.Struct {
return "", nil, fmt.Errorf("%s is not struct: %s", typeOfTable, typeOfTable.Kind())
}
for i := 0; i < typeOfTable.NumField(); i++ {
name := typeOfTable.Field(i).Name
// ID や UpdatedAt は無視する
if !ignoreColumns || (name != "ID" && name != "UpdatedAt") {
targetFields = append(targetFields, i)
fields = append(fields, strcase.ToSnake(name))
}
}
args := make([]any, 0)
for _, row := range values {
valueOfTable := reflect.ValueOf(row)
if valueOfTable.Kind() == reflect.Ptr {
valueOfTable = valueOfTable.Elem()
}
for _, i := range targetFields {
args = append(args, valueOfTable.Field(i).Interface())
}
}
if tableName == "" {
tableName = typeOfTable.Name()
}
sql, err := makeInsertSQL(tableName, fields, len(values))
return sql, args, err
}
func makeInsertSQL(table string, fields []string, numOfValues int) (string, error) {
if len(fields) == 0 {
return "", errors.New("empty fields")
}
sql := fmt.Sprintf("INSERT INTO\n %s (%s)\nVALUES\n", strcase.ToSnake(table), strings.Join(fields, ", "))
placeHolder := " (?"
for i := 0; i < len(fields)-1; i++ {
placeHolder += ",?"
}
placeHolder += ")"
for i := 0; i < numOfValues-1; i++ {
sql += placeHolder + ",\n"
}
sql += placeHolder + ";\n"
return sql, nil
}
func MakeBulkInsertQueryWithMap(table string, values Maps) (string, []any, error) {
args := make([]any, 0)
if len(values) == 0 {
return "", nil, errors.New("empty values")
}
for _, key := range values[0] {
keys = append(keys, key)
}
sort.Strings(keys)
for _, value := range values {
for _, key := range keys {
args = append(args, value[key])
}
}
sql, err := makeInsertSQL(table, keys, len(values))
return sql, args, err
}
sqlcから使う
次のようなメソッドを用意してやると良いのではないでしょうか。
package 自動生成されたものと同じ名前
import (
"context"
"database/sql"
"errors"
"your-project/dbutils"
)
type BulkInsert[T any] struct {
q *Queries
tableName string
}
func NewBulkInsert[T any](q *Queries, tableName ...string) *BulkInsert[T] {
name := ""
if len(tableName) > 0 {
name = tableName[0]
}
return &BulkInsert[T]{
q,
name,
}
}
func (b *BulkInsert[T]) BulkInsert(ctx context.Context, values []T) (sql.Result, error) {
if len(values) == 0 {
return nil, errors.New("empty values")
}
sql, args, err := dbutils.MakeBulkInsertQuery(b.tableName, values, true)
if err != nil {
return nil, err
}
return b.q.db.ExecContext(ctx, sql, args...)
}
func (b *BulkInsert[T]) BulkInsertWithAllColumns(ctx context.Context, values []T) (sql.Result, error) {
if len(values) == 0 {
return nil, errors.New("empty values")
}
sql, args, err := dbutils.MakeBulkInsertQuery(b.tableName, values, false)
if err != nil {
return nil, err
}
return b.q.db.ExecContext(ctx, sql, args...)
}
func (q *Queries) BulkInsertWithMap(ctx context.Context, table string, values dbutils.Maps) (sql.Result, error) {
if len(values) == 0 {
return nil, errors.New("empty values")
}
sql, args, err := dbutils.MakeBulkInsertQueryWithMap(table, values)
if err != nil {
return nil, err
}
return q.db.ExecContext(ctx, sql, args...)
}
実際、BulkInsertしたいコードからは、
users := []yourdb.User{
// 省略
}
b := yourdb.NewBulkInsert[User](q) // テーブル名と、型名が違うなら、第2引数にテーブル名を渡す
b.BulkInsert(ctx, users)
users := dbutils.Maps {
// 省略
}
b := q.BulkInsertWithMap(ctx, "user", users)
注意事項
MySQLのプレースホルダは無限に指定できるわけではありませんので、65536個以上のデータを入れたいなら、分割しなければなりません。もしくは、途中で紹介したようにJSONを使えばよいでしょう。あと、いずれも、max_allowed_packet
の値は調整する必要があるかもしれません。
メソッド側でエラーにするか、自動的に分割しても良いかもしれませんが、まぁ、そんなに大きい数をまとめて入れると、対象のテーブルの別プロセスからの読み書きにも問題出たりすると思いますので、やめといたほうが良いような気もします。取り扱ってるシステムの性格次第ではありますが。
また、微妙と言った、copyflom
ですが、LOAD DATA
なので、爆速であることは間違いないかと思います(MySQL5系での経験上の話)。「○千万件超のデータを一気に突っ込みたい!」みたいな要件でしたら、かなり有用だと思います。
終わり
という感じで、sqlcでのBulk Insertについてでした。当初もっと短いコードのつもりだったのですが、実際実装してみると色々ありまして、長くなってしまいました。
BulkInsertWithMap
で、テーブル名やカラム名のハードコーディングがちょっと...という方は、以前も紹介しましたが、下記をどうぞ。タイトルにgoqu
と書いてますが、別にgoqu専用というわけでもなんでもないです。
というわけで、長々続いたsqlcの記事は一旦終わりかなと思います。