優先度キュー(Priority Queue)とは?
- RabbitMQ3.5.0から実装された機能
- 通常MQではメッセージは入れた順番でしか取得できない(先入れ先出し)。しかし、優先度キューを使い、メッセージに優先度を設定することで、優先度の高い順に取得することができる。
環境
- Amazon Linux AMI 2016.03.3
- RabbitMQ 3.5.4
- Erlang R14B04
- bunny 2.4.0
セットアップ
RabbitMQインストール
sudo rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo yum -y install http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.4/rabbitmq-server-3.5.4-1.noarch.rpm
RabbitMQと一緒にerlangもインストールされる
RabbitMQ起動
sudo service rabbitmq-server start
#=>
#Starting rabbitmq-server: SUCCESS
#rabbitmq-server.
bunnyインストール
gem install bunny
優先度キューを作成して、キューにメッセージを登録する
- queue作成時に、
x-max-priority
を設定することで、優先度キューになる
publish_with_priority.rb
# -*- coding: utf-8 -*-
require "bunny"
# RabbitMQに接続
conn = Bunny.new(:host => "localhost", :vhost => "/", :user => "guest", :pass => "guest")
conn.start
# channelを作成
ch = conn.create_channel
# queue1というキューを作成
# 優先度キューを設定
q = ch.queue("queue1", :durable => true, :arguments => {"x-max-priority" => 10 })
# 優先度を設定して、queue1にメッセージを追加
q.publish("priority 5" , { :priority => 5} )
q.publish("priority 1" , { :priority => 1} )
q.publish("priority 3" , { :priority => 3} )
q.publish("priority 9" , { :priority => 9} )
q.publish("priority 10", { :priority => 10} )
q.publish("priority none" ) # 優先度設定無し
# close
conn.stop
優先度キューからメッセージを取得する
subscribe_with_priority.rb
# -*- coding: utf-8 -*-
require "bunny"
# RabbitMQに接続
conn = Bunny.new(:host => "localhost", :vhost => "/", :user => "guest", :pass => "guest")
conn.start
# channelを作成
ch = conn.create_channel
# queue1キュー取得
q = ch.queue("queue1", :durable => true, :arguments => {"x-max-priority" => 10 })
# メッセージを取得
# 取得するとキューからメッセージは削除される
q.subscribe do |delivery_info, properties, msg|
p "queue = #{q.name}, msg = #{msg}"
end
# close
conn.stop
実行結果
ruby subscribe_with_priority.rb
#=>
#"queue = queue1, msg = priority 10"
#"queue = queue1, msg = priority 9"
#"queue = queue1, msg = priority 5"
#"queue = queue1, msg = priority 3"
#"queue = queue1, msg = priority 1"
#"queue = queue1, msg = priority none"
- priorityが高い順に取得した(priorityを設定していない場合は0(最低)となる)
色々設定してみる
既にあるキューのx-max-priorityを変更した場合
# "x-max-priority" => 10で設定されているキューに対して20を設定
q = ch.queue("queue1", :durable => true, :arguments => {"x-max-priority" => 20 })
- エラーになった
/home/ec2-user/.gem/ruby/2.0/gems/bunny-2.4.0/lib/bunny/channel.rb:1927:in `raise_if_continuation_resulted_in_a_channel_error!':
PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'queue1' in vhost '/': received '20' but current is '10' (Bunny::PreconditionFailed)
x-max-priorityを超えた値を設定した場合
q = ch.queue("queue1", :durable => true, :arguments => {"x-max-priority" => 10 })
q.publish("priority 10" , { :priority => 10} )
q.publish("priority 30" , { :priority => 30} )
q.publish("priority 20" , { :priority => 20} )
q.publish("priority 100", { :priority => 100} )
- 以下のようになった
"queue = queue1, msg = priority 10"
"queue = queue1, msg = priority 20"
"queue = queue1, msg = priority 30"
"queue = queue1, msg = priority 100"
同じpriorityを設定した場合
q.publish("priority 1-1", { :priority => 1} )
q.publish("priority 1-2", { :priority => 1} )
q.publish("priority 1-3", { :priority => 1} )
q.publish("priority 2-1", { :priority => 2} )
q.publish("priority 2-2", { :priority => 2} )
q.publish("priority 2-3", { :priority => 2} )
- 先に入れたメッセージを取得した
"queue = queue1, msg = priority 2-1"
"queue = queue1, msg = priority 2-2"
"queue = queue1, msg = priority 2-3"
"queue = queue1, msg = priority 1-1"
"queue = queue1, msg = priority 1-2"
"queue = queue1, msg = priority 1-3"
マイナス・小数値を設定した場合
q.publish("priority 10" , { :priority => 10} )
q.publish("priority -1" , { :priority => -1} )
q.publish("priority 9.5", { :priority => 9.5} )
q.publish("priority 1" , { :priority => 1} )
- 以下のようになった
"queue = queue1, msg = priority 10"
"queue = queue1, msg = priority -1"
"queue = queue1, msg = priority 9.5"
"queue = queue1, msg = priority 1"