1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

KotlinでSQS(ElasticMQ)を使ってみた

Last updated at Posted at 2022-09-17

経緯

業務でSQSを使う必要が出てきて、調査・検証した結果意外とややこしいことがわかったので備忘録として残そうと思います。

ElasticMQ

とりあえず、ローカルで検証をおこなうためにSQSと互換のあるElastcMQを使います。
ElasticMQ
docker-composeはこんな感じ。

docker-compose.yml
version: "3"
services:
  elasticmq:
    container_name: elasticmq
    image: softwaremill/elasticmq-native
    volumes:
      # 設定ファイル
      - ./elasticmq/custom.conf://opt/elasticmq.conf:ro
    ports:
      - "9324:9324"
      - "9325:9325"

ちょっと特殊な設定としては、ボリュームを設定して設定ファイルをコンテナ側に読み込ませる必要があります。(別にAPIで作るなり、コンテナの中に入って作ったりもできると思いますがこの方が簡単なので)

volumes:
  # 設定ファイル
  - ./elasticmq/custom.conf://opt/elasticmq.conf:ro

あとは、ポートをバインドしてあげる感じです。
(9324はAPI用のポート、9325は管理画面?用のポートになります。)

ports:
  - "9324:9324"
  - "9325:9325"

dockerのvolumeに繋ぐようにして設定したファイルは以下(設定項目はコメントアウトしてある通りでそれぞれの詳細は追って記載)

custom.conf
include classpath( "application.conf" )

node-address {
  protocol = http
  host = localhost
  port = 9324
  context-path = ""
}

rest-sqs {
  enabled = true
  bind-port = 9324
  bind-hostname = "0.0.0.0"
  # Possible values: relaxed, strict
  sqs-limits = strict
}

generate-node-address = false

queues {
    # http://localhost:9324/queue/sample1
    sample1 {
        defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
        delay = 0 seconds # 遅延キューの設定
        receiveMessageWait = 0 seconds # ロングポーリング
        fifo = false #FIFOキューの設定
    }

    # http://localhost:9324/queue/sample2
    sample2 {
        defaultVisibilityTimeout = 3 seconds # 可視性タイムアウトの設定
        delay = 0 seconds # 遅延キューの設定
        receiveMessageWait = 0 seconds # ロングポーリング
        fifo = false #FIFOキューの設定
    }

    # http://localhost:9324/queue/sample3
    sample3 {
        defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
        delay = 3 seconds # 遅延キューの設定
        receiveMessageWait = 0 seconds # ロングポーリング
        fifo = false #FIFOキューの設定
    }

    # http://localhost:9324/queue/sample-fifo1.fifo
    sample-fifo1 {
        defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
        delay = 0 seconds # 遅延キューの設定
        receiveMessageWait = 0 seconds # ロングポーリング
        fifo = true #FIFOキューの設定
        contentBasedDeduplication = true # 重複排除ID
    }

    # http://localhost:9324/queue/sample-fifo2.fifo
    sample-fifo2 {
        defaultVisibilityTimeout = 0 seconds # 可視性タイムアウトの設定
        delay = 0 seconds # 遅延キューの設定
        receiveMessageWait = 0 seconds # ロングポーリング
        fifo = true #FIFOキューの設定
        contentBasedDeduplication = false # 重複排除ID
    }
}

とりあえず今回の検証のために用意した簡単なUtilです。
メッセージの送受信・削除に関してのメソッドを用意

SQSUtil.kt
object SQSUtil {
  private val sqsClient: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

  /**
   * SQSにメッセージを送信(FIFOキューではグループIDなどが必要になるため使用できない)
   *
   * @param queueUrlVal
   * @param message
   */
  suspend fun sendMessages(queueUrlVal: String, message: String) {
    val sendRequest = SendMessageRequest()
    sendRequest.run {
      withQueueUrl(queueUrlVal)
      withMessageBody(message)
    }

    try {
      sqsClient.sendMessage(sendRequest)
    } catch (ex: Exception) {
      println(ex.message)
      throw RuntimeException()
    }
  }

