2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

activerecord-importで複数モデルをupsertする処理を少しでもすっきり書くためのクラス

Posted at

概要

  • Rails で upsertを行うためクラス BulkUpsert をかきました。
  • activerecord-import on_duplicate_key_update (いわゆるupsert)の処理をラップしたものです。
  • 関連するモデルを同時にupsertしたい場合ときに煩雑になりがちなコードを、DRYにかけます。
  • mysqlのみ で使えます(mysql以外だとエラーが出ます。)

背景

個人で運営している https://www.dokode.work/ のデータを更新のために作りました。
データ更新時、activerecord-import を利用した bulk insertもしくはbulk upsertをよく行います。ただ、その時の記述方法が煩雑で少しでもすっきりさせたいなーと思ったのがきっかけです。

所感

10行かかっていたのが6行になりました。
記述の減少量よりも、重複した記述がなくなることによりすっきり感がまして気持ちよくコードが書けるようになりました!

使い方

以下のようなBlogとCommentモデルを想定します。

irb(main):037:0> Blog.attribute_names
=> ["id", "name", "edited_at", "created_at", "updated_at"]
irb(main):038:0> Comment.attribute_names
=> ["id", "blog_id", "name", "created_at", "updated_at"]

BlogはCommentと以下のような関連を持っています。

app/models/blog.rb
class Blog < ApplicationRecord
  has_many :comments
  validates :name, presence: truet
end

普通のやりかた

irb(main):061:0> blog_list = []
irb(main):062:0> comment_list = []
irb(main):063:0> Blog.limit(1).each do |blog|
irb(main):064:1*   blog.assign_attributes(name: 'hoge')
irb(main):065:1>   comment = blog.comments.build(name: 'fuga')
irb(main):066:1>   blog_list << blog
irb(main):067:1>   comment_list << comment
irb(main):068:1> end
irb(main):070:0> Blog.import(blog_list, on_duplicate_key_update: [:name])
irb(main):071:0> Comment.import(comment_list, on_duplicate_key_update: [:name])

これでも読みやすいです。しかし、 blog_listとcomment_listやBlog.importとComment.importなどの記述が重複しているのが気になります。

今回書いたBulkUpsererを使用した例

こちらのコードを配置してください。場所は、railsで読み込まれるところ、例えば app/import/bulk_upserter.rb など。
配置後、下記を実行してください。上記と同様の結果を得ることができます。

irb(main):047:0> bulk_upserter = BulkUpserter.new(Blog.limit(1), {'Blog' => [:name]}, validate: true, raise_error: false, batch_size: 100)
irb(main):049:0> bulk_upserter.run do |blog, instances|
irb(main):050:1*   blog.assign_attributes(name: 'hoge')
irb(main):052:1>   comment = blog.comments.build(name: 'piyo')
irb(main):053:1>   instances.add([blog, comment])
irb(main):054:1> end

行数は上記が10行に対し、6行となり約半分になり、importや xx_listなどの重複した記述が減り、DRYになりました。

実装

こちらを転載します。

