LoginSignup
0
0

More than 5 years have passed since last update.

再说 rabbitmq 续2

Last updated at Posted at 2017-05-24

之前我们提到的 rabbitmq 的例子基本上都是对于 message 和 worker 来说一对一的, 也就说在 message 它的生命周期里面有一个 publisher 发出到了某个 queue 里待一会儿然后就去了一个 worker 里, 然后就没有然后了( rabbitmq 就从它哪里移除了). 而其实 rabbitmq 同时考虑到这样一个情况, 一个 message 很有价值的 message 不应该是一生只配一个 worker, 它应该有多个 worker来消费处理. 这里讲到的就是publish/subscribe 的模式.

为了演示这个模式,我们创建一个 log 系统. 它又2部分组成,一个是分发 log. 而另外一个是接收并打印 log. 在这个 log 系统里第一个接收 log 的系统都会拿到 message. 这样就可以拿到 log 被存储到硬盘. 同时我们要用另外一个 worker 来接收并在屏幕显示它.

而首先需要在 publish 消息的时候它要广播奥所有的接收者哪里.

exchange

这里先说说 rabbitmq 里的消息模型了. 开说之前我们先回顾下我们已经接触到的:

  • producer 用来发消息的
  • queue 用来存放消息的
  • consumer 用来接收处理消息的

在 rabbitmq 的设计思想里面,对于 producer 来说, 它是永远不会直接发送消息到 queue 里面的. 甚至对于它来说它都不知道消息是要被发往 queue 的.

而取而代之的是对于它来说, 消息是只能发送到的exchange 这个东西的. 而 exchange其实是一个很简单的东西,它一方面会接受来自 producer 的消息,另一方面会将消息发往 queue. 对于 exchange 来说它要非常清楚如何处理拿到的消息. 它应该去特定的 queue 还是一些 queue 呢. 这个 rule 就是来源于 exchange 的 type

Kobito.GnCuN2.png

而 exchange 的 type 就是下面这些:

  • direct
  • topic
  • headers
  • fanout

我们先来说fanout, 老样子, 建立连接, 创建 channel, 然后创建 exchange.

ch.fanout("logs")```

这个 **fanout(扇出)** 的 exchange 是非常简单的.它就是将拿到的所有 message 然后分发到所有可知的 queue 里面.

x = ch.fanout("logs")
x.publish(msg)

### 临时queue

之前我们用到了如` hello` 或者` task_queue` 诸如此类的名字来做 queue 的 name, 要知道可以给 queueu 命名也是对于我们来说很重要的. 因为我们需要在给特定名字的 queueu 里塞东西的同时还是需要连接到特定的 queue 上拿 message. 但是对于我们当下这个例子里, 我们似乎不必担心如何取名字, 而只是需要拿到所有的 log message. 接下来我们想要做的事情是这样的:

每次我们都想要连接一个干净的 queue. 这个情况下我们就可以创建一个随机的 queue name, 甚至我们都可以让 server 直接给我们一个随便的名字. 另外,一旦我们断了这个连接它应该自动删除的.

对于` bunny` 这个客户端来说,如果我们给一个 queue name 是一个空字符串,我们就可以创建一个不持久的随机的 queuename 的 queue.

q = ch.queue("", :exclusive => true)
```

这个执行之后就会返回一个随机的 queue name, 而且连接断了之后 它会被自动删除,因为我们给了它一个exclusive 为 true 值的参数设定.

绑定

我们已经创建了 fanout 类型的 exchange 和 queue. 现在我们可以让 exchange 发 message 到 queue 了. 而对于 exchange 和 queue 之间的关系我们称之为 绑定

q.bind('logs')

总结

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch   = conn.create_channel
x    = ch.fanout("logs")

msg  = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

x.publish(msg)
puts " [x] Sent #{msg}"

conn.close

################The code for receive_logs.rb:

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch  = conn.create_channel
x   = ch.fanout("logs")
q   = ch.queue("", :exclusive => true)

q.bind(x)

puts " [*] Waiting for logs. To exit press CTRL+C"

begin
  q.subscribe(:block => true) do |delivery_info, properties, body|
    puts " [x] #{body}"
  end
rescue Interrupt => _
  ch.close
  conn.close
end
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0