之前我们提到的 rabbitmq 的例子基本上都是对于 message 和 worker 来说一对一的, 也就说在 message 它的生命周期里面有一个 publisher 发出到了某个 queue 里待一会儿然后就去了一个 worker 里, 然后就没有然后了( rabbitmq 就从它哪里移除了). 而其实 rabbitmq 同时考虑到这样一个情况, 一个 message 很有价值的 message 不应该是一生只配一个 worker, 它应该有多个 worker来消费处理. 这里讲到的就是publish/subscribe
的模式.
为了演示这个模式,我们创建一个 log 系统. 它又2部分组成,一个是分发 log. 而另外一个是接收并打印 log. 在这个 log 系统里第一个接收 log 的系统都会拿到 message. 这样就可以拿到 log 被存储到硬盘. 同时我们要用另外一个 worker 来接收并在屏幕显示它.
而首先需要在 publish 消息的时候它要广播奥所有的接收者哪里.
exchange
这里先说说 rabbitmq 里的消息模型了. 开说之前我们先回顾下我们已经接触到的:
- producer 用来发消息的
- queue 用来存放消息的
- consumer 用来接收处理消息的
在 rabbitmq 的设计思想里面,对于 producer 来说, 它是永远不会直接发送消息到 queue 里面的. 甚至对于它来说它都不知道消息是要被发往 queue 的.
而取而代之的是对于它来说, 消息是只能发送到的exchange
这个东西的. 而 exchange其实是一个很简单的东西,它一方面会接受来自 producer 的消息,另一方面会将消息发往 queue. 对于 exchange 来说它要非常清楚如何处理拿到的消息. 它应该去特定的 queue 还是一些 queue 呢. 这个 rule 就是来源于 exchange 的 type
而 exchange 的 type 就是下面这些:
- direct
- topic
- headers
- fanout
我们先来说fanout
, 老样子, 建立连接, 创建 channel, 然后创建 exchange.
这个 **fanout(扇出)** 的 exchange 是非常简单的.它就是将拿到的所有 message 然后分发到所有可知的 queue 里面.
x = ch.fanout("logs")
x.publish(msg)
### 临时queue
之前我们用到了如` hello` 或者` task_queue` 诸如此类的名字来做 queue 的 name, 要知道可以给 queueu 命名也是对于我们来说很重要的. 因为我们需要在给特定名字的 queueu 里塞东西的同时还是需要连接到特定的 queue 上拿 message. 但是对于我们当下这个例子里, 我们似乎不必担心如何取名字, 而只是需要拿到所有的 log message. 接下来我们想要做的事情是这样的:
每次我们都想要连接一个干净的 queue. 这个情况下我们就可以创建一个随机的 queue name, 甚至我们都可以让 server 直接给我们一个随便的名字. 另外,一旦我们断了这个连接它应该自动删除的.
对于` bunny` 这个客户端来说,如果我们给一个 queue name 是一个空字符串,我们就可以创建一个不持久的随机的 queuename 的 queue.
q = ch.queue("", :exclusive => true)
```
这个执行之后就会返回一个随机的 queue name, 而且连接断了之后 它会被自动删除,因为我们给了它一个exclusive
为 true 值的参数设定.
绑定
我们已经创建了 fanout 类型的 exchange 和 queue. 现在我们可以让 exchange 发 message 到 queue 了. 而对于 exchange 和 queue 之间的关系我们称之为 绑定
q.bind('logs')
总结
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.fanout("logs")
msg = ARGV.empty? ? "Hello World!" : ARGV.join(" ")
x.publish(msg)
puts " [x] Sent #{msg}"
conn.close
################The code for receive_logs.rb:
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.fanout("logs")
q = ch.queue("", :exclusive => true)
q.bind(x)
puts " [*] Waiting for logs. To exit press CTRL+C"
begin
q.subscribe(:block => true) do |delivery_info, properties, body|
puts " [x] #{body}"
end
rescue Interrupt => _
ch.close
conn.close
end