2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AtraeAdvent Calendar 2016

Day 7

Alpakka AWS SQS Connectorを試してみた

Posted at

Alpakka 0.3で AWS SQS Connector が追加されたので試してみました。

Alpakka

Github akka/alpakka

This project provides a home to Akka Streams connectors to various technologies, protocols or libraries.

Akka Streams コネクターが提供されています。
Akka Streams に関してはこちらこちらをご覧ください。

前準備

  • AWSのアカウントを作成しAccess key IDSecret access keyを取得
  • SQSでキューを作成し、URLを取得

実装

こちらのリファレンスを参考(ほぼそのまま)にコードを書きます。 configは適宜読み替えてください。
メッセージが送信されると本文を出力し、そのまま削除するようにしています。

build.sbt
name := "alpakka-sqs-sample"

version := "1.0"

scalaVersion := "2.12.0"

libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.3"
AlpakkaSQSSample.scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.DeleteMessageRequest
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.{ FiniteDuration, _ }

object AlpakkaSQSSample {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()

    val credentials = new BasicAWSCredentials(
      config.getString("aws.accessKey"), config.getString("aws.secretKey")
    )
    implicit val sqsClient: AmazonSQSAsyncClient =
      new AmazonSQSAsyncClient(credentials).withEndpoint(config.getString("aws.sqs.endpoint"))

    implicit val system = ActorSystem()
    implicit val mat = ActorMaterializer()

    val queue = config.getString("aws.sqs.url")

    SqsSource(queue)
      .runForeach((message) => {
        println(message.getBody)
        sqsClient.deleteMessage(
          new DeleteMessageRequest(queue, message.getReceiptHandle)
        )
      })
  }
}

ロングポーリングタイムアウトやバッファーサイズなどはSqsSourceSettings(longPollingDuration: FiniteDuration, maxBufferSize: Int, maxBatchSize: Int)で設定可能です。(SqsSource.applyの第二引数)
デフォルトの設定はこちらです。

実行

$ sbt
> run

メッセージを送信すると以下のように本文が表示されます。

[info] Running ***.AlpakkaSQSSample 
message-1
message-2

最後に

Akka Streamsがさらに身近な存在になりました。
リファレンス通りにElasticMQを利用すれば、SQSを利用しなくてもすぐ試すことができます。
ちなみに、FIFOキューだとちょっと気になる挙動をしていたのでAkka User Listで質問してみました。

参考

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?