路由
之前我们创建的 fanout 类型的 exchange 它会把所有的 message 都收集并发送, 而我们这里希望能够添加一个 feature 就是我们只 care 错误级别较高的 log. 我们怎么办呢.
先来回顾绑定
在之前的例子里是这样使用绑定的:
q.bind(exchange_name)
所谓绑定就是连接了 exchange 和 queue, 这样理解也就是说这个 queue 它只对这个 exchange 来的消息感兴趣. 绑定同时它可以传递另外一个参数就是: routing_key
为了不和Bunny::Exchange#publish
混淆,这里称之为绑定键. 我们这样使用它:
q.bind(exchange_name, :routing_Key => "black");
而绑定键的意义是取决于不同的 exchange 类型的.
Direct exchange
在之前例子我们希望能够将所有的消息发送给所有的 consumer. 而现在我们却想着过滤一些想要的而避免像第一个 consumer 写入所有的 log 到磁盘而避免浪费. 而之前使用的fanout
类型 exchange 它是完全不能够定义而是盲从的广播.
我们取而代之使用了direct
类型的 exchange 来取代. 而direct 背后的路由算法是很简单的.
- 就是直接用来匹配到 queue 的绑定键和 message 的路由键的.
这个例子说明上面的描述, 一个 direct 类型的 exchange 绑定了2个 queue, 第一个 queue 的绑定key 是orange
, 而第二个绑定了2个 queue, 分别是black
和greeen
. 这个情况下如果 message 的路由 key 是 orange 就会发送到 Q1,而 black 和 green 就会到 Q2, 其他的都会被丢弃.
多绑定
对于 rabbitmq 来说多重绑定也是完全允许的. 完全可以在 x 和 q1于 q2之间建立绑定. 这个情况下, direct 类型的 exchange 就会表现的想 fanout 广播所有的消息到匹配的 queue 上. 带有 black 路由键的 message 就会被推送到 Q1 和 Q2.
分发 log
我们这里就会使用上面提到的模型来分发 log. 我们创建一个 direct 类型的 exchange. 然后通过 log 的错误级别来作为路由键.
ch.direct("logs");
# And we're ready to send a message:
x = ch.direct("logs")
x.publish(msg, :routing_key => severity)
订阅
q = ch.queue("")
ARGV.each do |severity|
q.bind("logs", :routing_key => severity)
end
总结
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.direct("direct_logs")
severity = ARGV.shift || "info"
msg = ARGV.empty? ? "Hello World!" : ARGV.join(" ")
x.publish(msg, :routing_key => severity)
puts " [x] Sent '#{msg}'"
conn.close
############ The code for receive_logs_direct.rb:
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
if ARGV.empty?
abort "Usage: #{$0} [info] [warning] [error]"
end
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.direct("direct_logs")
q = ch.queue("", :exclusive => true)
ARGV.each do |severity|
q.bind(x, :routing_key => severity)
end
puts " [*] Waiting for logs. To exit press CTRL+C"
begin
q.subscribe(:block => true) do |delivery_info, properties, body|
puts " [x] #{delivery_info.routing_key}:#{body}"
end
rescue Interrupt => _
ch.close
conn.close
end