##目標
AWS SQSを構築しメッセージをキューに投入(AWSコンソール上から投入する)、その後投入したメッセージをAWS SDKを利用してEC2から取得(ポーリング)する。
##SQSとは
アプリケーション間でやりとりされるメッセージを送受信するためのAWSが提供するキューサービスのこと。
受信側がメッセージをSQSに問い合わせてメッセージを取得するPull型のサービスです。
SQSに関する基本情報は以下記事がわかりやすくまとめられていると思いました。
Amazon SQS
##前提
・AWS SDK(AWS SDK for Rubyを本記事では利用)を使用可能なEC2インスタンスが構築済みであること(※)。
※AWS SDK for Rubyを構築した際の参考手順
【AWS SDK】EC2自動構築用スクリプト(1.AWS SDK for Rubyのセットアップ)
##作業の流れ
項番 | タイトル |
---|---|
1 | SQSの構築 |
2 | SQSにメッセージを送信 |
3 | EC2からメッセージをポーリングする |
##手順
###1.SQSの構築
①SQSコンソールを開く
②キューを作成
をクリック
③作成するキューの設定
今回はスタンダードキューを利用します(※1)
キューの名前は任意のものを記載します。
※1 SQSには以下2つのタイプのキューが存在します。特徴を簡単にまとめます。
・スタンダードキュー
⇒最低1回のメッセージ配信を保証するが、リクエストのタイミングによっては同一メッセージが複数回配信される可能性がある。
⇒メッセージ配信の順序はベストエフォート型(メッセージ送信と配信の順序が異なる可能性がある。)
・FIFOキュー
⇒同一メッセージが複数回配信されることはない。
⇒メッセージ配信の順序は先入れ先出し型(メッセージ送信と配信の順序が同一となる。)
⇒高性能なため、スタンダードキューよりも少し料金が高い。
キューの詳細設定(※2)をします。
今回はメッセージ受信待機時間
のみ20秒に設定し、それ以外はデフォルトのままとしています。
※2 以下簡単にキューの詳細設定の内容を記載致します。
・可視性タイムアウト
メッセージを受信した後に一定時間、該当のメッセージを他システムから見えなくさせる機能のこと。
可視性タイムアウトとして指定した時間内にメッセージ処理を完了させ、メッセージ削除することで、
同一メッセージに対する複数回処理を防ぐことが可能となります。
・配信遅延(遅延キュー)
キューにメッセージを送信した後、指定した時間が経過した後にメッセージを表示させるようにする機能
・メッセージ受信待機時間
ロングポーリングorショートポーリングの設定を行うことが出来ます。
デフォルトはショートポーリング(メッセージ受信待機時間が0秒)となっており、キューが空の場合でもすぐ再リクエストを行います。
SQSはメッセージのリクエスト回数によって料金が判断されるため、空メッセージのリクエストを繰り返すと料金がかさんでしまいます。
その対策として有効なのが、ロングポーリング(メッセージ受信待機時間が0秒より上)となります。
ロングポーリングを設定すると空メッセージを受け取った場合、指定した秒数分待機する(キュー接続を持続する)ため、
SQSのリクエスト回数を減らし料金を節約することが可能となります。
要件にもよりますが、基本的にはロングポーリングを使用するのがAWSの推奨となっているようです。
キューへのアクセス設定はデフォルトのままとします。
最後に暗号化、デッドレターキュー(※3)、タグの設定もありますが今回は設定なしとします。
キューの作成
をクリック
※3 デッドレターキュー
処理結果がエラーとなったメッセージを何回かリトライ後、自動的に別のキューに移動する機能
問題のあるメッセージが別キューに集中するため識別しやすくなり、当該メッセージが正規のキューに残存することを防ぎます。
###2.SQSにメッセージを送信
作成したSQSキューにAWSコンソール上からメッセージを送信してみます。
①作成したSQSキューの詳細画面からメッセージの送受信
をクリック
②メッセージ本文
を記載後メッセージ送信
をクリック
この手順を何回か繰り返します。
③送信したメッセージの内容確認
メッセージ送信欄の下段にあるメッセージ受信欄からメッセージをポーリング
をクリック
メッセージが表示されればOKです
###3.EC2からメッセージをポーリングする
①AWS SDK for Rubyが利用可能なEC2インスタンスにポーリング実行用スクリプトを配備
# **********************************************************************************
# <機能概要>
# 指定したSQSキューのメッセージをポーリングする
#
# <機能詳細>
# 常駐プロセスとして稼働し、指定したSQSキューのメッセージをポーリングし取得時刻及びメッセージ本文を標準出力する。
# 取得したメッセージは出力後削除する。
# メッセージが空の場合は指定した時間(wait_time_seconds)分待機した後、キューが空であることを伝えるメッセージを出力する。
#
# <スクリプト用法>
# ruby <スクリプトパス>
# **********************************************************************************
require 'aws-sdk'
# キュー名
queue_name = "MyTestQueue"
# SQS操作用インスタンス作成
sqs = Aws::SQS::Client.new
# キューURLの取得
begin
queue_url = sqs.get_queue_url(queue_name: queue_name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue
puts "A queue named '#{queue_name}' does not exist."
exit(false)
end
loop do
# キューからメッセージを取得
receive_message_result = sqs.receive_message({
queue_url: queue_url,
message_attribute_names: ["All"], # 全属性のメッセージを取得
max_number_of_messages: 5, # 最大でも5メッセージの取得
wait_time_seconds: 20 # メッセージが空の場合は20秒待機
})
# メッセージを取得した時刻を取得
timestamp = Time.new
# メッセージが空のときはemptyメッセージ出力
if receive_message_result.messages.nil?
puts "#{timestamp.strftime("%Y-%m-%d %H:%M:%S")}: Message is empty."
end
receive_message_result.messages.each do |message|
# 取得したメッセージ本文を表示
puts "#{timestamp.strftime("%Y-%m-%d %H:%M:%S")}: #{message.body}"
# メッセージをキューから削除
sqs.delete_message({
queue_url: queue_url,
receipt_handle: message.receipt_handle
})
end
end
②スクリプト実行し、メッセージをポーリングする。
AWSコンソールから事前投入した3つメッセージは即座に処理され、メッセージが空の場合は指定した時間分待機することを確認できました。
またメッセージ待機中に新規メッセージをキューにプットした場合、そのメッセージは即座に処理されることも確認できました。
[ec2-user@basehost ~]$ ruby queue.rb
2020-07-21 04:13:32: test2 # AWSコンソールから事前投入したメッセージその1
2020-07-21 04:13:33: test3 # AWSコンソールから事前投入したメッセージその2
2020-07-21 04:13:33: test1 # AWSコンソールから事前投入したメッセージその3
2020-07-21 04:13:53: Message is empty. # wait_time_secondsで指定した時間分(20秒)待機後、emptyメッセージ出力
2020-07-21 04:14:01: This message is the message sent during the waiting time. # メッセージ待機時間中にキューにプットしたメッセージ、プット後即座に処理された。
2020-07-21 04:14:21: Message is empty.
2020-07-21 04:14:41: Message is empty.
2020-07-21 04:15:01: Message is empty.