はじめに
SQSを使用した際のメモを、わかりやすくまとめてみました。
pythonのコードも載せたので、ぜひ活用してみて下さい。
SQS
Amazon Simple Queue Service (SQS) は、完全マネージド型のメッセージキューイングサービスで、マイクロサービス、分散システム、およびサーバーレスアプリケーションの切り離しとスケーリングが可能です。
使用するメリットとして、キューとしての非同期処理の実現に加え、AWSの持つ高い障害耐性と処理性能を得ることができます。
上司に説明する際は、この公式URLをみて貰えばいいと思います。
ユースケース
キューは単体で使用するものではないため、どのような場合で使用するのかに一番関心が置かれます。
基本的に以下のパターンに合致した場合、一度SQS使用の検討を行って下さい。
SQSを採用することで、処理の信頼性・処理速度の向上が望めます。
<パターン>
・一回の処理で許容する処理時間を超過しそうな場合
・一度に大量のデータ件数を処理する場合
・一つの処理内部でなんでもやろうとしている場合(※1
※1)
この場合、SNSというサービスと組み合わせて使用することになります。
種類
SQSには2種類のサービスが提供されているため、各使用パターンをを押さえておきましょう。
標準キュー
メリット
無制限のスループット:ほぼ無制限のトランザクション(TPS)をサポートします。
デメリット
ベストエフォート型の順序付け:メッセージが送信されたときと異なる順序で配信されることがあります。
少なくとも1回の配信:メッセージは少なくとも1回は確実に配信されますが、複数のメッセージのコピーが配信されることもあります。
FIFOキュー
メリット
高スループット:デフォルトでは、FIFOキューは毎秒最大300件のトランザクション(TPS)をサポートします。
先入れ先出し配信:メッセージが送信または受信された順序が厳密に保持されます (先入れ先出し)。
デメリット
標準キューよりTPSが下がる
まとめると
キューの処理が以下の場合は標準キューを採用、それ以外はFIFOキューを採用
・順序性に依存しない
・処理の重複を許容できる
設定のための用語解説
作成に入る前に、簡単に設定時の用語を説明します。
可視性タイムアウト
設定した期間、重複したメッセージ配信が行われないようにする機能
ベストプラクティスとして、後続処理時間+アルファを設定すると良い
シュートポーリングとロングポーリング
キューからのメッセージ取得(サブスクライブ)の際、対象のキューがない場合、
一定時間キューに新規登録されるメッセージを待機かどうかの機能
待機を行うことをロングポーリング、待機を行ないことをシュートポーリングと言う
「メッセージ受信待機時間」にて設定
デットレターキュー
バグったデータを別キューに格納する機能
Cloudwatchと連携し、デバッグに使用できる。
「最大受信数」にて設定したメッセージを受信数後、デッドレターキューに移動される
※最大メッセージサイズ:値は 1~256 KB の間である必要があります
暗号化
AWS Key Management Service (AWS KMS)を使用して、データを送信する機能
手順
Lambda(Python3)を使用したサンプルを作ってみましょう。
1.SQSを作成
→標準キューで設定はデフォルトのまま作成
キュー名はsampleQ
2.Lambda作成(SQS登録処理)
→pythonで一から作成
実行ロールにAmazonSQSFullAccessを追加
以下のソースを貼り付ける
import json
import boto3
def lambda_handler(event, context):
name = 'sampleQ'
sqs = boto3.resource('sqs')
try:
# キューの名前を指定してインスタンスを取得
queue = sqs.get_queue_by_name(QueueName=name)
except:
# 指定したキューがない場合はexceptionが返却される
return {
'statusCode': 500,
'body': json.dumps('SQS_No_Exist')
}
# メッセージをキューに送信
response = queue.send_message(MessageBody ='text')
print(response)
return {
'statusCode': 200,
'body': json.dumps('Success')
}
3.Lambda作成(サブスクライブ処理)
→pythonで一から作成
実行ロールにAmazonSQSFullAccessを追加
以下のソースを貼り付ける
import json
import boto3
def lambda_handler(event, context):
name = 'sampleQ'
sqs = boto3.resource('sqs')
try:
# キューの名前を指定してインスタンスを取得
queue = sqs.get_queue_by_name(QueueName=name)
except:
# 指定したキューがない場合はexceptionが返却される
return {
'statusCode': 500,
'body': json.dumps('SQS_No_Exist')
}
# メッセージをキューから取得
response = queue.receive_messages()
# 戻りはlist(sqs.Message)型
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Message.body
message = response[0]
print(message.body)
message.delete()
return {
'statusCode': 200,
'body': json.dumps('Success')
}
まとめ
簡単なメッセージ登録と取得処理までを実装しました。
簡単でしたね!
取得したメッセージは処理の内部で削除する必要があるのでそこだけ注意してください!