Active Jobについて

  • 17
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

非同期処理とは

必ず時間のかかってしまう重い処理もしくは、時間のかかる可能性のある処理を「あとでやる処理リスト」に登録する。

ざっくり手順

1.非同期で実行したい処理をジョブとして登録
2.ワーカ(ジョブを処理するプログラム)が登録されたジョブを実行
3.(必要であれば)成功、失敗の記録、失敗時のリトライといったジョブ実行の後処理を行う

色々ある非同期処理ライブラリ

Resque

・特徴

・ジョブを持つキューを複数持てる。
・特定の種類のジョブに優先度をつけることができる。
・ジョブデータの保存場所:Redis

・ジョブの登録から実行まで

ジョブの処理を登録

export_user_items.rb
class ExportUserItems
  @queue = :high #ジョブの優先度を設定
  def self.perform(user_id)
    user = User.find_by(id: user_id)
    user.export_items  if user
  end
end

ジョブの実行を登録

Resque.enqueue(ExportUserItems, user_id)

ワーカを立ち上げる

QUEUE=* rake resque:work # *と記述すると全てのジョブを実行
QUEUE=high,low #優先度順に実行

Sidekiq

・特徴

・スレッド処理により省メモリで実行
・ジョブデータの保存場所:Redis

・ジョブの登録から実行まで

ジョブの処理を登録

export_user_items.rb
class ExportUserItems
  include Sidekiq::Worker #Workerモジュールをinclude
  sidekiq_options queue: :high

  def perform(user_id)
    user = User.find_by(id: user_id)
    user.export_items if user
  end
end

ジョブの実行

ExportUserItems.perform_async(user_id)

ワーカを立ち上げる

bundle exec sidekiq

Rails 4.2から標準搭載となったActive Jobってなんぞや

色々ある非同期ライブラリに対して同じインターフェースを提供するライブラリです。

あくまでアダプター的な役割しか果たさない。
Active Job単体での非同期処理は出来ません。

サポートしている非同期処理ライブラリ

・Resque
・Sidkiq
・Delayed Job

Active Jobを通してResqueを使用する

environment.rb
ActiveJob::Base.queue_adapter = :resque

以下のコマンドでJobクラスを作成できます。
rails g job test

ジョブクラスの作成

app/jobs/test_job.rb
class TestJob < ActiveJob::Base
  queue_as :default

  def perform(*args)
    # Do something later
  end
end

ジョブの実行登録

test.perform_later user
test.set(wait_until: Date.tomorrow).perform_ later #時間を指定する場合

Global ID

Active Jobを使うと以下のように#ActiveRecordのオブジェクトを直接引数に渡すことができます。

test_job.rb
class TestJob < ActiveJob::Base
  queue_as :default

  def perform(*args)
    # Do something later
  end
end

ResqueやSidekiqはジョブ内でレコードオブジェクトを使用する場合、以下のステップを踏みます。
1.idを引数として渡して、Jsonにシリアライズし、Redisに保存。
2.ジョブクラス内で、レコードを取得(find_byなどを使って)

def self.perform(user_id)
  user = User.find_by(id: user_id)
  user.export_items if user
end

Active Jobでは、引数として渡させれたオブジェクトがActive Recordのインスタンスだった場合、Global IDという、モデルオブジェクトを識別するURIに変換し、それをジョブデータとして保存します。

Global IDとは、 アプリケーション名、モデルのクラス名、idの値 を使い、モデルオブジェクトをグローバルに識別できるURIのことです。

ちなみに、.to_global_idメソッドでモデルオブジェクトをGlobal IDに変換することができます。
逆に、Global IDからモデルオブジェクトに変換する場合は、GlobalID::Locator.locate Global_IDを実行します。

また、Global IDでは、一定期間を超えると無効になる Signed Global ID を生成することも可能です。デフォルトでは1ヶ月後に無効になるように設定されています。
Signed Global IDを生成する場合は、.to_signed_global_id(expires_in: 期間)で生成することができます。

まとめ

・Active Jobを通してResqueやSidekiqなどの非同期処理ライブラリを利用することで、ジョブの登録や実行を共通のインターフェースに統一できるので、書きやすい&可読性も向上。
・Global IDにより、セキュアでかつ効率よくデータを取得できるようになった。