AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
理解:
AMQP本身就是一个多信道的协议,信道提供了一种方式能够在
TCP/IP
的基本上实现更加轻量级的多连接. 这样可以使连接创建和销毁一样简单.这样就很容易预测带宽和吞吐量. 他们之间是独立的. 而且能够和其他信道一起工作,可用的带宽在他们之间是共享的.
打开和关闭 channel
- open
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
AMQP::Channel.new(client) do |channel, open_ok|
# when this block is executed, channel is open and ready for use
end
end
另外创建 channel 还可以使用非 block 的形式,虽然创建channel 并不是同步的,但是之后在使用前会延迟创建.
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
channel = AMQP::Channel.new(client)
exchange = channel.default_exchange
# ...
end
上面例子中虽然 chanel 不是实时创建,但是紧接着用它来创建 exchange,是没有问题的.他们的创建完全会延迟的.
- close
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
AMQP::Channel.new(client) do |channel, open_ok|
channel.close do |close_ok|
# when this block is executed, channel is successfully closed
end
end
end
Even though in the example above channel isn’t immediately open, it is safe to declare exchanges using it. Exchange declaration will be delayed until after channel is open. Same applies to queue declaration and other operations on exchanges and queues. Library methods that rely on channel being open will be enqueued and executed in a FIFO manner when broker confirms channel opening. Note, however, that this “pseudo-synchronous mode” is easy to abuse and introduce race conditions AMQP gem cannot resolve for you. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact.
所有依赖与 channel 的方法都会以队列的形式等待 channel 是否创建完成.
这样的假同步方式可以很好的避免和减少 AMQP 中的各种竞争关系.AMQP 是异步的协议, gem AMQP
很好的强调了这个特性.
Key methods of Channel class are
Channel provides a number of convenience methods that instantiate queues
and exchanges
of various types associated with this channel:
- queue
- default_exchange
- direct
- fanout
- topic
- close
而 rabbitmq 在实现 amqp 的协议基础上有了自己的一些增强.
Examples:
- Opening a channel with a callback
# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
AMQP::Channel.new(client) do |channel, open_ok| # here
# when this block is executed, channel is open and ready for use
end
end
- Instantiating a channel that will be open eventually
# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
channel = AMQP::Channel.new(client)
exchange = channel.default_exchange
# ...
end
- Queue declaration with incompatible attributes results in a channel-level exception
AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok|
AMQP::Channel.new do |channel, open_ok|
puts "Channel ##{channel.id} is now open!"
channel.on_error do |ch, close|
puts "Handling channel-level exception"
connection.close {
EM.stop { exit }
}
end
EventMachine.add_timer(0.4) do
# these two definitions result in a race condition. For sake of this example,
# however, it does not matter. Whatever definition succeeds first, 2nd one will
# cause a channel-level exception (because attributes are not identical)
AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
puts "#{queue.name} is ready to go"
end
AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
puts "#{queue.name} is ready to go"
end
end
end
end
- Closing a channel your application no longer needs
# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
AMQP::Channel.new(client) do |channel, open_ok|
channel.close do |close_ok|
# when this block is executed, channel is successfully closed
end
end
end
消息处理
在建立连接, 然后建起 channel 之后, 连接到具体的 queue 之后, 我们总算是可以处理具体的消息了.那么看看怎么办.
有这么几种方式我们可以获得 queue:
AMQP::Queue.new(channel,
"amqpgem.examples.channel_exception",
:auto_delete => true,
:durable => true) do |queue|
puts "#{queue.name} is ready to go"
# here
end
ch = AMQP::Channel.new(connection, :auto_recovery => false)
ch.prefetch(1)
queue = ch.queue(queue_name, durable: true)
consumer = queue.subscribe(ack: true) do |header, body|
# get queue status
queue.status{|number_of_messages, number_of_consumers| puts number_of_messages}
begin
message = JSON.parse(body)
puts message
header.ack # here
rescue => e
puts e.message
end
end
在建立 queue 的时候,我们可以传入这些参数:
- 当然第一个参数就是 queue 的名字了.
- passive 如果设置这个值, 服务器将不会创建
exchange
,如果不存在, 客户端可以通过这个参数来判断exchange
是否存在在不改变服务器端状态的情况下. - durable 如果设置这个值,这个
exchange
就会被设置味durable
. 这样的情况下,他会在服务器重启之后重新建立exchange
和所有的绑定. - auto_delete 如果设置这个值, 当所有的
queue
完成之后就会,这个exchange
会被删除. - exclusive 如果设置了这个值, 这个
queue
将会被这个连接专有独占. - nowait 如果设置了这个值,服务端就不会响应这个方法,客户端将不会等待这个响应方法.如果服务器没有完成这个方法,它将会报一个
channel
或者连接的异常.
看看 subscribe 方法
# @option opts [Boolean ]:ack (false)
# If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
# the cost of reliability. Messages can get lost if a client dies
# before it can deliver them to the application.
#
# @option opts [Boolean] :nowait (false)
# If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
# @option opts [#call] :confirm (nil)
# If set, this proc will be called when the server confirms subscription
# to the queue with a basic.consume-ok message. Setting this option will
# automatically set :nowait => false. This is required for the server
# to send a confirmation.
#
# @option opts [Boolean] :exclusive (false)
# Request exclusive consumer access, meaning only this consumer can access the queue.
# This is useful when you want a long-lived shared queue to be temporarily accessible by just
# one application (or thread, or process). If application exclusive consumer is part of crashes
# or loses network connection to the broker, channel is closed and exclusive consumer is thus cancelled.
#
#
# @yield [headers, payload] When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload.
# @yieldparam [AMQP::Header] headers Headers (metadata) associated with this message (for example, routing key).
# @yieldparam [String] payload Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.
首先,这个方法会有几个参数:
- ack 这个值如为 false,服务端就会在客户端接到消息之后直接删掉这个消息,这个方法会提供性能,但是降低服务的保证性.
- nowait 这个和 queue 里的哪个参数一致,就是服务端不会响应这个方法,客户端不会等待 reply 方法.
- confirm 如果设置这个值,服务器将会确认订阅消息是否成功.
- exclusive 独占这个 queue.
这个方法如果后跟 block, 如果只有一个参数.它会被指定给payload
.如果是2个参数,第一个是 header, 第二个是 payload.