  suspend fun sendBatchMessages(queueUrlVal: String, messages: List<SendMessageBatchRequestEntry>) {
    val sendMessageBatchRequest = SendMessageBatchRequest()
    sendMessageBatchRequest.run {
      withQueueUrl(queueUrlVal)
      withEntries(messages)
    }

    try {
      sqsClient.sendMessageBatch(sendMessageBatchRequest)
    } catch (ex: Exception) {
      println(ex.message)
      throw RuntimeException()
    }
  }

  /**
   * メッセージを一括で送信
   *
   * @param queueUrlVal
   * @param maxNumberOfMessages
   * @return
   */
  suspend fun receiveMessages(queueUrlVal: String, maxNumberOfMessages: Int): List<Message> {
    val receiveMessageRequest = ReceiveMessageRequest()
    receiveMessageRequest.run {
      withQueueUrl(queueUrlVal)
      withMaxNumberOfMessages(maxNumberOfMessages)
    }

    return try {
      sqsClient.receiveMessage(receiveMessageRequest).messages
    } catch (ex: Exception) {
      throw RuntimeException()
    }
  }

  /**
   * キューから対象のメッセージを削除
   *
   * @param queueUrlVal
   * @param receiptHandle
   */
  suspend fun deleteMessage(queueUrlVal: String, receiptHandle: String) {
    val deleteMessageRequest = DeleteMessageRequest()
    deleteMessageRequest.run {
      withQueueUrl(queueUrlVal)
      withReceiptHandle(receiptHandle)
    }

    try {
      sqsClient.deleteMessage(deleteMessageRequest)
    } catch (ex: Exception) {
      println(ex.message)
      throw RuntimeException()
    }
  }

  /**
   * キューの中身を全て削除
   *
   * @param queueUrlVal
   */
  suspend fun deleteMessages(queueUrlVal: String) {
    val purgeRequest = PurgeQueueRequest()
    purgeRequest.run {
      withQueueUrl(queueUrlVal)
    }

    try {
      sqsClient.purgeQueue(purgeRequest)
    } catch (ex: Exception) {
      println(ex.message)
      throw RuntimeException()
    }
  }
}

SQSの設定項目

SQSのそれぞれの設定項目に関してテストコードかきながら動作の確認していこうと思います。

可視性タイムアウト

メッセージをコンシューマ(処理する側)がメッセージを取得してからそのメッセージを一定時間見えなくする設定

  @Nested
  inner class 通常キューの場合_可視性タイムアウト3秒 {
    @Test
    fun メッセージを取得してから3秒間メッセージが取得できないこと() {
      runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL2, message = "テスト") }

      val messages1 = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
      }
      val messages2 = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
      }
      assertThat(messages1.size).isEqualTo(1)
      assertThat(messages2.size).isEqualTo(0)
    }

    @Test
    fun メッセージを取得してから3秒後にメッセージが取得できること() {
      runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL2, message = "テスト") }
      val messages1 = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
      }
      assertThat(messages1.size).isEqualTo(1)

      Thread.sleep(3000) // 可視性タイムアウトが3秒で設定してあるため3秒待つ

      val messages2 = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL2, maxNumberOfMessages = 10)
      }
      assertThat(messages2.size).isEqualTo(1)
    }
  }

遅延キュー

キューがメッセージを取得してから指定時間コンシューマ側から取得できなくする設定

  @Nested
  inner class 通常キューの場合_遅延キュー3秒 {
    @Test
    fun メッセージを送信してから3秒経過していない場合メッセージが取得できないこと() {
      runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL3, message = "テスト") }
      val messages1 = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL3, maxNumberOfMessages = 10)
      }
      
      assertThat(messages1.size).isEqualTo(0)
    }

    @Test
    fun メッセージを送信してから3秒経過したらメッセージが取得できること() {
      runBlocking { SQSUtil.sendMessages(queueUrlVal = QUEUE_URL3, message = "テスト") }

      Thread.sleep(3000)

      val messages1 = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = QUEUE_URL3, maxNumberOfMessages = 10)
      }
      assertThat(messages1.size).isEqualTo(1)
    }
  }

FIFO(先入れ先出し)キュー

これが地味に厄介で単純なFirst In First Outじゃない、、(SimpleQueueServiceとか言ってるのにシンプルなのかと若干思いました。。)

