50
48

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

巨大なCSVファイルをRailsで処理させる

Last updated at Posted at 2015-11-11

はじめに

20MBを超えるCSVの処理を行う必要が出てきて、右往左往したのでそのメモを残します。

当初は大きくてもファイルサイズは3MBだったので、リクエスト内で処理をさせていました。(この対応にも問題がありますが。。。)
しかし、実際のデータをもらったら20MBを超えるファイルが一つ入っていて、リクエスト内で処理をさせるには限界が出てきました。
そこで色々模索して出した結論を載せておきます。

なお、本処理は業務の一環で発生するCSVアップロード処理になります。
頻度は年に数回というレベルのもので、以下のような対応をしています。
予算も時間もある方はデーターサーバーと遅延バッチ(もしくはスケジューラー)を組み合わせたほうが無難だと思います。

やりたい事

1系2系と言う一般的な冗長化しているサーバーで大容量のCSVをアップデートして、RDSにデータをインサートしたい。

困る点

冗長化されているため、ローカルにファイルを配置すると1系にアップされるのか2系にアップされるのか分からない。

かといって、20MB以上のデータをS3などには配置すると処理に無駄が発生してしまう。
(できるだけ、そのサーバーで完結させたい。)

リクエスト内で処理を行うとレスポンスタイムアウトが発生してしまう。

解決案を模索

はじめはsidekiqなどのバックグランドジョブを使用して対応しようかと思ったのですが、この方法を使う場合は冗長化されているシステムでのファイルの置き場に困ってしまう。
(正直に言うと管理するのが面倒だと思った・・・)

前の記事に書いたsucker_punchを使うと、そのプロセスでCSV処理が発生してしまい結果的に全体のレスポンスが落ちてしまう。
sucker_punchはスレッド実行のため)

結論

Rakeタスクを作成して、Railsから呼び出す事にした。

やり方を考えてみる

やりたい事は簡単で以下のフローを実施するだけである。

  1. リクエストからCSVファイルを受け取る
  2. CSVファイルの存在チェックやヘッダーと1行目のデータチェックを行う
  3. RailsRoot/tmp/配下にCSVファイルを保存する
  4. 3で作成したファイルを管理するテーブルにレコードを発行して、レスポンスを返す。

ここまでがリクエスト/レスポンスで行いたい処理だ。
ここからはバッチの処理になる。

  1. 上記4の処理中にRakeタスクを発行する。(これにより同一サーバー上でバッチ処理を行える。)
  2. Rakeタスク内部でCSVデータを検証して上記4で発行したレコードに経過処理を通知する。(結果はバッチ一覧画面で見れるようにする。)
  3. CSVの取り込みが終わったら上記3で作成したファイルを削除する。(同一サーバー上なのでファイルが存在しないことは無い。)

これで処理を終わらせる。

完璧だ!

やってみる その1

そもそもRailsからRakeタスクを発行することってできるのだろうか?
調べてみると二つの記事が見つかった。

正直、文献が少ない。
ということは、、、これはもしかしたらイバラの道かもしない。と少し思い始めた。

文献を見て試行錯誤したらやりたいことはすぐにできた。

# Rails内からRakeタスクを呼び出すために必要
require 'rake'

# タスクを読み込む必要があるため、この処理でタスクを読み込む
Rails.application.load_tasks

# Rakeタスクを実行する
Rake::Task['csv:import'].execute(:id => batch.id)

# こんな書き方はしないけど、イメージとしてこんな感じ。
redirect_to action: :csv

受け取り側はこんな感じだ。

rails g task csv

Rakeタスクを作成する。

# CSVのインポートを行うタスク
namespace :csv do
  require 'csv'

  # Railsから呼び出される特殊なタスク
  task :import => :environment do |task, args|
  	Batch.find_by_id(args[:id])
  	
  	# CSV取り込み処理(最後に完全版を載せておきます。)
  end
end

うん。完璧。

・・・ん?

結果 その1

これで、できることはできた。
できたけど、おかしい。

小さいCSVファイルでは気にならなかったが、大きいCSVファイルを渡すとすぐに分かった。
これは同一プロセス内でRakeタスクが実行されていた。
すなわち、CSVの処理が終わるまでレスポンスが返らなかったのだ。

僕は別プロセスでRakeタスクが走ると思ってたけど、どうやらそうではないらしい。
(後述する方法で別プロセスにできると思うが、この処理は色々無駄が多く管理も難しいのでやめた。)

やってみる その2

僕はここで大きな勘違いをしていた。

Rake::Task['csv:import'].execute

