1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Sidekiq middlewareを書く

Last updated at Posted at 2019-02-16

Sidekiqのミドルウェアを書いてSidekiqを拡張したい人に向けた記事です。

今回の目標

Sidekiqを使ったことがある方は既にご存知だと思いますが、SidekiqはGUIの管理画面が標準で搭載されており、そこから随時Jobの情報を閲覧することができます。(ここでいう情報というのは、例えば、Jobが作られた時間、リトライ回数等々)これはSidekiqがJobに対してHashで管理されている付随情報を持っているからです。

しかしながら、Worker実行中は、そのHashに対してのアクセスは限られます。今回、Jobが持っているHashの情報を利用するために、middlewareを使って機能拡張をする必要がありました。その時の検証を記事化しておきます。

検証環境  

  • ubuntu 18.10
  • Ruby 2.5.3
  • sidekiq 5.2.5

Sidekiqのmiddlewareは、Client用とServer向けがあります。
その説明については、公式ページの情報が充実しているのでそちらをご覧いただけると良いと思います。
本記事では公式ページの内容を元にして実際にSidekiqのmiddlewareを書いて機能を拡張していきます。
https://github.com/mperham/sidekiq/wiki/Middleware

念の為リンク先を参照してない方のために、Sidekiqのmiddlewareがなにかというのを一言で説明しておきます。

Client middleware
JobがRedisにpushされる前に、そのJobに対して操作をすることができるようになります。

Server middleware
Jobが実行される前後でそのJobに対して操作をすることができるようになります。

検証環境の準備

今回は検証の為、Railsは使わずplainなRubyファイルを使います。
Redisが必要となるため、もしインストールしてなければインストールを完了し起動しておいてください。

Gemfile
source "https://rubygems.org"

gem "sidekiq"

Sidekiq gemを指定してbundle installします。

worker.rb
require "sidekiq"

#カスタムmiddlewareのクラスを作成し、callメソッドの中でyieldを定義する
class CustomAttribute
  def call(worker, job_hash, queue)
      yield
  end
end

#Client用のConfigure
Sidekiq.configure_client do |config|
  config.redis = { url: 'redis:127.0.0.1:6379' }
end

#Server用のConfigure
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis:127.0.0.1:6379' }
  config.server_middleware do |chain|
    #カスタムmiddlewareをchainにaddすることでmiddlewareが実行される
    chain.add CustomAttribute
  end
end

#ユーザーが実行したいWorkerクラスを定義する
class MyWorker
  include Sidekiq::Worker

  def perform(lifting)
    case lifting
    when "heavy"
      sleep 10
      puts "重労働を実行しました"
    when "middle"
      sleep 5 
      puts "普通の作業を実行しました"
    else
      sleep 1
      puts "軽い作業を実行しました"
    end
  end
end

MyWorker自体には難しいところはなく、見た目通りです。
引数で受け取った内容によって労働の種類が変わります。

早速実行してみましょう。
Sidekiqを立ち上げます。

sidekiq -r ./worker.rb

もう1つのターミナルでirb立ち上げて、

irb -r ./worker.rb

キューにJobを3つ登録していきます。

MyWorker.perform_async("middle")
MyWorker.perform_async("middle")
MyWorker.perform_async("middle")

期待通り?普通にSidekiqを実行したときと同じ内容の出力結果を得ることが確認できました。

普通の作業を実行しました
2019-02-16T14:06:34.563Z 8795 TID-gss2qrifr MyWorker JID-baab186447c6ea89f4871a4c INFO: done: 5.0 sec
普通の作業を実行しました
2019-02-16T14:06:35.039Z 8795 TID-gss2rx16r MyWorker JID-8022ef54e720e0e66f91a39d INFO: done: 5.0 sec
普通の作業を実行しました
2019-02-16T14:06:35.446Z 8795 TID-gss2qs2lv MyWorker JID-ef7be5605c4f7225769cc074 INFO: done: 5.0 sec

そして、ここからMiddleWareの話です。
上記のCustomAttributeクラスのcallメソッドのyieldではexecute_job
つまりWorkerのperformが実行されます。

