4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Active Jobについて知る

Last updated at Posted at 2021-08-29

なぜ書くのか?

ユーザー登録時にslackに通知する機能を実装した際に、他の実装を真似てApplicationJobを継承した。実装自体はできたが、Active Jobの役割は非同期処理をするくらいの認識で、その仕組みについてしらないことだらけ。なので、Railsガイドに参考に分からない単語を調べたり、実際に実装してみて確かめながら理解を深めていくことにする。

Active Jobの目的

Active Jobの主要な目的は、あらゆるRailsアプリケーションにジョブ管理インフラを配置することです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。

railsガイドより引用

色々難しいことを書いているが、ここでは一旦Active Jobは非同期処理を実装するための機能であり、複数のgemと連携して使うこともできると理解しておく。

まずは言葉の理解から

Active jobの理解の前に頻繁にでてくる単語、Job(ジョブ)とqueue(キュー)について理解する。技術ブログ ActiveJobとはを参考

jobとは

ジョブは現実世界で例えるなら、買い物にいく、料理をする、片付けをするといったような一つ一つのタスクのようなもの。

queueとは

キューはタスクを登録するための入れ物のようなイメージ

このようにタスクを順番にqueueに登録していき、jobを順番に実行していく。このフローのことをジョブキューと呼ばれ、先に登録されたものから先に実行される。(先入れ先出し)

実装例

例えば、ユーザー登録時にメール送信とslackに通知する場合

キューを利用することで、メール送信とslack通知を待たずに、次の処理に入ることできる。またキューで登録された処理が仮に失敗しても、次の処理を行うことも可能にする。
jobとqueueについて理解できたので、下記では実際にジョブを作成しキューに登録、実行までの流れを確認していく。

簡単なJobを作成して実行して見る

  • User modelを作成し、UserJobを作成
shell
$ rails new active_job_sample
$ cd active_job_sample
$ bin/rails db:create
$ bin/rails g model user name
$ bin/rails db:migrate
#自動で下記のクラスを作成する
$ bin/rails generate job user 
class UserJob < ApplicationJob
  queue_as :default

  def perform(name="hoge")
    # 下記は追記した
    User.create!(title: name)
  end
end

performメソッドの種類について確認

  • 上記のperformは2種類の呼び出し方がある
メソッド名 概要
perform_now 同期処理: キューに入ることなく即座に実行される
perform_later 非同期処理: ジョブをキューに入れ、キューが空き次第ジョブを実行する

rails cでjobの動作確認したいときなどに、perform_nowを使うとよい!

Active Jobはキューイングのバックエンドが必要 ?

Rails自身が提供するのは、ジョブをメモリに保持するインプロセスのキューイングシステムだけ。 プロセスがクラッシュしたりコンピュータをリセットしたりすると、デフォルトの非同期バックエンドの振る舞いによって主要なジョブが失われてしまう。なので、開発環境や小規模アプリケーションの場合には、ActiveJobのみでも問題なさそうだが、本番環境でアプリケーションを運用していく場合は、キューイングのバックエンドが必要になる。

キューイングライブラリ

キューイングライブラリとして有名なのは、Delayed JobSidekiqResqueなど。これらは、Active Jobが導入される以前から、非同期処理を行うgemとして使われていたらしい。
Active Jobはどのgemに対しても、キューイングバックエンドに接続できるアダプタがビルトインで用意されている(優秀)。また、Active Jobを使用せずに上記のgemのみを使用することも可能である。今回は上記のgemの中でも最もdefaultなSidekiqを使用してみる。

Sidekiqって?

sidekiqの特長↓

項目
DB Redis
スレッド マルチスレッド
リトライ処理 あり
  • Redisとは
    • Redisは、ネットワーク接続された永続化可能なインメモリデータベース。ジョブの情報を保存している。
  • スレッドとはプログラムの一連の処理のまとまりのこと。マルチスレッドのプログラムは、複数の処理を並行して実行させることが可能。
  • Job(Worker)の中でraiseをキャッチして、自動でリトライする。
    • デフォルトの設定では21日間に25回のリトライを行う。リトライは、m2秒後、4秒後、8秒後・・・というように指数関数的に間隔をあけ、行われる。

sidekiqを利用してみる

Gemfile
gem 'sidekiq'
gem 'sinatra', require: false # ダッシュボードを利用するため
bundle install
application.rb
module ActiveJobSample
  class Application < Rails::Application
    ...
    config.active_job.queue_adapter = :sidekiq
    ...
  end
end
config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
    config.Redis = { url: 'Redis://localhost:6379'}
end

Sidekiq.configure_client do |config|
    config.Redis = { url: 'Redis://localhost:6379'}
end
sidekiq.yml
:verbose: false # 対話モード
:concurrency: 10 # worker process 数
:queues:  # 処理するキュー名
  - default
  • 大きなプロジェクトではqueueを複数に分けて、queueごとにsidekiqのプロセス(インスタンス)を分けるが、今回は動作確認程度なのでdefaultにする。
