経緯
業務でSQSを使う必要が出てきて、調査・検証した結果意外とややこしいことがわかったので備忘録として残そうと思います。
ElasticMQ
とりあえず、ローカルで検証をおこなうためにSQSと互換のあるElastcMQを使います。
ElasticMQ
docker-composeはこんな感じ。
version: "3"
services:
elasticmq:
container_name: elasticmq
image: softwaremill/elasticmq-native
volumes:
# 設定ファイル
- ./elasticmq/custom.conf://opt/elasticmq.conf:ro
ports:
- "9324:9324"
- "9325:9325"
ちょっと特殊な設定としては、ボリュームを設定して設定ファイルをコンテナ側に読み込ませる必要があります。(別にAPIで作るなり、コンテナの中に入って作ったりもできると思いますがこの方が簡単なので)
volumes:
# 設定ファイル
- ./elasticmq/custom.conf://opt/elasticmq.conf:ro
あとは、ポートをバインドしてあげる感じです。
(9324はAPI用のポート、9325は管理画面?用のポートになります。)
ports:
- "9324:9324"
- "9325:9325"
dockerのvolumeに繋ぐようにして設定したファイルは以下(設定項目はコメントアウトしてある通りでそれぞれの詳細は追って記載)
include classpath( "application.conf" )
node-address {
protocol = http
host = localhost
port = 9324
context-path = ""
}
rest-sqs {
enabled = true
bind-port = 9324
bind-hostname = "0.0.0.0"
# Possible values: relaxed, strict
sqs-limits = strict
}
generate-node-address = false
queues {
# http://localhost:9324/queue/sample1
sample1 {
defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
delay = 0 seconds # 遅延キューの設定
receiveMessageWait = 0 seconds # ロングポーリング
fifo = false #FIFOキューの設定
}
# http://localhost:9324/queue/sample2
sample2 {
defaultVisibilityTimeout = 3 seconds # 可視性タイムアウトの設定
delay = 0 seconds # 遅延キューの設定
receiveMessageWait = 0 seconds # ロングポーリング
fifo = false #FIFOキューの設定
}
# http://localhost:9324/queue/sample3
sample3 {
defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
delay = 3 seconds # 遅延キューの設定
receiveMessageWait = 0 seconds # ロングポーリング
fifo = false #FIFOキューの設定
}
# http://localhost:9324/queue/sample-fifo1.fifo
sample-fifo1 {
defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
delay = 0 seconds # 遅延キューの設定
receiveMessageWait = 0 seconds # ロングポーリング
fifo = true #FIFOキューの設定
contentBasedDeduplication = true # 重複排除ID
}
# http://localhost:9324/queue/sample-fifo2.fifo
sample-fifo2 {
defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
delay = 0 seconds # 遅延キューの設定
receiveMessageWait = 0 seconds # ロングポーリング
fifo = true #FIFOキューの設定
contentBasedDeduplication = false # 重複排除ID
}
}
とりあえず今回の検証のために用意した簡単なUtilです。
メッセージの送受信・削除に関してのメソッドを用意
object SQSUtil {
private val sqsClient: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
/**
* SQSにメッセージを送信(FIFOキューではグループIDなどが必要になるため使用できない)
*
* @param queueUrlVal
* @param message
*/
suspend fun sendMessages(queueUrlVal: String, message: String) {
val sendRequest = SendMessageRequest()
sendRequest.run {
withQueueUrl(queueUrlVal)
withMessageBody(message)
}
try {
sqsClient.sendMessage(sendRequest)
} catch (ex: Exception) {
println(ex.message)
throw RuntimeException()
}
}
suspend fun sendBatchMessages(queueUrlVal: String, messages: List<SendMessageBatchRequestEntry>) {
val sendMessageBatchRequest = SendMessageBatchRequest()
sendMessageBatchRequest.run {
withQueueUrl(queueUrlVal)
withEntries(messages)
}
try {
sqsClient.sendMessageBatch(sendMessageBatchRequest)
} catch (ex: Exception) {
println(ex.message)
throw RuntimeException()
}
}
/**
* メッセージを一括で送信
*
* @param queueUrlVal
* @param maxNumberOfMessages
* @return
*/
suspend fun receiveMessages(queueUrlVal: String, maxNumberOfMessages: Int): List<Message> {
val receiveMessageRequest = ReceiveMessageRequest()
receiveMessageRequest.run {
withQueueUrl(queueUrlVal)
withMaxNumberOfMessages(maxNumberOfMessages)
}
return try {
sqsClient.receiveMessage(receiveMessageRequest).messages
} catch (ex: Exception) {
throw RuntimeException()
}
}
/**
* キューから対象のメッセージを削除
*
* @param queueUrlVal
* @param receiptHandle
*/
suspend fun deleteMessage(queueUrlVal: String, receiptHandle: String) {
val deleteMessageRequest = DeleteMessageRequest()
deleteMessageRequest.run {
withQueueUrl(queueUrlVal)
withReceiptHandle(receiptHandle)
}
try {
sqsClient.deleteMessage(deleteMessageRequest)
} catch (ex: Exception) {
println(ex.message)
throw RuntimeException()
}
}
/**
* キューの中身を全て削除
*
* @param queueUrlVal
*/
suspend fun deleteMessages(queueUrlVal: String) {
val purgeRequest = PurgeQueueRequest()
purgeRequest.run {
withQueueUrl(queueUrlVal)
}
try {
sqsClient.purgeQueue(purgeRequest)
} catch (ex: Exception) {
println(ex.message)
throw RuntimeException()
}
}
}
SQSの設定項目
SQSのそれぞれの設定項目に関してテストコードかきながら動作の確認していこうと思います。
可視性タイムアウト
メッセージをコンシューマ(処理する側)がメッセージを取得してからそのメッセージを一定時間見えなくする設定
@Nested
inner class 通常キューの場合_可視性タイムアウト3秒 {
@Test
fun メッセージを取得してから3秒間メッセージが取得できないこと() {
runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL2, message = "テスト") }
val messages1 = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
}
val messages2 = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
}
assertThat(messages1.size).isEqualTo(1)
assertThat(messages2.size).isEqualTo(0)
}
@Test
fun メッセージを取得してから3秒後にメッセージが取得できること() {
runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL2, message = "テスト") }
val messages1 = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
}
assertThat(messages1.size).isEqualTo(1)
Thread.sleep(3000) // 可視性タイムアウトが3秒で設定してあるため3秒待つ
val messages2 = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
}
assertThat(messages2.size).isEqualTo(1)
}
}
遅延キュー
キューがメッセージを取得してから指定時間コンシューマ側から取得できなくする設定
@Nested
inner class 通常キューの場合_遅延キュー3秒 {
@Test
fun メッセージを送信してから3秒経過していない場合メッセージが取得できないこと() {
runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL3, message = "テスト") }
val messages1 = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL3, maxNumberOfMessages = 10)
}
assertThat(messages1.size).isEqualTo(0)
}
@Test
fun メッセージを送信してから3秒経過したらメッセージが取得できること() {
runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL3, message = "テスト") }
Thread.sleep(3000)
val messages1 = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL3, maxNumberOfMessages = 10)
}
assertThat(messages1.size).isEqualTo(1)
}
}
FIFO(先入れ先出し)キュー
これが地味に厄介で単純なFirst In First Out
じゃない、、(SimpleQueueServiceとか言ってるのにシンプルなのかと若干思いました。。)
標準キューとFIFOキューの違いについて
これら以外にも標準キューにはなかった設定項目も出てくる(重複除外IDとかメッセージグループID)
メッセージグループID
メッセージを一意に設定するためのIDで、標準キューにはなかった項目。標準キューでは指定できないが、FIFOキューにメッセージを登録する場合には必ず設定しなければならない。
同一のメッセージグループIDで同一のメッセージはキューに登録できない。
あと色々いじってて気づいたのですが、このIDに日本語使えないです。(使うことないと思いますが、、)
@Test
fun メッセージの順序が正しいこと() {
val messageGroupId = "Group1" // メッセージグループIDは同一にする
val message1 = SendMessageBatchRequestEntry().run {
withId(UUID.randomUUID().toString())
withMessageGroupId(messageGroupId)
withMessageBody("テスト1")
}
val message2 = SendMessageBatchRequestEntry().run {
withId(UUID.randomUUID().toString())
withMessageGroupId(messageGroupId)
withMessageBody("テスト2")
}
runBlocking {
SQSUtil.sendBatchMessages(
queueUrlVal = FIFO_QUEUE_URL1,
messages = listOf(message1, message2)
)
}
val actual = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = FIFO_QUEUE_URL1, maxNumberOfMessages = 10)
}
assertThat(actual[0].body).isEqualTo("テスト1")
assertThat(actual[1].body).isEqualTo("テスト2")
}
@Test
fun 同一グループで同一のメッセージは登録されないこと() {
val messageGroupId = "Group1" // メッセージグループIDは同一にする
val message1 = SendMessageBatchRequestEntry().run {
withId(UUID.randomUUID().toString())
withMessageGroupId(messageGroupId)
withMessageBody("テスト1")
}
val message2 = SendMessageBatchRequestEntry().run {
withId(UUID.randomUUID().toString())
withMessageGroupId(messageGroupId)
withMessageBody("テスト1")
}
runBlocking {
SQSUtil.sendBatchMessages(
queueUrlVal = FIFO_QUEUE_URL1,
messages = listOf(message1, message2)
)
}
val actual = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = FIFO_QUEUE_URL1, maxNumberOfMessages = 10)
}
assertThat(actual.size).isEqualTo(1)
}
重複排除ID
この重複排除IDはメッセージグループIDと同じくFIFOキューで設定できる項目です。メッセージグループIDのように設定でき、同一の重複排除IDが設定されているメッセージは5分間キューに送信できても受信で気なくなるというものになります。
@Nested
inner class FIFOキューの場合_コンテンツに基づく重複排除ON {
@Test
fun 重複IDが同一の場合_メッセージの重複が許されないこと() { // 5分間は同一のメッセージ重複IDのメッセージが送信できない
val messageGroupId = "Group1" // メッセージグループIDは同一にする
val messageDuplicateId = "DuplicateId"
val message1 = SendMessageBatchRequestEntry().run {
withId(UUID.randomUUID().toString())
withMessageGroupId(messageGroupId)
withMessageDeduplicationId(messageDuplicateId)
withMessageBody("テスト1")
}
val message2 = SendMessageBatchRequestEntry().run {
withId(UUID.randomUUID().toString())
withMessageGroupId(messageGroupId)
withMessageDeduplicationId(messageDuplicateId)
withMessageBody("テスト2")
}
runBlocking {
SQSUtil.sendBatchMessages(
queueUrlVal = FIFO_QUEUE_URL2,
messages = listOf(message1, message2)
)
}
val actual = runBlocking {
SQSUtil.receiveMessages(queueUrlVal = FIFO_QUEUE_URL2, maxNumberOfMessages = 10)
}
assertThat(actual.size).isEqualTo(1) //2件送信しても1件のみ登録されていること
}
}
まとめ
SQSというものを初めて使ってみましたが、使い方次第ではいろいろなことができそうな気がしました。
まだ、デッドレターキューとかちゃんとみれていないのでそっちも時間があったら調べっようかなと思います。