LoginSignup
0
0

More than 5 years have passed since last update.

再说 rabbitmq 续

Posted at

之前我们说到了通过几行简单的代码来创建 rabbitmq 的连接然后通过命名的方式创建了 queue 然后塞几个 message, 然后通过同样的方式取到消息. 这里我们将会创建一个 queue 在先, 然后通过程序的方式来完成一个时间序列的任务消息, 有多个角色来处理. 这个例子我们关注的是避免立即来处理那些占用资源较多的任务. 在一个 http request 中我们不可能让整个应用来对待处理的结果, 我们需要一个背后的处理进程来监视这些任务, 让有了类似的任务就会拿来处理, 而且不只是一个, 会有很多人待命的感觉.

准备

之前我们是发送了一个字符串hello world, 这里我们同样适用字符串来表示一个复杂的认为, 我们并没有如图像处理, 文本处理这样的实际任务, 但是我们装作自己很忙, 通过Kernel.sleep 来实现. 这里我们会稍作修改之前的发送消息的代码. 这里我们称之为new_task.rb

msg  = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

q.publish(msg, :persistent => true)
puts " [x] Sent #{msg}"

同样接收消息的代码也需要修改, 我们称之为worker.rb

q.subscribe(:block => true) do |delivery_info, properties, body|
  puts " [x] Received #{body}"
  # imitate some work
  sleep body.count(".").to_i
  puts " [x] Done"
end

执行它们:

# shell 1
ruby -rubygems worker.rb
# shell 2
ruby -rubygems new_task.rb


循环分发消息

使用task queue 的一个好处就是它能够很容易的并发处理任务, 而随着任务的数据增减我们可以同时来操作处理任务的进程数量来实现扩展. 这里我们先假设使用2个任务处理:

然后反复就是下面的这样子了:

# shell 3
ruby -rubygems new_task.rb First message.
ruby -rubygems new_task.rb Second message..
ruby -rubygems new_task.rb Third message...
ruby -rubygems new_task.rb Fourth message....
ruby -rubygems new_task.rb Fifth message.....
#Let's see what is delivered to our workers:

# shell 1
ruby -rubygems worker.rb
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
ruby -rubygems worker.rb
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认的 rabbitmq 对于所有的 worker 都是一视同仁, 也就是说所有的消息会平均发送到可用的 worker 上面.

消息的确认通知

这个过程很关键, 假设我们一个 worker 处理一个很棘手而且花时间的任务, 但是中途死掉了, 我们如何来确保这个 message 不掉呢, 这个场景下就需要告诉 rabbitmq, 我这个 worker 没有完成任务该死你应该换一个 worker 重试. 当前我们的例子里只要 worker 接受到消息就会告诉 rabbitmq 你可以从你哪里清除这个 message 了. 这样的情况下我们就用了消息的确认ack 通知, 当 worker 发送给 rabbitmq 这个消息, 接下来的处理就可以由 rabbitmq 来自由处理俄.

默认的设置是关闭这个消息 ack, 我们通过: manual_ack 来开启它. 然后当我们处理完消息就发送 ack 到 rabbitmq 哪里.

q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body|
  puts " [x] Received '#{body}'"
  # imitate some work
  sleep body.count(".").to_i
  puts " [x] Done"
  ch.ack(delivery_info.delivery_tag)
end

消息的可持久性

以上我们已经处理及时 consumer 死掉了也不会让消息掉, 而如果 rabbitmq 本身挂掉呢 消息还能不丢吗?
显然默认的情况下是如果 rabbitmq 消息挂掉了如果你并没有告诉它怎么办 消息会丢的. 两件事情你需要去做如果你像要消息不丢. 我们要声明 queue 和 message 他们本身是可持久的durable

ch.queue("hello", :durable=>true)

这里有个问题就是它本身没有语法毛病,但是 rabbitmq 是不允许通过不同的参数声明来作用于已经存在的配置上的. 那就使用不用的 queue 名字吧.

ch.queue("task_queue", :durable => true)

然后呢, 我们就是要把消息本身标记为可持久的. 通过: persistent 参数来实现:

x.publish(msg, :persistent => true)

公平方法

虽然之前说了 rabbitmq 本身它会确保均匀分发 message 到所有可用的 worker 上的, 但是如果 queue 的里消息处理事件比并不是一样的,加入有2个 worker 然后偶数的比较繁杂, 这样的话一个 worker 会累死的. 如何处理这样的情况呢. 那就是prefetch.

上面的情况下我们可以通过设置当前 channel 的 prefetch数量为1 来实现, 什么意思呢 就是说每次只给每一个 worker 一个 message, 只有这个 worker 把当前的任务处理完成之后才会继续分发.

来感受一下,当前的代码版本:



#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch   = conn.create_channel
q    = ch.queue("task_queue", :durable => true)

msg  = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

q.publish(msg, :persistent => true)
puts " [x] Sent #{msg}"

sleep 1.0
conn.close
(new_task.rb source)

#
# #### And our worker.rb:##########
#

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch   = conn.create_channel
q    = ch.queue("task_queue", :durable => true)

ch.prefetch(1)
puts " [*] Waiting for messages. To exit press CTRL+C"

begin
  q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body|
    puts " [x] Received '#{body}'"
    # imitate some work
    sleep body.count(".").to_i
    puts " [x] Done"
    ch.ack(delivery_info.delivery_tag)
  end
rescue Interrupt => _
  conn.close
end


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0