$ bundle exec sidekiq -C config/sidekiq.yml
  • http://localhost:3000/sidekiqにアクセスしてダッシュボードを確認

スクリーンショット 2021-08-28 17.24.12.png

これでようやく、sidekiqのキューにジョブを登録する準備が整った:tada:
コンソールからジョブを実行してみる

$ rails c
irb(main):001:0> UserJob.perform_later
Enqueued UserJob (Job ID: 02002483-9077-4832-963b-1058ef6534fe) to Sidekiq(default)
=> #<UserJob:0x00007ffa08f36f38 @arguments=[], @job_id="02002483-9077-4832-963b-1058ef6534fe", @queue_name="default", @priority=nil, @executions=0, @exception_executions={}, @timezone="UTC", @provider_job_id="26bcc5664da36768f743b851">

スクリーンショット 2021-08-28 17.32.57.png

完了が1になった!いい感じにsidekiqが機能し、非同期で実装できていそう!
この他にも、setメソッドを使いジョブの実行をスケジュールすることも可能

内部実装を見ていく

Activejobかキューイングライブラリsidekiqと連携するまで

module ActiveJob # :nodoc:
 ...
 class Base
  include Enqueuing
  ...
 end
 ...
end

ActiveJob::BaseActiveJob::Enqueuingをincludeしており、ActiveJob::Enqueuingにクラスメソッドとしてperform_laterメソッドが定義されている

module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern
  ...
    module ClassMethods
      def perform_later(...)
        job = job_or_instantiate(...)
        enqueue_result = job.enqueue

        yield job if block_given?

        enqueue_result
      end

      private
        def job_or_instantiate(*args) # :doc:
          args.first.is_a?(self) ? args.first : new(*args)
        end
      ...
    end
    ...
    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      self.successfully_enqueued = false

      run_callbacks :enqueue do
        if scheduled_at
          queue_adapter.enqueue_at self, scheduled_at
        else
          queue_adapter.enqueue self
        end

        self.successfully_enqueued = true
      rescue EnqueueError => e
        self.enqueue_error = e
      end
    ...
  end
end

perform_laterの引数に何も指定しなければ定義したActiveJobがインスタンス化され、enqueueメソッドが呼び出される。enqueueメソッドで、ジョブがscheduledされていないか?やどのキューに登録するかを確認する。queue_adapterconfig.active_job.queue_adapterで指定した名前から生成されるアダプタクラスになる。つまり、今回では:sidekiq を指定しているので、下記のqueue_adapterメソッドを呼びActiveJob::QueueAdapters::SidekiqAdapterが生成されることになる。


def queue_adapter=(name_or_adapter)
  case name_or_adapter
  when Symbol, String
    queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
    assign_adapter(name_or_adapter.to_s, queue_adapter)
  else
    if queue_adapter?(name_or_adapter)
      adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
      assign_adapter(adapter_name, name_or_adapter)
    else
      raise ArgumentError
    end
  end
end

その後、ActiveJob::QueueAdapters::SidekiqAdapter内で、ジョブのクラス名や引数を Sidekiq::Client に投げている。

def enqueue(job) # :nodoc:
  # Sidekiq::Client does not support symbols as keys
  job.provider_job_id = Sidekiq::Client.push \
    "class"   => JobWrapper,
    "wrapped" => job.class,
    "queue"   => job.queue_name,
    "args"    => [ job.serialize ]
end

Sidekiqによるエンキューとデキュー

コードが多くなりすぎるので、テキストのみ掲載

Redisのキューにジョブの情報を登録

  • Sidekiq.client.push処理でsidekiq内の実装で、受け取ったジョブ情報をJsonに直し、ジョブIDの情報を追加して、 Redisのqueues:#{queue_name}キーに追加

Redisのキューから登録したジョブを実行する

  • bundle exec sidekiq を実行し、Sidekiq::Managerがworker作成。その後、workerはそれぞれのスレッドを起動
  • Sidekiq::Processorがメインの処理で、queueから取り出したjobを実行する
    • process_oneメソッド内で、Redisからjobの情報をdequeueする
    • processメソッドで、取得したjobから各種情報を引き出しロードし、dispatchメソッド内でjobのインスタンスを作成して、jobの処理を実行している。

感想

社内のソースコードを真似すればいい感じに実装されるといったことが多々あり、あたかも自分が簡単に複雑な機能を実装したと勘違いしてしまいやすい。今回のslack通知を実装した際も、なんか非同期になってる、なんかRedisにジョブの情報を保存してるといった曖昧な理解しかできなかった。なんとなくでその場を乗り切ったとしても、自分の実力は上がらないし知識も増えない。だから、今後も日々の業務で理解しきれなかったことを今回のようにミニマムで実際に動かしてみたり、コードリーディングを通して仕組みを理解するようにする。

参考記事

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?