Ruby
Resque

An introduction to the Resque message processing system

  #
  # job shortcuts
  #

  # This method can be used to conveniently add a job to a queue.
  # It assumes the class you're passing it is a real Ruby class (not
  # a string or reference) which either:
  #
  #   a) has a @queue ivar set
  #   b) responds to `queue`
  #
  # If either of those conditions are met, it will use the value obtained
  # from performing one of the above operations to determine the queue.
  #
  # If no queue can be inferred this method will raise a `Resque::NoQueueError`
  #
  # Returns true if the job was queued, nil if the job was rejected by a
  # before_enqueue hook.
  #
  # This method is considered part of the `stable` API.
  def enqueue(klass, *args)
    enqueue_to(queue_from_class(klass), klass, *args)
  end

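This can be seen as the entry point through which a job adds a message to a given queue. The first argument, klass, should either have a @queue instance variable set or respond to a queue method.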
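The queue lookup described above can be sketched in plain Ruby. This is only an illustration, not Resque's code: infer_queue, NoQueueError, and the three job classes are hypothetical names, and the lookup order (ivar first, then the queue method) is an assumption based on the comment in the source.

```ruby
# Hypothetical stand-in for Resque::NoQueueError so the sketch runs without the gem.
class NoQueueError < RuntimeError; end

# Assumed lookup order: the class-level @queue ivar first, then a `queue` method.
def infer_queue(klass)
  queue = klass.instance_variable_get(:@queue)
  queue ||= klass.queue if klass.respond_to?(:queue)
  raise NoQueueError, "no queue could be inferred for #{klass}" if queue.nil? || queue.to_s.empty?
  queue
end

class ArchiveJob
  @queue = :file_serve   # option a: class-level ivar
end

class MailJob
  def self.queue         # option b: responds to `queue`
    :mail
  end
end

class Orphan; end        # neither option: inference fails

puts infer_queue(ArchiveJob)
puts infer_queue(MailJob)
```

Passing Orphan to infer_queue raises the error, which mirrors the "no queue can be inferred" case in the comment.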

  # Just like `enqueue` but allows you to specify the queue you want to
  # use. Runs hooks.
  #
  # `queue` should be the String name of the queue you're targeting.
  #
  # Returns true if the job was queued, nil if the job was rejected by a
  # before_enqueue hook.
  #
  # This method is considered part of the `stable` API.
  def enqueue_to(queue, klass, *args)
    #plugin part
    # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
    before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
      klass.send(hook, *args)
    end
    return nil if before_hooks.any? { |result| result == false }
    # Job part
    Job.create(queue, klass, *args)

    Plugin.after_enqueue_hooks(klass).each do |hook|
      klass.send(hook, *args)
    end

    return true
  end

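This method is essentially the same as the one above, except that the first argument is now the queue name itself, so the target queue can be specified directly.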

    # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
    before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
      klass.send(hook, *args)
    end
    return nil if before_hooks.any? { |result| result == false }

    ...

    Plugin.after_enqueue_hooks(klass).each do |hook|
      klass.send(hook, *args)
    end

During an enqueue, both before hooks and after hooks are run. If any before hook returns false, the job is not enqueued.

About these hooks:

lib/resque/plugin.rb

    # Given an object, returns a list `after_enqueue` hook names.
    def after_enqueue_hooks(job)
      get_hook_names(job, 'after_enqueue')
    end

    # Given an object, returns a list `before_enqueue` hook names.
    def before_enqueue_hooks(job)
      get_hook_names(job, 'before_enqueue')
    end


    @job_methods = {}
    def job_methods(job)
      @job_methods[job] ||= job.methods.collect{|m| m.to_s}
    end

    # Given an object, and a method prefix, returns a list of methods prefixed
    # with that name (hook names).
    def get_hook_names(job, hook_method_prefix)
      methods = (job.respond_to?(:hooks) && job.hooks) || job_methods(job)
      methods.select{|m| m.start_with?(hook_method_prefix)}.sort
    end


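From the implementation above, hooks are simply the job's methods whose names start with a given prefix (or, if the job responds to hooks, whatever names that list contains), so you can define your own hook methods just by using the prefix.
Each hook is then invoked via klass.send(hook, *args).

Next comes the Job stage proper.

First, a quick look at what Job is designed for: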
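A minimal sketch of prefix-based hook discovery, assuming nothing beyond what the plugin code above shows. ThrottledJob, hook_names, and run_before_hooks are hypothetical names introduced for illustration; they are not part of Resque.

```ruby
# Hypothetical job class: any class method whose name starts with the
# prefix `before_enqueue` is treated as a hook.
class ThrottledJob
  @queue = :default

  def self.before_enqueue_a_log(*args)
    (@seen ||= []) << args   # record the arguments, never veto
    true
  end

  def self.before_enqueue_b_limit(count)
    count < 3                # returning false vetoes the enqueue
  end

  def self.perform(count); end
end

# Collect method names by prefix and sort them, as get_hook_names does.
def hook_names(job, prefix)
  job.methods.map(&:to_s).select { |m| m.start_with?(prefix) }.sort
end

# Run every before hook; the enqueue proceeds only if none returned false.
def run_before_hooks(job, *args)
  results = hook_names(job, 'before_enqueue').map { |hook| job.send(hook, *args) }
  results.none? { |result| result == false }
end

p hook_names(ThrottledJob, 'before_enqueue')
p run_before_hooks(ThrottledJob, 1)
p run_before_hooks(ThrottledJob, 5)
```

Because the names are sorted, a suffix such as _a_ or _b_ is one simple way to control the order in which hooks run.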

module Resque
  # A Resque::Job represents a unit of work. Each job lives on a
  # single queue and has an associated payload object. The payload
  # is a hash with two attributes: `class` and `args`. The `class` is
  # the name of the Ruby class which should be used to run the
  # job. The `args` are an array of arguments which should be passed
  # to the Ruby class's `perform` class-level method.
  #
  # You can manually run a job using this code:
  #
  #   job = Resque::Job.reserve(:high)
  #   klass = Resque::Job.constantize(job.payload['class'])
  #   klass.perform(*job.payload['args'])
  class Job

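A Resque::Job represents a single unit of work. Each job lives on one specific queue and carries a payload that describes the work. The payload is a hash with two attributes: class and args.

  • class is the name of the Ruby class that is used to run the job.
  • args is an array of arguments passed to that class's class-level perform method.

Now, creating a job: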
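The payload round trip can be sketched with the standard json library. This is an illustration under assumptions: Archive, encode_payload, and run_payload are hypothetical names, and Object.const_get is a simplified stand-in for the constantize helper mentioned in the comment.

```ruby
require 'json'

# Hypothetical job class with a class-level perform method.
class Archive
  def self.perform(repo_id, format)
    "archived #{repo_id} as #{format}"
  end
end

# Build the payload hash and serialize it the way a queue entry might look.
def encode_payload(klass, *args)
  JSON.generate({ 'class' => klass.to_s, 'args' => args })
end

# Decode a payload, resolve the class by name, and call perform with the args.
def run_payload(encoded)
  payload = JSON.parse(encoded)
  klass = Object.const_get(payload['class'])  # simplified stand-in for constantize
  klass.perform(*payload['args'])
end

encoded = encode_payload(Archive, 35, 'tar')
puts encoded
puts run_payload(encoded)
```

Only the class name and plain arguments travel through the queue, which is why the arguments have to be JSON-serializable.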

    # Creates a job by placing it on a queue. Expects a string queue
    # name, a string class name, and an optional array of arguments to
    # pass to the class' `perform` method.
    #
    # Raises an exception if no queue or class is given.
    def self.create(queue, klass, *args)
      Resque.validate(klass, queue)

      if Resque.inline?
        # Instantiating a Resque::Job and calling perform on it so callbacks run
        # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job
        new(:inline, {'class' => klass, 'args' => decode(encode(args))}).perform
      else
        Resque.push(queue, :class => klass.to_s, :args => args)
      end
    end
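
There are two branches here:

  • new(:inline, {'class' => klass, 'args' => decode(encode(args))}).perform

In inline mode, a job is instantiated directly and its perform method is called right away. The arguments go through decode(encode(args)) so that they are normalized the same way as for a job that passes through the queue.

  • Resque.push(queue, :class => klass.to_s, :args => args)

Otherwise, the job is pushed onto the queue through Resque.push.

OK, next step: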
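The effect of decode(encode(args)) can be seen with a plain JSON round trip. This sketch assumes the encoding is JSON, as the "JSON-able" wording in the source suggests; normalize is a hypothetical helper, not a Resque method.

```ruby
require 'json'

# Round-trip the arguments through JSON, as decode(encode(args)) would.
def normalize(args)
  JSON.parse(JSON.generate(args))
end

original   = [{ a: 1, b: :two }, :sym, 1.5]
normalized = normalize(original)

p original
p normalized
```

Symbols, including hash keys, come back as strings, so a perform method should expect string keys regardless of whether the job ran inline or through a queue.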

  #
  # queue manipulation
  #

  # Pushes a job onto a queue. Queue name should be a string and the
  # item should be any JSON-able Ruby object.
  #
  # Resque works generally expect the `item` to be a hash with the following
  # keys:
  #
  #   class - The String name of the job to run.
  #    args - An Array of arguments to pass the job. Usually passed
  #           via `class.to_class.perform(*args)`.
  #
  # Example
  #
  #   Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
  #
  # Returns nothing
  def push(queue, item)
    data_store.push_to_queue(queue,encode(item))
  end

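This is where a job is pushed onto a queue. The queue name should be a string, and the item should be any Ruby object that can be serialized to JSON.

Oh, this next bit of code is pretty neat...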


module Resque
  # An interface between Resque's persistence and the actual
  # implementation.
  class DataStore
    extend Forwardable

    HEARTBEAT_KEY = "workers:heartbeat"

    def initialize(redis)
      @redis                = redis
      @queue_access         = QueueAccess.new(@redis) # ①
      @failed_queue_access  = FailedQueueAccess.new(@redis)
      @workers              = Workers.new(@redis)
      @stats_access         = StatsAccess.new(@redis)
    end

    def_delegators :@queue_access, :push_to_queue, # ②
                                   :pop_from_queue,
                                   :queue_size,
                                   :peek_in_queue,
                                   :queue_names,
                                   :remove_queue,
                                   :everything_in_queue,
                                   :remove_from_queue,
                                   :watch_queue,
                                   :list_range
               ...                    

    class QueueAccess
      def initialize(redis)
        @redis = redis
      end
      def push_to_queue(queue,encoded_item)
        @redis.pipelined do
          watch_queue(queue)
          @redis.rpush redis_key_for_queue(queue), encoded_item
        end
      end

This is the final step: the data is saved to Redis through the redis library.

    # job
    # queue
    # message -> redis list
    class QueueAccess
      def initialize(redis)
        @redis = redis
      end
      def push_to_queue(queue,encoded_item)
        @redis.pipelined do
          watch_queue(queue)
          @redis.rpush redis_key_for_queue(queue), encoded_item
        end
      end
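The delegation and the list-based storage can be imitated without a Redis server. The sketch below is an in-memory stand-in, not Resque's implementation: FakeQueueAccess and MiniDataStore are hypothetical classes, the Array operations only mimic RPUSH and LPOP, and the "queue:NAME" key layout is an assumption.

```ruby
require 'forwardable'
require 'set'

# In-memory stand-in for QueueAccess: each queue is an Array used as a list.
class FakeQueueAccess
  def initialize
    @lists  = Hash.new { |hash, key| hash[key] = [] }
    @queues = Set.new
  end

  def push_to_queue(queue, encoded_item)
    @queues << queue.to_s                      # stand-in for watch_queue
    @lists[key_for(queue)].push(encoded_item)  # stand-in for RPUSH (append at the tail)
  end

  def pop_from_queue(queue)
    @lists[key_for(queue)].shift               # stand-in for LPOP (take from the head)
  end

  def queue_size(queue)
    @lists[key_for(queue)].size
  end

  def queue_names
    @queues.to_a
  end

  private

  def key_for(queue)
    "queue:#{queue}"  # assumed key layout
  end
end

# Facade that forwards queue operations, in the style of DataStore.
class MiniDataStore
  extend Forwardable

  def_delegators :@queue_access, :push_to_queue, :pop_from_queue,
                 :queue_size, :queue_names

  def initialize
    @queue_access = FakeQueueAccess.new
  end
end

store = MiniDataStore.new
store.push_to_queue(:archive, 'first')
store.push_to_queue(:archive, 'second')
puts store.pop_from_queue(:archive)
```

Appending at the tail and taking from the head gives first-in, first-out ordering, and def_delegators keeps the facade free of hand-written forwarding methods.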

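Now let's look at Resque's worker mechanism.

The definition of a worker is generally the same as in clients for AMQP implementations such as RabbitMQ:
it refers to an object that takes messages from a queue and processes them further.

Currently, using workers in Resque does not require us to implement some interface or to instantiate a worker ourselves.
Instead, workers are started by default through rake tasks, specifying the queue names and the number of workers.

On reflection, there is no point in or need for implementing a worker again, because we have in fact already defined how a message is handled:
that is the perform method mentioned in the earlier code.

Now for the overall flow: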

  1. enqueue

    post '/' do
      # byebug
      Resque.enqueue(Job, {a: 1, b: 2}.to_s)
      redirect "/"
    end
    
  2. perform

    module Demo
      module Job
        @queue = :default
    
        def self.perform(params)
          puts params
          File.open("puts.log", 'a') { |file| file.write(params.to_s) }
    
          sleep 1
          puts "Processed a job!"
        end
      end
    end

At enqueue time, the Job class object is enough to determine, by convention via @queue, which queue the data should be fed into.
The same class is then used directly at perform time.

    # This is the main workhorse method. Called on a Worker instance,
    # it begins the worker life cycle.
    #
    # The following events occur during a worker's life cycle:
    #
    # 1. Startup:   Signals are registered, dead workers are pruned,
    #               and this worker is registered.
    # 2. Work loop: Jobs are pulled from a queue and processed.
    # 3. Teardown:  This worker is unregistered.
    #
    # Can be passed a float representing the polling frequency.
    # The default is 5 seconds, but for a semi-active site you may
    # want to use a smaller value.
    #
    # Also accepts a block which will be passed the job as soon as it
    # has completed processing. Useful for testing.
    def work(interval = 5.0, &block)
      interval = Float(interval)
      startup

      loop do
        break if shutdown?

        unless work_one_job(&block)
          break if interval.zero?
          log_with_severity :debug, "Sleeping for #{interval} seconds"
          procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
          sleep interval
        end
      end

      unregister_worker
    rescue Exception => exception
      return if exception.class == SystemExit && !@child && run_at_exit_hooks
      log_with_severity :error, "Failed to start worker : #{exception.inspect}"
      unregister_worker(exception)
    end

This is the core of a worker.
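The shape of the work loop can be sketched with an in-memory queue. MiniWorker is a hypothetical class for illustration only: it keeps the poll, sleep, and "exit when interval is zero" structure of work, and leaves out signals, forking, and worker registration.

```ruby
# Hypothetical worker that polls a plain Array instead of a Redis list.
class MiniWorker
  attr_reader :processed

  def initialize(queue)
    @queue     = queue   # Array of callables standing in for queued jobs
    @processed = []
    @shutdown  = false
  end

  def shutdown!
    @shutdown = true
  end

  # Same structure as the work method: try one job; if none was found,
  # either stop (interval of zero) or sleep and poll again.
  def work(interval = 5.0)
    interval = Float(interval)
    loop do
      break if @shutdown

      unless work_one_job
        break if interval.zero?
        sleep interval
      end
    end
    @processed
  end

  private

  # Returns true if a job was processed, false if the queue was empty.
  def work_one_job
    job = @queue.shift
    return false if job.nil?

    @processed << job.call
    true
  end
end

worker = MiniWorker.new([-> { 'job 1' }, -> { 'job 2' }])
p worker.work(0)
```

An interval of zero makes the loop drain the queue and return, which is convenient in tests; a positive interval keeps the worker polling until it is shut down.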

    # Processes a given job in the child.
    def perform(job)
      begin
        if fork_per_job?
          reconnect
          run_hook :after_fork, job
        end
        job.perform # here
      rescue Object => e
        report_failed_job(job,e)
      else
        log_with_severity :info, "done: #{job.inspect}"
      ensure
        yield job if block_given?
      end
    end
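The begin/rescue/else/ensure structure of perform can be isolated in a small sketch. perform_safely is a hypothetical helper and the log is a plain Array. Unlike the original, which rescues Object, this sketch rescues only StandardError.

```ruby
# Run a job, record success or failure, and always yield to the block.
def perform_safely(job, log)
  begin
    job.call
  rescue StandardError => e
    log << "failed: #{e.message}"   # stand-in for report_failed_job
  else
    log << 'done'                   # runs only when no exception was raised
  ensure
    yield job if block_given?       # runs in both cases
  end
end

log = []
callbacks = 0

perform_safely(-> { :ok }, log) { callbacks += 1 }
perform_safely(-> { raise 'boom' }, log) { callbacks += 1 }

p log
p callbacks
```

The else branch separates the success path from the failure path, while ensure guarantees the caller's block sees the job either way, which is what makes the block useful for testing.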