概要
- Rails で upsertを行うためクラス
BulkUpsert
をかきました。 - activerecord-import on_duplicate_key_update (いわゆるupsert)の処理をラップしたものです。
- 関連するモデルを同時にupsertしたい場合ときに煩雑になりがちなコードを、DRYにかけます。
- mysqlのみ で使えます(mysql以外だとエラーが出ます。)
背景
個人で運営している https://www.dokode.work/ のデータを更新のために作りました。
データ更新時、activerecord-import を利用した bulk insertもしくはbulk upsertをよく行います。ただ、その時の記述方法が煩雑で少しでもすっきりさせたいなーと思ったのがきっかけです。
所感
10行かかっていたのが6行になりました。
記述の減少量よりも、重複した記述がなくなることによりすっきり感がまして気持ちよくコードが書けるようになりました!
使い方
以下のようなBlogとCommentモデルを想定します。
irb(main):037:0> Blog.attribute_names
=> ["id", "name", "edited_at", "created_at", "updated_at"]
irb(main):038:0> Comment.attribute_names
=> ["id", "blog_id", "name", "created_at", "updated_at"]
BlogはCommentと以下のような関連を持っています。
class Blog < ApplicationRecord
has_many :comments
validates :name, presence: truet
end
普通のやりかた
irb(main):061:0> blog_list = []
irb(main):062:0> comment_list = []
irb(main):063:0> Blog.limit(1).each do |blog|
irb(main):064:1* blog.assign_attributes(name: 'hoge')
irb(main):065:1> comment = blog.comments.build(name: 'fuga')
irb(main):066:1> blog_list << blog
irb(main):067:1> comment_list << comment
irb(main):068:1> end
irb(main):070:0> Blog.import(blog_list, on_duplicate_key_update: [:name])
irb(main):071:0> Comment.import(comment_list, on_duplicate_key_update: [:name])
これでも読みやすいです。しかし、 blog_listとcomment_listやBlog.importとComment.importなどの記述が重複しているのが気になります。
今回書いたBulkUpsererを使用した例
こちらのコードを配置してください。場所は、railsで読み込まれるところ、例えば app/import/bulk_upserter.rb
など。
配置後、下記を実行してください。上記と同様の結果を得ることができます。
irb(main):047:0> bulk_upserter = BulkUpserter.new(Blog.limit(1), {'Blog' => [:name]}, validate: true, raise_error: false, batch_size: 100)
irb(main):049:0> bulk_upserter.run do |blog, instances|
irb(main):050:1* blog.assign_attributes(name: 'hoge')
irb(main):052:1> comment = blog.comments.build(name: 'piyo')
irb(main):053:1> instances.add([blog, comment])
irb(main):054:1> end
行数は上記が10行に対し、6行となり約半分になり、importや xx_listなどの重複した記述が減り、DRYになりました。
実装
こちらを転載します。
class BulkUpserter
attr_reader :scope, :on_duplicate_key_update, :raise_error, :validate, :batch_size, :reports, :error, :exit_on_error
# activerecort-importのon_duplicate_key_updateを使いやすくしたもの
# 使い方例
# bulk_upserter = BulkUpserter.new(Blog.limit(100), {'Blog' => [:name], 'Comment' => [:name]})
# bulk_upserter.run {|blog, instances| blog.assign_attributes(name: 'fuga') || instances.add(blog) || blog.comments.build(name: 'piyo') || instances.add(blog)}
#
# bulk_upserter.reports はactiverecord_importのreportの配列
# => #<BulkUpserter::Reports:0x000055f74b08c4e0 @messages={:Blog=>[#<struct ActiveRecord::Import::Result failed_instances=[#<Blog id: 1, name: nil, edited_at: nil, created_at: "2019-11-23 07:17:30", updated_at: "2019-11-23 10:56:26">], num_inserts=0, ids=[], results=[]>], :Comment=>[#<struct ActiveRecord::Import::Result failed_instances=[], num_inserts=1, ids=[], results=[]>]}>
# on_duplicate_key_update はモデル名毎にupdateするカラム名を配列で渡す
# => {'Blog' => [:name], 'Comment' => [:name]}
# raise_error は import時にvalidationエラーが起きた時即時にエラーをraiseしたい場合はtrue (default false)
# validate は import時にvalidationを行わない場合は false (default true)
# batch_size は 処理のバッチサイズ1000ぐらいが適当
# exit_on_error はエラーをraiseしたときにすぐにう処理を抜けたい場合は true (defaultはfalse)
def initialize(scope, on_duplicate_key_update, raise_error: false, validate: true, batch_size: 1000, exit_on_error: false)
raise 'BulkUpserter is only for mysql' if ActiveRecord::Base.connection_config[:adapter] != 'mysql2'
@scope = scope
@on_duplicate_key_update = on_duplicate_key_update
@raise_error = raise_error
@validate = validate
@batch_size = batch_size
@log = []
@error = false
@reports = Reports.new
@exit_on_error = exit_on_error
end
def run
ActiveRecord::Base.transaction do
scope.find_in_batches(batch_size: batch_size) do |scope|
collection = Collection.new(scope)
# モデル名をキーとし、objectの配列を取得する
collection.load_with_sorting do |primary_object, instances|
yield(primary_object, instances)
instances.list
end
# モデル毎にimportする
# collection.sorted_data
# => {'Blog' => [blog, blog, blog], 'Comment' => [comment, comment, comment]}
# model_name
# => 'Blog'
# objects
# => [blog, blog, blog]
collection.sorted_data.each do |model_name, objects|
import_objects(objects.compact, on_duplicate_key_update: on_duplicate_key_update[model_name], raise_error: raise_error, validate: validate)
end
end
raise ActiveRecord::Rollback if error
end
end
def valid?
!error
end
private
def import_objects(objects, on_duplicate_key_update: {}, raise_error: false, validate: true)
raise StandardError, '単一のobjectのリストにしてください' if objects.uniq.empty?
model = Object.const_get(objects.first.model_name.name)
begin
report = model.import(objects, validate: validate, raise_error: raise_error, on_duplicate_key_update: on_duplicate_key_update)
@reports.add(model.name, report)
@error = true if report.failed_instances.present?
raise 'Error raised !' if error && exit_on_error
rescue ActiveRecord::RecordInvalid
@error = true
end
end
class Collection
attr_reader :sorted_data, :scope
def initialize(scope)
@sorted_data = {}
@scope = scope
end
def load_with_sorting
scope.each do |primary_object|
instances = Instances.new
instances = yield(primary_object, instances)
sorting(instances)
end
end
private
def sorting(instances)
instances.each do |instance|
model_name = instance.model_name.name
sorted_data[model_name] ? sorted_data[model_name] << instance : sorted_data[model_name] = [instance]
end
end
end
class Instances
attr_reader :list
def initialize
@list = []
end
def add(*objects)
list.concat(objects)
end
end
class Reports
include Enumerable
attr_reader :messages
def initialize
@messages = {}
end
def add(attribute, message)
if messages[attribute.to_sym]
messages[attribute.to_sym] << message
else
messages[attribute.to_sym] = [message]
end
end
def each
messages.each_key do |attribute|
messages[attribute].each { |error| yield attribute, error }
end
end
end
end
解説
指定できるパラーメータは以下の通りです。
bulk_upsert = BulkUpserter.new(
Blog.limit(100), # upsertしたいベースとなるレコード
{'Blog' => [:name], 'Comment' => [:name]}), # upsertしたいカラムをモデル別に記述
validate: true, # validationをおこわない場合は false
batch_size: 1000, # バッチ処理のデータ数単位。1000ぐらいが適当。
exit_on_error: false, # validationエラーが起きたら読み込み途中で終了したい場合はtrue
)
実行する時は、以下のように行いたい処理をブロックで渡してください。
ポイントは、importするレコードを instances.add()
で渡すところ。
追加されたinstanceの配列がモデル毎にupsertされます。
bulk_upserter.run do |blog, instances|
blog.assign_attributes(name: 'hoge')
comment = blog.comments.build(name: 'piyo')
instances.add([blog, comment])
end
エラーの有無は、上記実行後に下記を実行してください。falseの場合ipmort時にエラーがあります。
bulk_upserter.valid?
=> true # なお、validationエラーがある場合は falseを返します。
エラーがあった場合は下記からactiverecord-importのログを利用して下さい。
モデル毎、バッチ毎に保存されています。
bulk_upserter.reports
bulk_upserter.reports['Blog'] # Blog import時のログ
bulk_upserter.reports['Comment'] # Comment import時のログ