一時的にスマホからワッとくるアクセスをとりあえずキューに入れて、
別処理でDBに突っ込むようなシステムを作りたい。
そこでaws/aws-sdk-goを使ってSQSを試してみる手順。
TL;DR
aws-sdk-goを使えばGoからAWSの操作は大体できるよ
基本戦略
SQSには指定回数取得されたのに正常終了(削除処理)されなかったメッセージを
別のキュー(以下デッドキュー)に移してくれる機能があるので、これを使う。
これを使うとエラー処理をあまりがんばらなくて済む。
またロングポーリングを行い、メッセージが空の場合は接続したまま一定時間待つことで、
アクセスを減らすようにする。
注意点として同一メッセージが複数回取得される場合があるので、
実際のシステムではそれでも問題ないように実装する必要がある。
事前準備
aws-sdk-goのインストール
$ go get -u github.com/aws/aws-sdk-go/...
credentialsファイルの作成
以下のファイルにアクセスキーの情報を書いて保存しておく。
$HOME/.aws/credentials
[default]
aws_access_key_id = xxxxxxxxxx
aws_secret_access_key = xxxxxxxxxxxxxxxxx
SQS作成
こんな感じのコードでキューとデッドキューを作れる。
コードじゃなくてもWEBのコンソールから作ってもいい。
package main
import (
"fmt"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)
var svc *sqs.SQS
const (
AWS_REGION = "ap-northeast-1"
MAIN_QUEUE_NAME = "main-queue"
DEAD_QUEUE_NAME = "dead-queue"
)
// エラーになったメッセージを溜め込むためのデッドキューを作る。
func CreateDeadQueue(queuename string) (string, error) {
params := &sqs.CreateQueueInput{
QueueName: aws.String(queuename),
}
queueurl, err := createQueue(params)
if err != nil {
return "", err
}
return queueurl, nil
}
// キューを作成する。
func CreateMainQueue(queuename, deadarn string) (string, error) {
// 一定回数の処理に失敗したメッセージをデッドキューに入れるためのポリシー。
// ここでは3回失敗したらデッドキューに移すようにしている。
// 設定内容はjson文字列を組み立てる。わりと原始的。
redrivePolicy := fmt.Sprintf(
"{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":%d}",
deadarn,
3,
)
// キュー名を指定してキュー作成。
// 設定内容はAttributesに足していく。
params := &sqs.CreateQueueInput{
QueueName: aws.String(queuename),
Attributes: map[string]*string{
// VisibilityTimeout:取得したメッセージは指定した秒数の間、他から見えなくする。
"VisibilityTimeout": aws.String("30"),
// ReceiveMessageWaitTimeSeconds: ロングポーリングの秒数。
// キューが空だった場合、どれだけ待つか。
// ここで指定しなくても、メッセージ取得時に指定は可能。
"ReceiveMessageWaitTimeSeconds": aws.String("20"),
// RedrivePolicy: デッドキュー用ポリシー。先に作っておいた値を設定。
"RedrivePolicy": aws.String(redrivePolicy),
},
}
queueurl, err := createQueue(params)
if err != nil {
return "", err
}
return queueurl, nil
}
// 指定キューの情報を取得する。
// デッドキュー指定時にARNが必要だが、キュー生成時にはQueueURLしか返してくれないので、
// 別途取得している。
func GetQueueAttributes(queueurl string) (map[string]*string, error) {
params := &sqs.GetQueueAttributesInput{
QueueUrl: aws.String(queueurl),
AttributeNames: []*string{
// ほしいパラメータ名を指定。Allにすると全部取得できる。
aws.String("All"),
},
}
resp, err := svc.GetQueueAttributes(params)
if err != nil {
return nil, err
}
return resp.Attributes, nil
}
// キュー生成用の共通関数
func createQueue(params *sqs.CreateQueueInput) (string, error) {
resp, err := svc.CreateQueue(params)
if err != nil {
return "", err
}
return *resp.QueueUrl, nil
}
func main() {
// クライアント生成
svc = sqs.New(&aws.Config{Region: aws.String(AWS_REGION)})
// デッドキューを先に作成
deadurl, err := CreateDeadQueue(DEAD_QUEUE_NAME)
if err != nil {
log.Fatal(err)
}
// デッドキューのARNを取得
attrs, err := GetQueueAttributes(deadurl)
if err != nil {
log.Fatal(err)
}
deadarn := *attrs["QueueArn"]
// メイン処理用のキューを作成
mainurl, err := CreateMainQueue(MAIN_QUEUE_NAME, deadarn)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Main queue URL:%s\nDead queue URL:%s\n", mainurl, deadurl)
}
メッセージの登録
package main
import (
"fmt"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)
var svc *sqs.SQS
const (
AWS_REGION = "ap-northeast-1"
QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/main-queue"
)
// メッセージを送信する
func SendMessage() error {
// 送信内容を作成
params := &sqs.SendMessageInput{
MessageBody: aws.String("HogeFugaPiyo"),
QueueUrl: aws.String(QUEUE_URL),
DelaySeconds: aws.Int64(1),
}
sqsRes, err := svc.SendMessage(params)
if err != nil {
return err
}
fmt.Println("SQSMessageID", *sqsRes.MessageId)
return nil
}
func main() {
svc = sqs.New(&aws.Config{Region: aws.String(AWS_REGION)})
if err := SendMessage(); err != nil {
log.Fatal(err)
}
}
メッセージの回収
package main
import (
"fmt"
"log"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)
var svc *sqs.SQS
const (
AWS_REGION = "ap-northeast-1"
QUEUE_URL = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/main-queue"
)
func RetrieveMessage() error {
params := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(QUEUE_URL),
// 一度に取得する最大メッセージ数。最大でも10まで。
MaxNumberOfMessages: aws.Int64(10),
// これでキューが空の場合はロングポーリング(20秒間繋ぎっぱなし)になる。
WaitTimeSeconds: aws.Int64(20),
}
resp, err := svc.ReceiveMessage(params)
if err != nil {
return err
}
fmt.Printf("messages count: %d\n", len(resp.Messages))
if len(resp.Messages) == 0 {
fmt.Println("empty queue.")
return nil
}
// メッセージの数だけgoroutineを実行してみる。
var wg sync.WaitGroup
for _, m := range resp.Messages {
wg.Add(1)
go func(msg *sqs.Message) {
defer wg.Done()
fmt.Println(msg.Body)
if err := DeleteMessage(msg); err != nil {
fmt.Println(err)
}
}(m)
}
wg.Wait()
return nil
}
// メッセージを削除する。
func DeleteMessage(msg *sqs.Message) error {
params := &sqs.DeleteMessageInput{
QueueUrl: aws.String(QUEUE_URL),
ReceiptHandle: aws.String(*msg.ReceiptHandle),
}
_, err := svc.DeleteMessage(params)
if err != nil {
return err
}
return nil
}
func main() {
svc = sqs.New(&aws.Config{Region: aws.String(AWS_REGION)})
// 止めるまでメッセージを回収しつづける。
for {
if err := RetrieveMessage(); err != nil {
log.Fatal(err)
}
}
}
aws-sdk-goの感想
前にRubyのSDKを使ってAWSを操作した経験があり、そのときはドキュメントを見てもどう使えばいいのか分からず苦労した覚えがあるのですが、Go版はexamplesが用意されているのと、静的言語だけあってライブラリ側のコードにすぐジャンプできることもあり、使用するのはとても楽でした。
一方でパラメーター設定などは単なる文字列での指定であったりして、APIの薄いラッパー感が強いです。
でもこれぐらいの方がAPIドキュメントからの類推も効くので、充分なように思います。