このexecute処理は同じプロセスとしてタスクが発行される!
では、直接システムコールを行えば問題ない!
という考えのもとから以下のコードを行う事にした。

system("cd #{Rails.root} & rake csv:import[#{batch.id}]")
redirect_to action: :csv

これに伴ってRakeタスクも一部変更した。

# CSVのインポートを行うタスク
namespace :csv do
  require 'csv'

  # Railsから呼び出される特殊なタスク
  # [:id]を付与して、第一引数に名前を付けた。
  task :import, [:id] => :environment do |task, args|
  	Batch.find_by_id(args[:id])
  	
  	# CSV取り込み処理(最後に完全版を載せておきます。)
  end
end

結果 その2

結果は同様だった。

`"cd #{Rails.root} & rake csv:import[#{batch.id}]"`
redirect_to action: :csv

と変更してみたが、それもダメだった。
どうやら僕の考え方は根本的に間違っているようだ。

別プロセスの調査

そもそもの考えが間違っているのだと気付いて、Rails内で別プロセスを発行する方法を模索してみた。

の記事を見つけた。

なるほど。

別プロセスで動かすためにはforkしなければならないという事をここで初めて知った。

やってみる 最後

実際に行ってみた。

fork do
	Process.setsid
	system("cd #{Rails.root} & rake csv:import[#{batch.id}]")
end

redirect_to action: :csv

結果 最後

ほぼ想定通りに実行できた。
問題点は一つだけで、親プロセス(Rails)が落ちるとRakeタスクも落ちてしまう事だった。

今回のケースではこれは大した問題ではないので、この方法で実施する事にした。

感想とまとめ

結果的にsidekiqとかを使ったほうが工数が少なかったかもしれない・・・と思ったのは秘密だが。
色々学ぶ事ができたのでこの方法で突き進む事にした。

また、この方法のメリット/デメリットは以下の通りである。

メリット

  • サードパーティを使わずにバックグランド処理ができる。
  • 同一サーバー内で処理が完結するため、ファイルを一時的に共有サーバーへ配置する必要がない。
  • 親プロセスが死ぬと子プロセスが死ぬので子プロセスの管理をしなくても問題が発生しない。(語弊あり)

デメリット

  • バックグランド処理の順番制御などが難しい。(CSVアップロードが頻発すると子プロセスが大量に発生してしまう。後述するが、PIDが取れるので制御不能ではない。)
  • 親プロセスに影響してしまう。(親が落ちると子プロセスも死ぬ。ただしスレッド処理ではないので、sucker_punchを使用するよりもはるかに処理速度が良い。)
  • Rakeタスクが死ぬと再起動させる方法がない。(頑張ればできるけど・・・)
  • ゴミファイルが溜まる可能性がある。(後述)

今回のケースでは年に数回しか行われない上に、失敗したら運用でやり直して貰えれば良いレベルのものだったのでデメリットよりメリットの方が大きかった。

実際のプログラム

実際に僕が使ったプログラムと手順を最後に載せておきます。
汎用的に使えるようにカスタマイズしてます。
(ただし、動作検証は行ってないので使う方は自己責任でお願いします。&コメントで指摘してくれると嬉しいです。)

コマンドも記載してますが、僕はRubyMineでコマンドを発行しているので間違いがあるかもしれません。

使用しているGem

以下のプログラムで使用しているGemは以下になります。

# バルクインサートで使用
gem 'activerecord-import'
# バッチ一覧表示で使用
gem 'kaminari'
# 0.4系は落ちる・・・。
gem 'mysql2', '0.3.19'

gemの説明は前回の記事に書いてますので良ければ参考にしてください。

バッチ制御用モデルの作成

リクエストで受け取ったCSVファイルデータをバッチ処理に受け渡したり、バッチの処理状況を画面に通知するために使用するモデルを作成。

$ rails generate model Batch

これで作成したモデルに以下を定義。

XXX_create_batches.rb
class CreateBatches < ActiveRecord::Migration
  def change
    create_table :batches do |t|
      # 処理しているバッチ名
      t.string :name, null: false
      # 処理ステータス
      t.integer :status, null: false, default: Batch::STATUS_READY
      # pid(ホスト名も入れると良いかもしれない。)
      t.string :pid
      # 画面に通知するメッセージ
      t.text :message
      # Rakeタスク内で絶対パスを取得するために使用
      t.text :rails_root_path

      t.timestamps null: false
    end
  end
end

モデルの中身はこんな感じです。

