RabbitMQで優先度付きキューが必要だったのですが、http://www.rabbitmq.com/specification.html を見ても、2段階の優先度対応が planned の状態で結局まだ実装がない状態でした。
キューを複数用意して対応するのがよいのでは的な情報があったので、その考えをベースに自分の都合にあった優先度付きキューの実装をしたので、gem で公開しました。
https://rubygems.org/gems/bunny_priority_queue
https://github.com/kikumoto/bunny_priority_queue
言語は、ruby で bunny ライブラリに依存しています。
アイデアとしては、優先度別のキューを用意し、Consumerはすべてのキューを Subscribe して、メッセージの着信があったときに着信したキューより優先度の高いキューにメッセージないかをチェックし、あればそちらを処理、なければ着信したのを処理するようにしています。
また最優先以外のキューにTTLでタイムアウトを設定することで、キューにメッセージが一定時間滞留すると、Dead Letter Exchange 送りにして、1つ上の優先キューにメッセージを詰み直すということをしています。
インストールは Gemfile に
gem "bunny_priority_queue"
を書いて
bundle install
してください。
とりあえず Producer と Consumer で使う値をまとめておきます。別にこうである必要はないです。
なお、優先度の高い順に優先度の名前を付けておくことが必要です。
# -*- coding: utf-8 -*-
EXCHANE_NAME = "sample_exchange"
QUEUE_PREFIX = "sample_queue"
PRIORITY_HIGH = "high"
PRIORITY_NORMAL = "normal"
PRIORITY_LOW = "low"
PRIORITIES = [
PRIORITY_HIGH,
PRIORITY_NORMAL,
PRIORITY_LOW,
]
Consumer は以下のような感じです。
# -*- coding: utf-8 -*-
require 'bunny_priority_queue'
require_relative './common.rb'
begin
conn = Bunny.new.start
exchange = conn.create_channel.direct(EXCHANE_NAME)
q = BunnyPriorityQueue.create(
QUEUE_PREFIX,
PRIORITIES,
exchange
)
q.bind
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "routing_key: #{delivery_info.routing_key}"
puts "properties : #{properties}"
puts "payload : #{body}"
end
ensure
conn.close
end
Producer は以下のような感じです。
# -*- coding: utf-8 -*-
require 'bunny_priority_queue'
require_relative './common.rb'
begin
conn = Bunny.new.start
exchange = conn.create_channel.direct(EXCHANE_NAME)
q = BunnyPriorityQueue.new(
QUEUE_PREFIX,
PRIORITIES,
)
exchange.publish("normal", :routing_key => q.name(PRIORITY_NORMAL))
ensure
conn.close
end
とりあえず、自分が必要な感じで作ってみたものですが、需要があれば幸いです。
まだまだ荒削り感たっぷりなので、適宜 Pull Req. などでガシガシ突っこんでください。