http://aws.typepad.com/aws_japan/2014/01/amazon-sqs-new-dead-letter-queue.html (dead link; see the Web Archive copy: http://web.archive.org/web/20161112043540/http://aws.typepad.com/aws_japan/2014/01/amazon-sqs-new-dead-letter-queue.html )
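- A feature that lets you set a maximum receive count and moves a message to a dedicated queue once that count is exceeded.
- When a message or job is faulty and cannot complete because of errors, or takes so long that the visibility timeout expires, this prevents the same job from being retried endlessly and lets you investigate the problem.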
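As a rough sketch of how the maximum receive count is configured: SQS reads it from the queue's `RedrivePolicy` attribute, a JSON string with `deadLetterTargetArn` and `maxReceiveCount`. The helper names `redrive_policy_json` and `attach_dead_letter_queue` below are hypothetical, and `sqs` is assumed to be an `Aws::SQS::Client` (or anything that responds to `set_queue_attributes` in the same way).

```ruby
require 'json'

# Builds the JSON string that goes into the RedrivePolicy queue attribute.
# (Hypothetical helper, not part of the SDK.)
def redrive_policy_json(dead_letter_queue_arn, max_receive_count)
  {
    deadLetterTargetArn: dead_letter_queue_arn,
    maxReceiveCount: max_receive_count
  }.to_json
end

# Attaches a dead letter queue to a source queue.
# `sqs` is assumed to behave like Aws::SQS::Client#set_queue_attributes.
def attach_dead_letter_queue(sqs, queue_url:, dead_letter_queue_arn:, max_receive_count:)
  sqs.set_queue_attributes(
    queue_url: queue_url,
    attributes: {
      'RedrivePolicy' => redrive_policy_json(dead_letter_queue_arn, max_receive_count)
    }
  )
end
```

With a real client this would be called with the active queue's URL and the ARN of the dead letter queue; the ARN is a value you would look up for your own account, not something shown in this article.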
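Conclusion
- A message is moved to the Dead Letter Queue at the moment a receive would exceed the maximum receive count (maxReceiveCount).
- If the maximum receive count is 1, the second receive puts the message in the Dead Letter Queue and does not return it.
- A job that is already in progress can continue after its message has been moved to the Dead Letter Queue, but deleting the message after the job completes does not remove the message that is now in the Dead Letter Queue.
- A message moved to the Dead Letter Queue keeps the same message ID as the original.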
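The points above can be mimicked with a small in-memory model. This is only a toy that reproduces the observed behavior, not how SQS is implemented: the `ToyQueue` class is hypothetical, and it ignores the visibility timeout entirely, so every call to `receive_message` counts as a new receive attempt.

```ruby
# Toy in-memory model of the observed behavior. NOT an SQS implementation.
class ToyQueue
  Message = Struct.new(:message_id, :body, :receive_count)

  attr_reader :messages

  def initialize(max_receive_count: nil, dead_letter_queue: nil)
    @max_receive_count = max_receive_count
    @dead_letter_queue = dead_letter_queue
    @messages = []
  end

  def send_message(message_id, body)
    @messages << Message.new(message_id, body, 0)
  end

  # Returns the next message, or nil when the queue is empty or when the
  # message was moved to the dead letter queue instead of being returned.
  def receive_message
    message = @messages.first
    return nil unless message

    if @dead_letter_queue && message.receive_count >= @max_receive_count
      @messages.delete(message)
      @dead_letter_queue.messages << message # the message ID is kept
      return nil
    end

    message.receive_count += 1
    message
  end

  # Deleting only affects this queue; a message already moved stays in the DLQ.
  def delete_message(message_id)
    @messages.reject! { |m| m.message_id == message_id }
  end
end

dead = ToyQueue.new
active = ToyQueue.new(max_receive_count: 1, dead_letter_queue: dead)
active.send_message('id-1', 'BODY')

first = active.receive_message   # returned to the consumer
second = active.receive_message  # moved to the DLQ, nothing is returned
active.delete_message('id-1')    # does not touch the DLQ

puts first.message_id                        # prints id-1
puts second.inspect                          # prints nil
puts dead.messages.map(&:message_id).inspect # prints ["id-1"]
```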
Verification logs
Receiving only once
/home/vagrant/.rbenv/versions/2.2.4/bin/ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)' /shared/sqs_jikken/dead_letter_queue_example.rb
Delete all messages from queues.
[A] Send(d6917813-e4f2-4116-a663-9df24c92c9fc)
[A] Received.(1)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] Start
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] Visibility Timeout!
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] Stop
[A] Deleted
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:1 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[D] Message not found.
Process finished with exit code 0
Receiving repeatedly
/home/vagrant/.rbenv/versions/2.2.4/bin/ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)' /shared/sqs_jikken/dead_letter_queue_example.rb
Delete all messages from queues.
[A] Send(c699230d-2ea2-483f-b4e3-00f899cb4bc0)
[A] Received.(1)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] Start
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] Visibility Timeout!
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 1 messages found. (Visible:0 NotVisible: 1)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 0 messages found. (Visible:0 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] Stop
[A] Deleted
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[A] 0 messages found. (Visible:0 NotVisible: 0)
[D] 1 messages found. (Visible:1 NotVisible: 0)
[D] Message found.
Process finished with exit code 0
Quick-and-dirty script used for the verification
require 'aws-sdk'
require 'pp'

ACTIVE_QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxxx/active_queue'
DEAD_LETTER_QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxxx/dead_letter_queue'
# false: stop after the first receive; true: keep polling after it
CONTINUE_RECEIVING = false

region = ENV['AWS_REGION'] || 'ap-northeast-1'
sqs = Aws::SQS::Client.new(region: region)

# Empty both queues before the experiment
sqs.purge_queue(queue_url: ACTIVE_QUEUE_URL)
sqs.purge_queue(queue_url: DEAD_LETTER_QUEUE_URL)
puts 'Delete all messages from queues.'

# Read the visibility timeout configured on the active queue
resp = sqs.get_queue_attributes(
  queue_url: ACTIVE_QUEUE_URL,
  attribute_names: %w(VisibilityTimeout)
)
visibility_timeout = resp.attributes['VisibilityTimeout'].to_i

# Send one message
message_id = sqs.send_message(
  queue_url: ACTIVE_QUEUE_URL,
  message_body: 'BODY'
).message_id
puts "[A] Send(#{message_id})"
class Status
  def initialize
    @complete = false
  end

  def complete?
    @complete
  end

  def incomplete?
    !complete?
  end

  def complete!
    @complete = true
  end
end
status = Status.new
threads = []

# Receive the message
message = nil
threads << Thread.new(sqs) do |sqs|
  Thread.pass
  count = 0
  while status.incomplete? do
    resp = sqs.receive_message(
      queue_url: ACTIVE_QUEUE_URL,
      max_number_of_messages: 1
    )
    if resp.messages[0]
      puts "[A] Received.(#{count = count.succ})"
      if resp.messages[0].message_id == message_id
        message ||= resp.messages[0]
        break unless CONTINUE_RECEIVING
      end
    end
    sleep 1
  end
end
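threads << Thread.new(sqs) do |sqs|
  Thread.pass
  # Wait until the receiving thread has picked up the message
  until message do
    sleep 1
  end
  puts '[A] Start'
  # Wait for the visibility timeout, plus a little extra
  sleep visibility_timeout
  puts '[A] Visibility Timeout!'
  sleep 10
  puts '[A] Stop'
  # Delete the message using the receipt handle from the first receive
  sqs.delete_message(
    queue_url: ACTIVE_QUEUE_URL,
    receipt_handle: message.receipt_handle
  )
  puts '[A] Deleted'
  sleep 5
  status.complete!
end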
# Monitor the message counts of both queues
threads << Thread.new(sqs) do |sqs|
  Thread.pass
  while status.incomplete? do
    attributes = sqs.get_queue_attributes(
      queue_url: ACTIVE_QUEUE_URL,
      attribute_names: %w(ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible)
    ).attributes
    puts "[A] #{attributes['ApproximateNumberOfMessages'].to_i + attributes['ApproximateNumberOfMessagesNotVisible'].to_i} messages found. (Visible:#{attributes['ApproximateNumberOfMessages']} NotVisible: #{attributes['ApproximateNumberOfMessagesNotVisible']})"
    attributes = sqs.get_queue_attributes(
      queue_url: DEAD_LETTER_QUEUE_URL,
      attribute_names: %w(ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible)
    ).attributes
    puts "[D] #{attributes['ApproximateNumberOfMessages'].to_i + attributes['ApproximateNumberOfMessagesNotVisible'].to_i} messages found. (Visible:#{attributes['ApproximateNumberOfMessages']} NotVisible: #{attributes['ApproximateNumberOfMessagesNotVisible']})"
    puts ''
    sleep 1
  end
end
threads.each(&:join)

# Check whether the message was moved to the dead letter queue
resp = sqs.receive_message(
  queue_url: DEAD_LETTER_QUEUE_URL,
  max_number_of_messages: 1
)
message_ids = resp.messages.map(&:message_id)
if message_ids.include?(message_id)
  puts '[D] Message found.'
else
  puts '[D] Message not found.'
end
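
When investigating what ended up in the dead letter queue, it may help to look at more than the message ID. The following is a minimal sketch, assuming `sqs` behaves like `Aws::SQS::Client#receive_message` and that the `ApproximateReceiveCount` system attribute is requested through `attribute_names`. The helper name `dead_letter_summaries` is hypothetical. Note that receiving from the dead letter queue hides those messages for that queue's own visibility timeout.

```ruby
# Lists message IDs in a queue together with the receive count SQS reports.
# (Hypothetical helper; `sqs` is assumed to behave like Aws::SQS::Client.)
def dead_letter_summaries(sqs, queue_url)
  resp = sqs.receive_message(
    queue_url: queue_url,
    max_number_of_messages: 10,
    attribute_names: %w(ApproximateReceiveCount)
  )
  resp.messages.map do |m|
    {
      message_id: m.message_id,
      receive_count: m.attributes['ApproximateReceiveCount'].to_i
    }
  end
end
```

In the script above, this could be called as `dead_letter_summaries(sqs, DEAD_LETTER_QUEUE_URL)` after the threads have finished.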