app/models/batch.rb
# バッチの進捗状況を管理します。
class Batch < ActiveRecord::Base
  # バッチ名の定義
  # modelの部分にActiveRecodeのモデルクラスを定義する事で汎用的に使用できます。
  TYPE = {"csv_user" => {
                    name: "ユーザー取り込みバッチ",
                    model: User},
                "csv_category" => {
                    name: "カテゴリー取り込みバッチ",
                    model: Category}
  }

  #####
  # バッチのステータス
  #

  ## 開始前
  STATUS_READY = 0
  ## 処理中
  STATUS_PROCESSING_VALID = 1   # 検証中
  STATUS_PROCESSING_IMPORT = 2  # インポート中
  ## この値より大きい場合は処理終了とみなすための値
  STATUS_FINISH = 5
  ## 異常処理終了
  STATUS_FAIL = 8
  ## 正常処理終了
  STATUS_SUCCESS = 9

  # 処理終了時に保存している管理ファイルを削除する。
  after_save do
    # 処理が終了しているか判断
    if self.status > STATUS_FINISH
      # ファイルを削除する。
      begin
        File.unlink file_path
      rescue
        # 同一サーバーで処理をしているので、ファイルが無い事は無い。
        # ただし、念のため例外が発生しないようにしている。
      end
    end
  end

  # 「1/サーバー台数」の確率になるが、やらないよりマシなのでバッチがキャンセルされる時は指定のファイルを削除する。
  after_destroy do
    begin
      File.unlink file_path
    rescue
      # 別のサーバーにある場合はファイルを消せない。(通常運用をしている場合は管理しているファイルは削除されているはずなので、消せない場合はあまりないと割り切って諦める。)
    end
  end

  # CSVインポートで使用するモデルを取得します。
  def self.use_model(batch_type)
    # 指定のタスクが存在するかチェックします。
    raise "[csv_#{batch_type}] task is nothing!" if TYPE["csv_#{batch_type}"].nil?

    TYPE["csv_#{batch_type}"][:model]
  end

  # バッチの処理を行う。
  def self.csv_import_run(batch_type, file)
    # 形式チェックで使用。
    Batch.get_csv_model(batch_type)

    # バッチクラスを生成します。
    # Rake処理中にも絶対パスが取得できるようにRailsRootパスを保存しておく。(念のため)
    batch = Batch.new(name: "csv_#{batch_type}", rails_root_path: Rails.root, message: "CSVファイル読み込み中...")
    batch.save

    # バックグランドへ受け渡すためにファイルを保存します。
    batch.save_file file

    # Rakeタスクを別プロセスで実行させる。
    # 2度保存する処理が少し無駄だが、CSVアップロードをプロセス内で行うよりマシ。
    batch.pid = Batch.system_command("cd #{Rails.root} & rake csv:import[#{batch.id}]")
    batch.save
  end

  # バッチ名を取得
  def show_name
    NAME[self.name][:name]
  end

  # バッチのステータスを返却
  def show_status
    case self.status
      when STATUS_READY
        "取り込み準備中"
      when STATUS_PROCESSING_VALID
        "CSVデータ検証中"
      when STATUS_PROCESSING_IMPORT
        "CSV取り込み中"
      when STATUS_FAIL
        "取り込み処理失敗"
      when STATUS_SUCCESS
        "取り込み完了"
      else
        "不明な処理ステータス"
    end
  end

  # 保存するファイルパスを返却します。
  def save_file(file)
    # ファイルを書き込みます。
    File.open(self.file_path, 'wb') do|f|
      f.write(file.read)
    end
  end

  # このバッチが管理するファイルパスを返却
  # idで管理させるため、Batchレコードを作成後に呼び出す事
  def file_path
    # パスの定義
    path = "#{self.rails_root_path}/tmp/batch"

    # ファイルパスがない場合は作成する。
    FileUtils.mkdir_p(path) unless FileTest.exist?(path)

    # ファイルパスにIDを付けて返却
    "#{path}/#{self.id}"
  end

  # 処理が終了しているか?
  def finish?
    # 処理実行時間から1日以上経っている場合は処理終了とみなす。
    return true if self.updated_at < DateTime.now - 1.day
    # 終了ステータスの場合は処理終了とみなす。
    self.status > STATUS_FINISH
  end

  private

  # システムコマンドを別プロセスで実施します。
  def self.system_command(command)
    # 念のためログを出力
    Rails.logger.info command

    # 別プロセスで実行させる。
    pid = nil
    fork do
      Process.setsid
      fork do
        # PIDの取得
        pid = `#{command} 2>&1 & echo $!`.lstrip.chomp
        if pid =~ /[0-9]/
          Rails.logger.info "csv import rake task pid = #{pid}"
        else
          Rails.logger.error 'command has not pid'
        end
      end
    end
    # PIDの返却(僕はpidをログに出力までしかしてません。)
    pid
  end
