はじめに
この記事は、Go Advent Calendar 2025の24日目の記事です。
今回は、LocalStackを利用して、GoでAWS操作を気軽に体験してみました!LocalStack を使えば、ローカル環境でAWSサービスを触ることができます。
aws-sdk-go-v2を使ってSQSとDynamoDBを操作する CLI ツールを実際に作りながら、AWSの基本的な操作をしてみました。
利用ツール・ディレクトリ構成
CLIの骨組みにはCobraを使用していますが、本記事ではAWS SDKを利用したロジック部分に焦点を当てて説明します。
開発環境
- macOS
- Docker Desktop for Mac
- Go 1.25.4
- cobra-cli
- awslocal 2.16.4
cobra-cliとは
Go言語で多機能なCLIアプリケーションを構築するためのフレームワーク「Cobra」のCLIツールです。
コマンド一発でプロジェクトの骨組みが出来上がります。
# プロジェクトの初期化
cobra-cli init
# サブコマンドの追加(例:sqsコマンド)
cobra-cli add sqs
# プロジェクトの実行
go run main.go sqs
これだけで、main.go のセットアップや、引数解析のロジックが自動生成されます。
installの方法などは以下を参照してください。
awslocalとは
LocalStackに対してAWS CLIを実行する場合、全てのコマンドに --endpoint-url オプションを付与する必要があります。awslocal はこのエンドポイント指定を自動化し、本物の AWS CLI と全く同じ構文での操作を可能にします。
installの方法などは以下を参照してください。
実装
ここからは具体的なコードを紹介します。
※aws-sdk-go-v2には非常に多くのパラメータが存在するため、一部抜粋やエラーハンドリングの簡略化をしています。実際に利用する際は公式のリファレンスもあわせて参照ください。
LocalStackの設定
docker-compose.yml
name: sample
services:
localstack:
image: localstack/localstack:latest
container_name: localstack
ports:
- "4566:4566"
environment:
- SERVICES=sqs,dynamodb
- SQS_ENDPOINT_STRATEGY=off
- AWS_DEFAULT_REGION=ap-northeast-1
volumes:
- "./init:/etc/localstack/init/ready.d"
- ports
LocalStackはデフォルトで4566を利用します。 - SERVICES
起動するサービスを設定しています。 - SQS_ENDPOINT_STRATEGY
SQSのURLの形式を指定します。今回はlocalでの開発なのでシンプルになるような設定をしています。
- AWS_DEFAULT_REGION
コンテナ内部のデフォルトリージョンをap-northeast-1に設定しています。これにより、初期化スクリプトを実行する際、意図しないリージョンにリソースが作られるのを防ぎます。 - /etc/localstack/init/ready.d
サービス起動した後に実行するスクリプトなどをマウントします。
起動後のスクリプト
init.sh
#!/bin/bash
echo "localstack setup started"
echo "sqs setup started"
# SQSキューの作成
awslocal sqs create-queue --queue-name receive-queue
awslocal sqs create-queue --queue-name send-queue
# 作成したキューの確認
echo "Listing SQS Queues:"
awslocal sqs list-queues
echo "sqs setup completed"
echo "dynamodb setup started"
# DynamoDBテーブルの作成
awslocal dynamodb create-table --table-name SampleTable \
--attribute-definitions \
AttributeName=ID,AttributeType=S \
AttributeName=City,AttributeType=S \
AttributeName=CreatedAt,AttributeType=S \
--key-schema AttributeName=ID,KeyType=HASH \
--global-secondary-indexes \
IndexName=City-CreatedAt-index,KeySchema='[{AttributeName=City,KeyType=HASH},{AttributeName=CreatedAt,KeyType=RANGE}]',Projection='{ProjectionType=ALL}' \
--billing-mode PAY_PER_REQUEST
# TTLの設定
awslocal dynamodb update-time-to-live \
--table-name SampleTable \
--time-to-live-specification "Enabled=true,AttributeName=ExpiresAt"
# 作成したテーブルの確認
echo "Listing DynamoDB Tables"
awslocal dynamodb list-tables
# SampleTableにアイテムを追加
awslocal dynamodb put-item --table-name SampleTable --item '{
"ID": {"S": "123"},
"Name": {"S": "Test Item"},
"City": {"S": "Tokyo"},
"CreatedAt": {"S": "2024-12-23T10:00:00Z"}
}'
awslocal dynamodb put-item --table-name SampleTable --item '{
"ID": {"S": "124"},
"Name": {"S": "Test Item 2"},
"City": {"S": "Osaka"},
"CreatedAt": {"S": "2024-12-23T11:00:00Z"}
}'
echo "dynamodb setup completed"
echo "localstack setup completed"
awslocalを利用して、SQSキューとDynamoDBテーブルの作成をしています。
DynamoDBについては、GSIやTTLも設定しています。
chmod +x init.sh で実行権限を付与しておく必要がある点に注意してください。
認証
LocalStackを利用するため、ダミーの認証キーを利用します。
package config
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
)
const (
LocalStackEndpoint = "http://localhost.localstack.cloud:4566"
DefaultRegion = "ap-northeast-1"
DummyAccessKey = "dummy_access_key"
DummySecretKey = "dummy_secret_key"
DummySessionToken = ""
)
func GetLocalStackConfig(ctx context.Context) aws.Config {
cfg, _ := config.LoadDefaultConfig(ctx, config.WithRegion(DefaultRegion),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(DummyAccessKey, DummySecretKey, DummySessionToken)),
)
return cfg
}
sqs
sqsを操作するために以下を利用します。
構造体
SQSの操作を行うために、以下のような構造体を定義しました。
type SQSClient struct {
client *sqs.Client
}
// メッセージ受信用の構造体
type ReceiveMessage struct {
Name string `json:"name"`
Age int32 `json:"age"`
City string `json:"city"`
Country string `json:"country"`
}
// メッセージ送信用の構造体
type SendMessage struct {
Name string `json:"name"`
Age int32 `json:"age"`
City string `json:"city"`
}
LocalStack用のクライアント初期化
LocalStackに接続するため、エンドポイントを明示的に設定しています:
func NewSQSClient() *SQSClient {
cfg := config.GetLocalStackConfig()
client := sqs.NewFromConfig(cfg, func(o *sqs.Options) {
o.BaseEndpoint = aws.String(config.LocalStackEndpoint)
})
return &SQSClient{
client: client,
}
}
メッセージ受信(ReceiveMessage)
SQSからメッセージを取得します。
func (s *SQSClient) ReceiveMessages(ctx context.Context) ([]model.Message, error) {
receiveCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
input := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(fmt.Sprintf("%s/receive-queue", baseQueueURL)),
MaxNumberOfMessages: 10,
WaitTimeSeconds: 1,
}
receiveMessages, err := s.client.ReceiveMessage(receiveCtx, input)
// ... エラーハンドリング、JSON解析処理
ポイント
-
MaxNumberOfMessages: 10でバッチ受信(最大10件までしか1度に取得できない)
// The maximum number of messages to return. Amazon SQS never returns more
// messages than this value (however, fewer messages might be returned). Valid
// values: 1 to 10. Default: 1.
MaxNumberOfMessages int32
-
WaitTimeSeconds: 1でメッセージを受信するまで待機する時間を指定
他にも可視性タイムアウトの設定なども可能です。
メッセージ送信(SendMessageBatch)
SQSへメッセージを送信します。送信する際、1件か複数送信するのかで利用するメソッドが変わります。
今回は複数件送信するパターンとなります
func (s *SQSClient) SendMessages(ctx context.Context, messages []model.Message) (*int, error) {
if len(messages) > 10 {
return nil, fmt.Errorf("SQS SendMessageBatch supports a maximum of 10 messages per batch")
}
var entries []types.SendMessageBatchRequestEntry
uuid := uuid.New().String()
for i, message := range messages {
jsonData, err := json.Marshal(message)
if err != nil {
return nil, fmt.Errorf("failed to marshal message: %v", err)
}
entries = append(entries, types.SendMessageBatchRequestEntry{
Id: aws.String(fmt.Sprintf("%s-%d", uuid, i)),
MessageBody: aws.String(string(jsonData)),
})
}
input := &sqs.SendMessageBatchInput{
QueueUrl: aws.String(fmt.Sprintf("%s/send-queue", baseQueueURL)),
Entries: entries,
}
sendMessages, err := s.client.SendMessageBatch(sendCtx, input)
if err != nil {
return nil, fmt.Errorf("failed to send messages: %v", err)
}
if len(sendMessages.Failed) > 0 {
for _, failed := range sendMessages.Failed {
fmt.Printf("Failed to send message %s: %v\n", *failed.Id, failed.Message)
}
return nil, fmt.Errorf("some messages failed to send")
}
// ... エラーハンドリング、JSON解析処理
}
ポイント
- バッチ送信にも制限があり最大10件となる
件数以外も一度に送信できるサイズもあるので注意が必要。 - UUIDを使った一意なメッセージID生成
- 送信に成功と失敗したものが混在する可能性がある。そのため、返却値に失敗がないか確認する必要がある
- もしFIFOキュー(順序保証)を利用する場合は、
MessageGroupIdなども指定する必要がある
メッセージ削除(DeleteMessageBatch)
SQSはメッセージを取得しただけでは、そのキューからメッセージは削除されません。そのため明示的に削除する必要があります。
送信と同様、1件か複数削除するのかで利用するメソッドが変わります。
func (s *SQSClient) DeleteMessages(ctx context.Context, messages []model.Message) (*int, error) {
var entries []types.DeleteMessageBatchRequestEntry
for i, message := range messages {
entries = append(entries, types.DeleteMessageBatchRequestEntry{
Id: aws.String(fmt.Sprintf("%s-%d", uuid, i)),
ReceiptHandle: aws.String(message.Metadata.ReceiptHandle),
})
}
deleteMessages, err := s.client.DeleteMessageBatch(ctx, input)
if err != nil {
return nil, fmt.Errorf("failed to delete messages: %v", err)
}
if len(deleteMessages.Failed) > 0 {
for _, failed := range deleteMessages.Failed {
fmt.Printf("Failed to delete message %s: %v\n", *failed.Id, failed.Message)
}
return nil, fmt.Errorf("some messages failed to delete")
}
// ... エラーハンドリング、JSON解析処理
}
ポイント
- 削除する際は
ReceiptHandleが必要になる。メッセージ取得時、そのメッセージを削除するためのキー情報が発行される - 削除に成功と失敗したものが混在する可能性がある。そのため、返却値に失敗がないか確認する必要がある
DynamoDB
DynamoDBを操作するために以下を利用します。
構造体
DynamoDBの操作を行うために、以下のような構造体を定義しました。
type DynamoDBClient struct {
client *dynamodb.Client
}
// DynamoDBアイテム用の構造体
type Item struct {
ID string `dynamodbav:"ID"` // パーティションキー
Name string `dynamodbav:"Name"`
City string `dynamodbav:"City"` // GSIのパーティションキー
CreatedAt string `dynamodbav:"CreatedAt"` // GSIのソートキー
ExpiresAt int64 `dynamodbav:"ExpiresAt"` // TTL(Time To Live)用
}
LocalStack用のクライアント初期化
SQSと同様、LocalStackのエンドポイントを指定してクライアントを初期化します。
func NewDynamoDBClient() *DynamoDBClient {
cfg := config.GetLocalStackConfig()
client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
o.BaseEndpoint = aws.String(config.LocalStackEndpoint)
})
return &DynamoDBClient{
client: client,
}
}
アイテムの登録 (PutItem)
テーブルにデータを1件登録します。
func (d *DynamoDBClient) PutItem(ctx context.Context) (string, error) {
jst, err := time.LoadLocation("Asia/Tokyo")
if err != nil {
return "", err
}
item := Item{
ID: uuid.New().String(),
Name: "Sample Name",
City: "Sample City",
CreatedAt: time.Now().In(jst).Format(time.RFC3339),
ExpiresAt: time.Now().Add(30 * time.Second).Unix(), // 30秒後にTTLを設定
}
itemMap, err := attributevalue.MarshalMap(item)
if err != nil {
return "", err
}
input := &dynamodb.PutItemInput{
TableName: aws.String(TableName),
Item: itemMap,
ConditionExpression: aws.String("attribute_not_exists(ID)"),
}
_, err = d.client.PutItem(ctx, input)
return item.ID, err
}
ポイント
-
attributevalue.MarshalMapでGoの構造体をDynamoDB形式に変換 -
ConditionExpressionで楽観的ロック(重複防止)を実装
指定しない場合、既に存在するキーを上書きします。 - TTLはDynamoDBの特性上期限を超えてもすぐには消えない
一括書き込み (BatchWriteItem)
一括で書き込みを行います。
func (d *DynamoDBClient) BatchWriteItems(ctx context.Context, items []Item) error {
var writeRequests []types.WriteRequest
for _, item := range items {
itemMap, err := attributevalue.MarshalMap(item)
if err != nil {
return err
}
writeRequests = append(writeRequests, types.WriteRequest{
PutRequest: &types.PutRequest{
Item: itemMap,
},
})
}
input := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{
TableName: writeRequests,
},
}
output, err := d.client.BatchWriteItem(ctx, input)
if err != nil {
return err
}
if len(output.UnprocessedItems) > 0 {
return fmt.Errorf("unprocessed items found: %v", output.UnprocessedItems)
}
return nil
}
ポイント
- 最大25件までの一括書き込みが可能でサイズの制限もある
- 登録に成功と失敗したものが混在する可能性がある
そのため、返却値に失敗がないか確認する必要がある
IDによる単一アイテム取得 (GetItemByID)
主キーをもとに単一のアイテムを取得します。
登録と同様に複数取得するメソッドも存在します。
func (d *DynamoDBClient) GetItemByID(ctx context.Context, id string) (*model.Item, error) {
input := &dynamodb.GetItemInput{
TableName: aws.String(TableName),
Key: map[string]types.AttributeValue{
"ID": &types.AttributeValueMemberS{Value: id},
},
}
result, err := d.client.GetItem(ctx, input)
if err != nil {
return nil, err
}
if result.Item == nil {
return &model.Item{}, nil
}
// ... エラーハンドリング、JSON解析処理
}
ポイント:
- 主キーを指定した単一アイテムの取得
-
result.Item == nilでアイテムが存在しない場合のハンドリング
GSIを使った一覧取得 (Query)
GSIを利用してクエリを書くことが可能です。
(主キーやGSIを利用しない場合はScan()という重いクエリを発行してアイテムを取得する必要があります。)
func (d *DynamoDBClient) GetList(ctx context.Context, city string) ([]model.Item, error) {
paginator := dynamodb.NewQueryPaginator(d.client, &dynamodb.QueryInput{
TableName: aws.String(TableName),
IndexName: aws.String("City-CreatedAt-index"), // GSI指定
KeyConditionExpression: aws.String("City = :city"),
ExpressionAttributeValues: map[string]types.AttributeValue{
":city": &types.AttributeValueMemberS{Value: city},
},
Limit: aws.Int32(3),
ScanIndexForward: aws.Bool(false), // 降順(最新順)
})
var items []model.Item
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
return nil, err
}
for _, item := range output.Items {
var modelItem model.Item
err := attributevalue.UnmarshalMap(item, &modelItem)
if err != nil {
return nil, err
}
items = append(items, modelItem)
}
}
return items, nil
}
ポイント:
-
NewQueryPaginatorで1MB制限を自動でハンドル
一度に取得できるサイズには上限があるため、ページネーションが必要になる
Client.Queryを利用してもページネーションを実現可能ですが、こちらの方がわかりやすい - GSI(
City-CreatedAt-index)を使用した効率的な検索 -
ScanIndexForward: falseで降順取得(最新データから順番に)
まとめ
LocalStackを使えばAWSの利用料を気にせず、いろいろなこと試すことができます。
今回は標準のSDKを使いましたが、他のAWSサービスを操作するライブラリもあるため、そちらも試してみたいと思います!