標準キューとFIFOキューの違いについて

これら以外にも標準キューにはなかった設定項目も出てくる(重複除外IDとかメッセージグループID)

メッセージグループID

メッセージを一意に設定するためのIDで、標準キューにはなかった項目。標準キューでは指定できないが、FIFOキューにメッセージを登録する場合には必ず設定しなければならない。
同一のメッセージグループIDで同一のメッセージはキューに登録できない。
あと色々いじってて気づいたのですが、このIDに日本語使えないです。(使うことないと思いますが、、)

    @Test
    fun メッセージの順序が正しいこと() {
      val messageGroupId = "Group1" // メッセージグループIDは同一にする
      val message1 = SendMessageBatchRequestEntry().run {
        withId(UUID.randomUUID().toString())
        withMessageGroupId(messageGroupId)
        withMessageBody("テスト1")
      }
      val message2 = SendMessageBatchRequestEntry().run {
        withId(UUID.randomUUID().toString())
        withMessageGroupId(messageGroupId)
        withMessageBody("テスト2")
      }
      runBlocking {
        SQSUtil.sendBatchMessages(
          queueUrlVal = FIFO_QUEUE_URL1,
          messages = listOf(message1, message2)
        )
      }

      val actual = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = FIFO_QUEUE_URL1, maxNumberOfMessages = 10)
      }

      assertThat(actual[0].body).isEqualTo("テスト1")
      assertThat(actual[1].body).isEqualTo("テスト2")
    }

    @Test
    fun 同一グループで同一のメッセージは登録されないこと() {
      val messageGroupId = "Group1" // メッセージグループIDは同一にする
      val message1 = SendMessageBatchRequestEntry().run {
        withId(UUID.randomUUID().toString())
        withMessageGroupId(messageGroupId)
        withMessageBody("テスト1")
      }
      val message2 = SendMessageBatchRequestEntry().run {
        withId(UUID.randomUUID().toString())
        withMessageGroupId(messageGroupId)
        withMessageBody("テスト1")
      }
      runBlocking {
        SQSUtil.sendBatchMessages(
          queueUrlVal = FIFO_QUEUE_URL1,
          messages = listOf(message1, message2)
        )
      }

      val actual = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = FIFO_QUEUE_URL1, maxNumberOfMessages = 10)
      }

      assertThat(actual.size).isEqualTo(1)
    }

重複排除ID

この重複排除IDはメッセージグループIDと同じくFIFOキューで設定できる項目です。メッセージグループIDのように設定でき、同一の重複排除IDが設定されているメッセージは5分間キューに送信できても受信で気なくなるというものになります。

  @Nested
  inner class FIFOキューの場合_コンテンツに基づく重複排除ON {
    @Test
    fun 重複IDが同一の場合_メッセージの重複が許されないこと() { // 5分間は同一のメッセージ重複IDのメッセージが送信できない
      val messageGroupId = "Group1" // メッセージグループIDは同一にする
      val messageDuplicateId = "DuplicateId"
      val message1 = SendMessageBatchRequestEntry().run {
        withId(UUID.randomUUID().toString())
        withMessageGroupId(messageGroupId)
        withMessageDeduplicationId(messageDuplicateId)
        withMessageBody("テスト1")
      }
      val message2 = SendMessageBatchRequestEntry().run {
        withId(UUID.randomUUID().toString())
        withMessageGroupId(messageGroupId)
        withMessageDeduplicationId(messageDuplicateId)
        withMessageBody("テスト2")
      }
      runBlocking {
        SQSUtil.sendBatchMessages(
          queueUrlVal = FIFO_QUEUE_URL2,
          messages = listOf(message1, message2)
        )
      }

      val actual = runBlocking {
        SQSUtil.receiveMessages(queueUrlVal = FIFO_QUEUE_URL2, maxNumberOfMessages = 10)
      }

      assertThat(actual.size).isEqualTo(1) //2件送信しても1件のみ登録されていること
    }
  }

まとめ

SQSというものを初めて使ってみましたが、使い方次第ではいろいろなことができそうな気がしました。
まだ、デッドレターキューとかちゃんとみれていないのでそっちも時間があったら調べっようかなと思います。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?