end

インポートされるCSVとモデル

インポートされるCSVのイメージは以下のような感じです。

"account","name","memo"
"test1","テスト太郎","このメモは取り込みたくない。"
...

というCSVをアップロードする場合は以下のモデルを定義します。

app/models/user.rb
# ユーザーデータ
class User < ActiveRecord::Base
  # validate
  validates :account, :name, presence: true

  # 使用カラムを取得します。
  def self.use_columns
    # シンボルは不可(memoは取り込み対象から外す。)
    ["account", "name"]
  end

  # PKカラムを返却します。(同じデータがある場合は上書きをするため)
  def self.pk_columns
    # シンボル可
    [:account]
  end

  # CSVデータの検証を行います。
  def csv_valid?(params)
    begin
      self.attributes = params
    rescue => e
      logger.error("user import error:#{e}")
    end
    self.valid?
  end
end

呼び出し側の処理(コントローラー)

一応Viewも載せておきます。

<%= form_tag csv_user_path, multipart: true do %>
  <div class="form-group">
    <label for="input_file">ユーザーデータ</label>
    <%= file_field_tag :file %>
    <p class="help-block">ex) user.csv</p>
  </div>
  <button type="submit" class="btn btn-default">アップロード</button>
<% end %>

これを適当なコントローラで拾ってください。

$ rails generate controller Csv

処理の流れは大体以下のようになります。

app/controllers/csv_controller.rb
class CsvController < ApplicationController
  # 実際は汎用的に使っているため、幾つか抽象化するための変数があります。
  def user_csv_upload
    # 汎用的に呼び出されるため事前に。
    file = params[:file]
    batch_type = :user
    return flash[:error] = "ファイルを選択してください。" unless file

    # 一行目のデータをチェックする場合は以下の処理を行います。
    # バッチ処理でも同じ事をするので冗長の場合はmodule化して使ってください。
    begin
      # モデルを取得します。
      model = Batch.use_model(batch_type)

      open(file.path, 'r:utf-8:utf-8', undef: :replace) do |f|
        csv = ::CSV.new(f, :headers => :first_row)
        csv.each do |row|
          next if row.header_row?

          # CSVの行情報をHASHに変換
          table = Hash[[row.headers, row.fields].transpose]

          # 指定パラメータのデータのみを対象とします。
          params = table.to_hash.slice(*model.use_columns)

          # 対象データを取得します。(ここでパラメータを受け渡してもOK)
          instance = model.new

          # データの整合性チェックをします。
          # ※各モデル毎にvalid?の仕組みが違う場合はcsv_valid?を使用する事をお勧めします。
          unless instance.csv_valid?(params)
            # 失敗した場合は処理を抜けます。
            return flash[:error] = "ファイル形式が違います。#{instance.errors.full_messages}"
          end

          # 2行目以降のデータは取り込まない。
          break
        end
      end
    rescue => e
      return flash[:error] = "CSV取り込み処理でエラーが発生しました。#{e.message}"
    end

    # 処理をバッチに委ねます。
    Batch.csv_import_run(batch_type, file)
    flash[:info] = "CSVの取り込みを開始します。進捗状態は処理一覧で確認できます。"
    
    # 適当なURLにリダイレクトします。
    redirect_to action: :user
  end
end

バッチ内部の処理

Rakeタスクを作成します。

$ rails g task csv

中身を作成します。

