LoginSignup
0
0

More than 5 years have passed since last update.

再说 rabbitmq 续3

Posted at

路由

之前我们创建的 fanout 类型的 exchange 它会把所有的 message 都收集并发送, 而我们这里希望能够添加一个 feature 就是我们只 care 错误级别较高的 log. 我们怎么办呢.

先来回顾绑定

在之前的例子里是这样使用绑定的:

q.bind(exchange_name)

所谓绑定就是连接了 exchange 和 queue, 这样理解也就是说这个 queue 它只对这个 exchange 来的消息感兴趣. 绑定同时它可以传递另外一个参数就是: routing_key 为了不和Bunny::Exchange#publish 混淆,这里称之为绑定键. 我们这样使用它:

q.bind(exchange_name, :routing_Key => "black");

而绑定键的意义是取决于不同的 exchange 类型的.

Direct exchange

在之前例子我们希望能够将所有的消息发送给所有的 consumer. 而现在我们却想着过滤一些想要的而避免像第一个 consumer 写入所有的 log 到磁盘而避免浪费. 而之前使用的fanout 类型 exchange 它是完全不能够定义而是盲从的广播.

我们取而代之使用了direct 类型的 exchange 来取代. 而direct 背后的路由算法是很简单的.

  • 就是直接用来匹配到 queue 的绑定键和 message 的路由键的.

Kobito.Uvy9ab.png

这个例子说明上面的描述, 一个 direct 类型的 exchange 绑定了2个 queue, 第一个 queue 的绑定key 是orange, 而第二个绑定了2个 queue, 分别是blackgreeen. 这个情况下如果 message 的路由 key 是 orange 就会发送到 Q1,而 black 和 green 就会到 Q2, 其他的都会被丢弃.

多绑定

对于 rabbitmq 来说多重绑定也是完全允许的. 完全可以在 x 和 q1于 q2之间建立绑定. 这个情况下, direct 类型的 exchange 就会表现的想 fanout 广播所有的消息到匹配的 queue 上. 带有 black 路由键的 message 就会被推送到 Q1 和 Q2.

分发 log

我们这里就会使用上面提到的模型来分发 log. 我们创建一个 direct 类型的 exchange. 然后通过 log 的错误级别来作为路由键.

ch.direct("logs");
# And we're ready to send a message:

x = ch.direct("logs")
x.publish(msg, :routing_key => severity)

订阅

q = ch.queue("")
ARGV.each do |severity|
  q.bind("logs", :routing_key => severity)
end

Kobito.c1BrvA.png

总结

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

conn = Bunny.new
conn.start

ch       = conn.create_channel
x        = ch.direct("direct_logs")
severity = ARGV.shift || "info"
msg      = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

x.publish(msg, :routing_key => severity)
puts " [x] Sent '#{msg}'"

conn.close


############ The code for receive_logs_direct.rb:

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"

if ARGV.empty?
  abort "Usage: #{$0} [info] [warning] [error]"
end

conn = Bunny.new
conn.start

ch  = conn.create_channel
x   = ch.direct("direct_logs")
q   = ch.queue("", :exclusive => true)

ARGV.each do |severity|
  q.bind(x, :routing_key => severity)
end

puts " [*] Waiting for logs. To exit press CTRL+C"

begin
  q.subscribe(:block => true) do |delivery_info, properties, body|
    puts " [x] #{delivery_info.routing_key}:#{body}"
  end
rescue Interrupt => _
  ch.close
  conn.close
end
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0