ここら辺りの処理はSidekiq::Processorクラスに定義されています。
コードを貼って置くので、確認してみてください。

    def process(work)
      jobstr = work.job
      queue = work.queue_name

      ack = false
      begin
        # Treat malformed JSON as a special case: job goes straight to the morgue.
        job_hash = nil
        begin
          job_hash = Sidekiq.load_json(jobstr)
        rescue => ex
          handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
          # we can't notify because the job isn't a valid hash payload.
          DeadSet.new.kill(jobstr, notify_failure: false)
          ack = true
          raise
        end

        ack = true
        dispatch(job_hash, queue) do |worker|
          Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
            execute_job(worker, cloned(job_hash['args']))
          end
        end
      rescue Sidekiq::Shutdown
        # Had to force kill this job because it didn't finish
        # within the timeout.  Don't acknowledge the work since
        # we didn't properly finish it.
        ack = false
      rescue Exception => ex
        e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex
        handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
        raise e
      ensure
        work.acknowledge if ack
      end
    end

    def execute_job(worker, cloned_args)
      worker.perform(*cloned_args)
    end

上記コード中でexecute_jobに引数で渡されている、job_hash['args']について確認します。

Redisへのenqueからdequeそして、Jobのretryと、Sidekiqでは、このhashの情報を使いまわしているのですが、基本的にWorker.performの際にはjob_hashの値はユーザーがJobを登録した時に渡した引数のみが渡されているようです。

さて、今回の目的としては、Worker内でjob_hashの値を参照することにあるので、middlewareを使ってそれを実現してみます。

worker.rb
require "sidekiq"

#カスタムmiddlewareのクラスを作成し、callメソッドの中でyieldを定義する
class CustomAttribute
  def call(worker, job_hash, queue)
      job_hash["args"] << job_hash["retry_count"] 
      yield
  end
end

#Client用のconfigure
Sidekiq.configure_client do |config|
  config.redis = { url: 'redis:127.0.0.1:6379' }
end

#Worker用のConfigure
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis:127.0.0.1:6379' }
  config.server_middleware do |chain|
    #カスタムmiddlewareをchainにaddすることでmiddlewareが実行される
    chain.add CustomAttribute
  end
end

#ユーザーが実行したいWorkerクラスを定義する
class MyWorker
  include Sidekiq::Worker

  def perform(lifting, *args)
    case lifting
    when "heavy"
      sleep 10
      puts "重労働を実行しました"
      p args
      raise "error"
    when "middle"
      sleep 5 
      puts "普通の作業を実行しました"
    else
      sleep 1
      puts "軽い作業を実行しました"
    end
  end
end

重労働を実行してみてください。
このケースではjob_hash["args"]に対して再試行回数を追加することによって、
初回はnilですが、以降の再試行では数値が規定回数までカウントアップされていくretry_countの値を確認できます。

副作用は?

middlewareを使うことで、参照だけではなく、job_hashの値に対する書き換えの可能性もあり、副作用が気になる所です。

以下は、Sidekiq::Processorクラスのdispatchメソッドです。ここではJobのretry時に使われる@retrier.globalにSidekiqのJob情報を渡していますが、一旦、pristineという変数に代入される時に、Marshalで深いコピーがされているのがわかります。これにより、再試行時は編集の可能性がないjob_hashの値が使われています。

    def dispatch(job_hash, queue)
      # since middleware can mutate the job hash
      # we clone here so we report the original
      # job structure to the Web UI
      pristine = cloned(job_hash)

      Sidekiq::Logging.with_job_hash_context(job_hash) do
        @retrier.global(pristine, queue) do
          @logging.call(job_hash, queue) do
            stats(pristine, queue) do
              # Rails 5 requires a Reloader to wrap code execution.  In order to
              # constantize the worker and instantiate an instance, we have to call
              # the Reloader.  It handles code loading, db connection management, etc.
              # Effectively this block denotes a "unit of work" to Rails.
              @reloader.call do
                klass  = constantize(job_hash['class'])
                worker = klass.new
                worker.jid = job_hash['jid']
                @retrier.local(worker, pristine, queue) do
                  yield worker
                end
              end
            end
          end
        end
      end
    end

middlewareを考える時の注意点

今回はmiddlewareによってSidekiqの機能拡張実験を試みました。middlewareにより簡単に処理を追加できて素晴らしいと感じる反面、SidekiqはJobの管理をするというその性格上、実際にSidekiq本体のコードを慎重に追いながら機能追加の判断していくことが必要だと強く感じました。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?