lib/tasks/csv.rake
# CSVのインポートを行うタスク
namespace :csv do
  # CSVの処理を行うので`require`しておく。(Rails内部で行う場合は不要だが、Rakeで行う場合は必要)
  require 'csv'

  # Railsから呼び出される。
  task :import, [:id] => :environment do |task, args|
    # 読み込みデータ数
    total_count = 0

    # ファイルを取り込み開始します。
    batch = Batch.find_by_id(args[:id])
    batch.status = Batch::STATUS_PROCESSING_VALID
    batch.message = "#{total_count}件〜#{total_count+100}件目のチェックを行ってます。"
    batch_status.save

    # 使用するモデルを取得します。(設計ミス。use_modelが使えない。)
    model = Batch::TYPE[batch.name][:model]

    # CSVデータを読み込みます。
    begin
      open(batch.file_path, 'r:utf-8:utf-8', undef: :replace) do |f|
        data_list = []
        csv = CSV.new(f, :headers => :first_row)
        csv.each do |row|
          next if row.header_row?

          # CSVの行情報をHASHに変換
          table = Hash[[row.headers, row.fields].transpose]

          # パラメータを取得します。
          params = table.to_hash.slice(*model.use_columns)

          # 対象データを取得します。
          instance = model.new

          # データの検証を行います。
          unless instance.csv_valid?(params)
            # 失敗した場合は処理を抜けます。
            batch.message = "#{total_count+1}件目でエラーが発生しました。#{instance.errors.full_messages}"
            batch.status = Batch::STATUS_FAIL
            batch.save
            return
          end

          # 保存するデータを追加します。
          data_list << instance

          # 取り込み件数を増やします。
          total_count += 1

          # チェック件数が10000件を超えるたびにメッセージを更新する。
          if total_count % 10000 == 0
            batch.status = Batch::STATUS_PROCESSING_IMPORT
            batch.message = "CSVの取り込みを行ってます。"
            batch.save
            model.import data_list, on_duplicate_key_update: model.pk_columns, validate: false
            data_list = []
            batch.status = Batch::STATUS_PROCESSING_VALID
            batch.message = "#{total_count}件〜#{total_count+100}件目のチェックを行ってます。"
            batch.save
            # 処理を占有しないように1秒まつ。(この方法で良いのだろうか・・・)
            sleep(1)
          end
        end

        # DB登録処理を行う。
        batch.status = Batch::STATUS_PROCESSING_IMPORT
        batch.message = "CSVの取り込みを行ってます。"
        batch.save
        model.import data_list, on_duplicate_key_update: model.pk_columns, validate: false
      end

      # CSV取り込み終了
      batch.status = Batch::STATUS_SUCCESS
      batch.message = "取り込み件数[#{total_count}]を取り込み完了しました。"
      batch.save
    rescue => e
      batch.status = Batch::STATUS_FAIL
      batch.message = "CSV取り込み処理でエラーが発生しました。#{e.message}"
      batch.save
    end
  end
end

バッチの進行状況

Batchモデルの中身を表示すれば、バッチの状況がわかるようになってます。

@batches = Batch.page(params[:page]).order(status: :asc, updated_at: :desc)

Viewは以下のような感じです。

<%= paginate(@batches) %>
<table class="table">
  <thead>
  <tr>
    <th>バッチ種別</th>
    <th>ステータス</th>
    <th>状況</th>
    <th>取り込み開始日</th>
    <th>最終処理更新日</th>
    <th>操作</th>
  </tr>
  </thead>
  <tbody>
  <% @batches.each { |batch| %>
      <tr>
        <td><%= batch.show_name %></td>
        <td><%= batch.show_status %></td>
        <td><%= batch.message %></td>
        <td><%= batch.created_at.strftime("%Y/%m/%d %H:%M:%S") %></td>
        <td><%= batch.updated_at.strftime("%Y/%m/%d %H:%M:%S") %></td>
        <td><%= batch.finish? ? link_to("削除", admin_csv_batch_destroy_path(batch_status.id), method: :delete) : "" %></td>
      </tr>
  <% } %>
  </tbody>
</table>

まとめ

・・・という長いコードを読んで、ここまで読む人はいないと思うけど。

巨大なCSVをバックグランド処理は長々書いているけど、実際は半日くらいで出来たので良しとしよう。
(バッチサーバーとかを用意するよりはコスト安だと思う。)

プロセスの事などを知れたので勉強になったので良しとしよう。
(正しい理解かどうかはわかりませんが・・・)

あとはCSVの取り込み処理は以外とあるけど、ここに書いた方法が一番スマートな気がする。
上記の方法は数種類のタイプに対応したコードを書いたので、長くなったけど一つのファイルだけだったらもっと短くなると思う。

追記

ローカル環境でテストをしていると、メインプロセスの応答(リクエスト/レスポンス)が遅くなる場合があるので、forkの処理を変更しました。
また、rakeタスク中にsleepを入れました。

今回の場合はある程度占有されていても問題がない(年に数回しか実行されないので)のですが、頻繁にこのような処理を実行する場合はやはり別プロセスもしくは別サーバーにした方が無難ですね・・・。

(2015/11/17)追記

WEBrick(ローカル)で行っているときには問題にならなかったが、サーバーで行ったときには定期的にデータをインポートしないとrakeタスクが突然死した。
なので、定期的にデータをimportするように処理を変更した。
(ちなみに、定期的にimportしたほうが処理が早かった。※10000件くらいを目安にすると良いと思う。)

また、結局rakeタスクをunicorn内部で持たせるのは微妙だったため最終的にwheneverにしてしまった。(上記の記事には書いてません。)

以上!

50
48
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
50
48

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?