Alpakka 0.3で AWS SQS Connector が追加されたので試してみました。
Released #Alpakka 0.3. AWS SQS, Cassandra, FTP. Great stuff, thanks all contributors. https://t.co/ObyGvhamWu
— Akka Team (@akkateam) 2016年12月2日
Alpakka
This project provides a home to Akka Streams connectors to various technologies, protocols or libraries.
Akka Streams コネクターが提供されています。
Akka Streams に関してはこちらやこちらをご覧ください。
前準備
- AWSのアカウントを作成し
Access key ID
とSecret access key
を取得 - SQSでキューを作成し、URLを取得
実装
こちらのリファレンスを参考(ほぼそのまま)にコードを書きます。 config
は適宜読み替えてください。
メッセージが送信されると本文を出力し、そのまま削除するようにしています。
name := "alpakka-sqs-sample"
version := "1.0"
scalaVersion := "2.12.0"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.3"
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.DeleteMessageRequest
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.{ FiniteDuration, _ }
object AlpakkaSQSSample {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.load()
val credentials = new BasicAWSCredentials(
config.getString("aws.accessKey"), config.getString("aws.secretKey")
)
implicit val sqsClient: AmazonSQSAsyncClient =
new AmazonSQSAsyncClient(credentials).withEndpoint(config.getString("aws.sqs.endpoint"))
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val queue = config.getString("aws.sqs.url")
SqsSource(queue)
.runForeach((message) => {
println(message.getBody)
sqsClient.deleteMessage(
new DeleteMessageRequest(queue, message.getReceiptHandle)
)
})
}
}
ロングポーリングタイムアウトやバッファーサイズなどはSqsSourceSettings(longPollingDuration: FiniteDuration, maxBufferSize: Int, maxBatchSize: Int)
で設定可能です。(SqsSource.apply
の第二引数)
デフォルトの設定はこちらです。
実行
$ sbt
> run
メッセージを送信すると以下のように本文が表示されます。
[info] Running ***.AlpakkaSQSSample
message-1
message-2
最後に
Akka Streamsがさらに身近な存在になりました。
リファレンス通りにElasticMQを利用すれば、SQSを利用しなくてもすぐ試すことができます。
ちなみに、FIFOキューだとちょっと気になる挙動をしていたのでAkka User Listで質問してみました。