Ruby
Rails
Resque
sidekiq
ActiveJob
More than 3 years have passed since last update.


非同期処理とは

必ず時間のかかってしまう重い処理もしくは、時間のかかる可能性のある処理を「あとでやる処理リスト」に登録する。


ざっくり手順

1.非同期で実行したい処理をジョブとして登録

2.ワーカ(ジョブを処理するプログラム)が登録されたジョブを実行

3.(必要であれば)成功、失敗の記録、失敗時のリトライといったジョブ実行の後処理を行う


色々ある非同期処理ライブラリ


Resque


・特徴

・ジョブを持つキューを複数持てる。

・特定の種類のジョブに優先度をつけることができる。

・ジョブデータの保存場所:Redis


・ジョブの登録から実行まで

ジョブの処理を登録


export_user_items.rb

class ExportUserItems

@queue = :high #ジョブの優先度を設定
def self.perform(user_id)
user = User.find_by(id: user_id)
user.export_items if user
end
end

ジョブの実行を登録

Resque.enqueue(ExportUserItems, user_id)

ワーカを立ち上げる

QUEUE=* rake resque:work # *と記述すると全てのジョブを実行

QUEUE=high,low #優先度順に実行


Sidekiq


・特徴

・スレッド処理により省メモリで実行

・ジョブデータの保存場所:Redis


・ジョブの登録から実行まで

ジョブの処理を登録


export_user_items.rb

class ExportUserItems

include Sidekiq::Worker #Workerモジュールをinclude
sidekiq_options queue: :high

def perform(user_id)
user = User.find_by(id: user_id)
user.export_items if user
end
end


ジョブの実行

ExportUserItems.perform_async(user_id)

ワーカを立ち上げる

bundle exec sidekiq


Rails 4.2から標準搭載となったActive Jobってなんぞや

色々ある非同期ライブラリに対して同じインターフェースを提供するライブラリです。

あくまでアダプター的な役割しか果たさない。

Active Job単体での非同期処理は出来ません。


サポートしている非同期処理ライブラリ

・Resque

・Sidkiq

・Delayed Job


Active Jobを通してResqueを使用する


environment.rb

ActiveJob::Base.queue_adapter = :resque


以下のコマンドでJobクラスを作成できます。

rails g job test


ジョブクラスの作成


app/jobs/test_job.rb

class TestJob < ActiveJob::Base

queue_as :default

def perform(*args)
# Do something later
end
end



ジョブの実行登録

test.perform_later user

test.set(wait_until: Date.tomorrow).perform_ later #時間を指定する場合


Global ID

Active Jobを使うと以下のように#ActiveRecordのオブジェクトを直接引数に渡すことができます。


test_job.rb

class TestJob < ActiveJob::Base

queue_as :default

def perform(*args)
# Do something later
end
end


ResqueやSidekiqはジョブ内でレコードオブジェクトを使用する場合、以下のステップを踏みます。

1.idを引数として渡して、Jsonにシリアライズし、Redisに保存。

2.ジョブクラス内で、レコードを取得(find_byなどを使って)

def self.perform(user_id)

user = User.find_by(id: user_id)
user.export_items if user
end

Active Jobでは、引数として渡させれたオブジェクトがActive Recordのインスタンスだった場合、Global IDという、モデルオブジェクトを識別するURIに変換し、それをジョブデータとして保存します。

Global IDとは、 アプリケーション名、モデルのクラス名、idの値 を使い、モデルオブジェクトをグローバルに識別できるURIのことです。

ちなみに、.to_global_idメソッドでモデルオブジェクトをGlobal IDに変換することができます。

逆に、Global IDからモデルオブジェクトに変換する場合は、GlobalID::Locator.locate Global_IDを実行します。

また、Global IDでは、一定期間を超えると無効になる Signed Global ID を生成することも可能です。デフォルトでは1ヶ月後に無効になるように設定されています。

Signed Global IDを生成する場合は、.to_signed_global_id(expires_in: 期間)で生成することができます。


まとめ

・Active Jobを通してResqueやSidekiqなどの非同期処理ライブラリを利用することで、ジョブの登録や実行を共通のインターフェースに統一できるので、書きやすい&可読性も向上。

・Global IDにより、セキュアでかつ効率よくデータを取得できるようになった。