app/import/bulk_upserter.rb
class BulkUpserter
  attr_reader :scope, :on_duplicate_key_update, :raise_error, :validate, :batch_size, :reports, :error, :exit_on_error

  # activerecort-importのon_duplicate_key_updateを使いやすくしたもの
  # 使い方例
  # bulk_upserter = BulkUpserter.new(Blog.limit(100), {'Blog' => [:name], 'Comment' => [:name]})
  # bulk_upserter.run {|blog, instances| blog.assign_attributes(name: 'fuga') || instances.add(blog) || blog.comments.build(name: 'piyo') || instances.add(blog)}
  #
  # bulk_upserter.reports はactiverecord_importのreportの配列
  # => #<BulkUpserter::Reports:0x000055f74b08c4e0 @messages={:Blog=>[#<struct ActiveRecord::Import::Result failed_instances=[#<Blog id: 1, name: nil, edited_at: nil, created_at: "2019-11-23 07:17:30", updated_at: "2019-11-23 10:56:26">], num_inserts=0, ids=[], results=[]>], :Comment=>[#<struct ActiveRecord::Import::Result failed_instances=[], num_inserts=1, ids=[], results=[]>]}>
  # on_duplicate_key_update はモデル名毎にupdateするカラム名を配列で渡す
  # => {'Blog' => [:name], 'Comment' => [:name]}
  # raise_error は import時にvalidationエラーが起きた時即時にエラーをraiseしたい場合はtrue (default false)
  # validate は import時にvalidationを行わない場合は false (default true)
  # batch_size は 処理のバッチサイズ1000ぐらいが適当
  # exit_on_error はエラーをraiseしたときにすぐにう処理を抜けたい場合は true (defaultはfalse)
  def initialize(scope, on_duplicate_key_update, raise_error: false, validate: true, batch_size: 1000, exit_on_error: false)
    raise 'BulkUpserter is only for mysql' if ActiveRecord::Base.connection_config[:adapter] != 'mysql2'
    @scope = scope
    @on_duplicate_key_update = on_duplicate_key_update
    @raise_error = raise_error
    @validate = validate
    @batch_size = batch_size
    @log = []
    @error = false
    @reports = Reports.new
    @exit_on_error = exit_on_error
  end

  def run
    ActiveRecord::Base.transaction do
      scope.find_in_batches(batch_size: batch_size) do |scope|
        collection = Collection.new(scope)

        # モデル名をキーとし、objectの配列を取得する
        collection.load_with_sorting do |primary_object, instances|
          yield(primary_object, instances)
          instances.list
        end

        # モデル毎にimportする
        # collection.sorted_data
        # => {'Blog' => [blog, blog, blog], 'Comment' => [comment, comment, comment]}
        # model_name
        # => 'Blog'
        # objects
        # => [blog, blog, blog]
        collection.sorted_data.each do |model_name, objects|
          import_objects(objects.compact, on_duplicate_key_update: on_duplicate_key_update[model_name], raise_error: raise_error, validate: validate)
        end
      end
      raise ActiveRecord::Rollback if error
    end
  end

  def valid?
    !error
  end

  private

  def import_objects(objects, on_duplicate_key_update: {}, raise_error: false, validate: true)
    raise StandardError, '単一のobjectのリストにしてください' if objects.uniq.empty?

    model = Object.const_get(objects.first.model_name.name)
    begin
      report = model.import(objects, validate: validate, raise_error: raise_error, on_duplicate_key_update: on_duplicate_key_update)
      @reports.add(model.name, report)
      @error = true if report.failed_instances.present?
      raise 'Error raised !' if error && exit_on_error
    rescue ActiveRecord::RecordInvalid
      @error = true
    end
  end

  class Collection
    attr_reader :sorted_data, :scope

    def initialize(scope)
      @sorted_data = {}
      @scope = scope
    end

    def load_with_sorting
      scope.each do |primary_object|
        instances = Instances.new
        instances = yield(primary_object, instances)
        sorting(instances)
      end
    end

    private

    def sorting(instances)
      instances.each do |instance|
        model_name = instance.model_name.name
        sorted_data[model_name] ? sorted_data[model_name] << instance : sorted_data[model_name] = [instance]
      end
    end
  end

  class Instances
    attr_reader :list

    def initialize
      @list = []
    end

    def add(*objects)
      list.concat(objects)
    end
  end

  class Reports
    include Enumerable
    attr_reader :messages

    def initialize
      @messages = {}
    end

    def add(attribute, message)
      if messages[attribute.to_sym]
        messages[attribute.to_sym] << message
      else
        messages[attribute.to_sym] = [message]
      end
    end

    def each
      messages.each_key do |attribute|
        messages[attribute].each { |error| yield attribute, error }
      end
    end
  end
end

解説

指定できるパラーメータは以下の通りです。

bulk_upsert = BulkUpserter.new(
  Blog.limit(100), # upsertしたいベースとなるレコード
  {'Blog' => [:name], 'Comment' => [:name]}), # upsertしたいカラムをモデル別に記述
  validate: true, # validationをおこわない場合は false
  batch_size: 1000, # バッチ処理のデータ数単位。1000ぐらいが適当。
  exit_on_error: false, # validationエラーが起きたら読み込み途中で終了したい場合はtrue
)

実行する時は、以下のように行いたい処理をブロックで渡してください。

ポイントは、importするレコードを instances.add() で渡すところ。
追加されたinstanceの配列がモデル毎にupsertされます。

bulk_upserter.run do |blog, instances|
  blog.assign_attributes(name: 'hoge')
  comment = blog.comments.build(name: 'piyo')
  instances.add([blog, comment])
end

エラーの有無は、上記実行後に下記を実行してください。falseの場合ipmort時にエラーがあります。

bulk_upserter.valid?
=> true # なお、validationエラーがある場合は falseを返します。

エラーがあった場合は下記からactiverecord-importのログを利用して下さい。
モデル毎、バッチ毎に保存されています。

bulk_upserter.reports
bulk_upserter.reports['Blog'] # Blog import時のログ
bulk_upserter.reports['Comment'] # Comment import時のログ
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?