Posted at
AtraeDay 7

Alpakka AWS SQS Connectorを試してみた

More than 1 year has passed since last update.

Alpakka 0.3で AWS SQS Connector が追加されたので試してみました。


Alpakka

Github akka/alpakka


This project provides a home to Akka Streams connectors to various technologies, protocols or libraries.


Akka Streams コネクターが提供されています。

Akka Streams に関してはこちらこちらをご覧ください。


前準備


  • AWSのアカウントを作成しAccess key IDSecret access keyを取得

  • SQSでキューを作成し、URLを取得


実装

こちらのリファレンスを参考(ほぼそのまま)にコードを書きます。 configは適宜読み替えてください。

メッセージが送信されると本文を出力し、そのまま削除するようにしています。


build.sbt

name := "alpakka-sqs-sample"

version := "1.0"

scalaVersion := "2.12.0"

libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.3"



AlpakkaSQSSample.scala

import akka.actor.ActorSystem

import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.DeleteMessageRequest
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.{ FiniteDuration, _ }

object AlpakkaSQSSample {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.load()

val credentials = new BasicAWSCredentials(
config.getString("aws.accessKey"), config.getString("aws.secretKey")
)
implicit val sqsClient: AmazonSQSAsyncClient =
new AmazonSQSAsyncClient(credentials).withEndpoint(config.getString("aws.sqs.endpoint"))

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

val queue = config.getString("aws.sqs.url")

SqsSource(queue)
.runForeach((message) => {
println(message.getBody)
sqsClient.deleteMessage(
new DeleteMessageRequest(queue, message.getReceiptHandle)
)
})
}
}


ロングポーリングタイムアウトやバッファーサイズなどはSqsSourceSettings(longPollingDuration: FiniteDuration, maxBufferSize: Int, maxBatchSize: Int)で設定可能です。(SqsSource.applyの第二引数)

デフォルトの設定はこちらです。


実行

$ sbt

> run

メッセージを送信すると以下のように本文が表示されます。

[info] Running ***.AlpakkaSQSSample 

message-1
message-2


最後に

Akka Streamsがさらに身近な存在になりました。

リファレンス通りにElasticMQを利用すれば、SQSを利用しなくてもすぐ試すことができます。

ちなみに、FIFOキューだとちょっと気になる挙動をしていたのでAkka User Listで質問してみました。


参考