0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Go】LocalStackとaws-sdk-go-v2を利用してAWSサービスの基本操作をしてみた

Posted at

はじめに

この記事は、Go Advent Calendar 2025の24日目の記事です。

今回は、LocalStackを利用して、GoでAWS操作を気軽に体験してみました!LocalStack を使えば、ローカル環境でAWSサービスを触ることができます。

aws-sdk-go-v2を使ってSQSとDynamoDBを操作する CLI ツールを実際に作りながら、AWSの基本的な操作をしてみました。

利用ツール・ディレクトリ構成

CLIの骨組みにはCobraを使用していますが、本記事ではAWS SDKを利用したロジック部分に焦点を当てて説明します。

開発環境

  • macOS
  • Docker Desktop for Mac
  • Go 1.25.4
  • cobra-cli
  • awslocal 2.16.4

cobra-cliとは

Go言語で多機能なCLIアプリケーションを構築するためのフレームワーク「Cobra」のCLIツールです。
コマンド一発でプロジェクトの骨組みが出来上がります。

# プロジェクトの初期化
cobra-cli init

# サブコマンドの追加(例:sqsコマンド)
cobra-cli add sqs

# プロジェクトの実行
go run main.go sqs

これだけで、main.go のセットアップや、引数解析のロジックが自動生成されます。

installの方法などは以下を参照してください。

awslocalとは

LocalStackに対してAWS CLIを実行する場合、全てのコマンドに --endpoint-url オプションを付与する必要があります。awslocal はこのエンドポイント指定を自動化し、本物の AWS CLI と全く同じ構文での操作を可能にします。

installの方法などは以下を参照してください。

実装

ここからは具体的なコードを紹介します。
※aws-sdk-go-v2には非常に多くのパラメータが存在するため、一部抜粋やエラーハンドリングの簡略化をしています。実際に利用する際は公式のリファレンスもあわせて参照ください。

LocalStackの設定

docker-compose.yml

name: sample
services:
  localstack:
    image: localstack/localstack:latest
    container_name: localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs,dynamodb
      - SQS_ENDPOINT_STRATEGY=off
      - AWS_DEFAULT_REGION=ap-northeast-1
    volumes:
      - "./init:/etc/localstack/init/ready.d"
  • ports
    LocalStackはデフォルトで4566を利用します。
  • SERVICES
    起動するサービスを設定しています。
  • SQS_ENDPOINT_STRATEGY
    SQSのURLの形式を指定します。今回はlocalでの開発なのでシンプルになるような設定をしています。

  • AWS_DEFAULT_REGION
    コンテナ内部のデフォルトリージョンをap-northeast-1に設定しています。これにより、初期化スクリプトを実行する際、意図しないリージョンにリソースが作られるのを防ぎます。
  • /etc/localstack/init/ready.d
    サービス起動した後に実行するスクリプトなどをマウントします。

起動後のスクリプト

init.sh
#!/bin/bash

echo "localstack setup started"

echo "sqs setup started"

# SQSキューの作成
awslocal sqs create-queue --queue-name receive-queue
awslocal sqs create-queue --queue-name send-queue

# 作成したキューの確認
echo "Listing SQS Queues:"
awslocal sqs list-queues

echo "sqs setup completed"

echo "dynamodb setup started"
# DynamoDBテーブルの作成
awslocal dynamodb create-table --table-name SampleTable \
    --attribute-definitions \
        AttributeName=ID,AttributeType=S \
        AttributeName=City,AttributeType=S \
        AttributeName=CreatedAt,AttributeType=S \
    --key-schema AttributeName=ID,KeyType=HASH \
    --global-secondary-indexes \
        IndexName=City-CreatedAt-index,KeySchema='[{AttributeName=City,KeyType=HASH},{AttributeName=CreatedAt,KeyType=RANGE}]',Projection='{ProjectionType=ALL}' \
    --billing-mode PAY_PER_REQUEST

# TTLの設定
awslocal dynamodb update-time-to-live \
    --table-name SampleTable \
    --time-to-live-specification "Enabled=true,AttributeName=ExpiresAt"

# 作成したテーブルの確認
echo "Listing DynamoDB Tables"

awslocal dynamodb list-tables

# SampleTableにアイテムを追加
awslocal dynamodb put-item --table-name SampleTable --item '{
    "ID": {"S": "123"}, 
    "Name": {"S": "Test Item"}, 
    "City": {"S": "Tokyo"}, 
    "CreatedAt": {"S": "2024-12-23T10:00:00Z"}
}'

awslocal dynamodb put-item --table-name SampleTable --item '{
    "ID": {"S": "124"}, 
    "Name": {"S": "Test Item 2"}, 
    "City": {"S": "Osaka"}, 
    "CreatedAt": {"S": "2024-12-23T11:00:00Z"}
}'

echo "dynamodb setup completed"

echo "localstack setup completed"

awslocalを利用して、SQSキューとDynamoDBテーブルの作成をしています。
DynamoDBについては、GSIやTTLも設定しています。

chmod +x init.sh で実行権限を付与しておく必要がある点に注意してください。

認証

LocalStackを利用するため、ダミーの認証キーを利用します。

package config

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
)

const (
	LocalStackEndpoint = "http://localhost.localstack.cloud:4566"

	DefaultRegion = "ap-northeast-1"

	DummyAccessKey    = "dummy_access_key"
	DummySecretKey    = "dummy_secret_key"
	DummySessionToken = ""
)

func GetLocalStackConfig(ctx context.Context) aws.Config {
	cfg, _ := config.LoadDefaultConfig(ctx, config.WithRegion(DefaultRegion),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(DummyAccessKey, DummySecretKey, DummySessionToken)),
	)
	return cfg
}

sqs

sqsを操作するために以下を利用します。

構造体

SQSの操作を行うために、以下のような構造体を定義しました。

type SQSClient struct {
    client *sqs.Client
}

// メッセージ受信用の構造体
type ReceiveMessage struct {
    Name    string `json:"name"`
    Age     int32  `json:"age"`
    City    string `json:"city"`
    Country string `json:"country"`
}

// メッセージ送信用の構造体
type SendMessage struct {
    Name string `json:"name"`
    Age  int32  `json:"age"`
    City string `json:"city"`
}

LocalStack用のクライアント初期化

LocalStackに接続するため、エンドポイントを明示的に設定しています:

func NewSQSClient() *SQSClient {
    cfg := config.GetLocalStackConfig()
    client := sqs.NewFromConfig(cfg, func(o *sqs.Options) {
        o.BaseEndpoint = aws.String(config.LocalStackEndpoint)
    })
    return &SQSClient{
        client: client,
    }
}

メッセージ受信(ReceiveMessage)

SQSからメッセージを取得します。

func (s *SQSClient) ReceiveMessages(ctx context.Context) ([]model.Message, error) {

	receiveCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	input := &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(fmt.Sprintf("%s/receive-queue", baseQueueURL)),
		MaxNumberOfMessages: 10,
		WaitTimeSeconds:     1,
	}

	receiveMessages, err := s.client.ReceiveMessage(receiveCtx, input)
    // ... エラーハンドリング、JSON解析処理

ポイント

  • MaxNumberOfMessages: 10でバッチ受信(最大10件までしか1度に取得できない)

// The maximum number of messages to return. Amazon SQS never returns more
// messages than this value (however, fewer messages might be returned). Valid
// values: 1 to 10. Default: 1.
MaxNumberOfMessages int32

  • WaitTimeSeconds: 1でメッセージを受信するまで待機する時間を指定

他にも可視性タイムアウトの設定なども可能です。

メッセージ送信(SendMessageBatch)

SQSへメッセージを送信します。送信する際、1件か複数送信するのかで利用するメソッドが変わります。
今回は複数件送信するパターンとなります

func (s *SQSClient) SendMessages(ctx context.Context, messages []model.Message) (*int, error) {
    if len(messages) > 10 {
        return nil, fmt.Errorf("SQS SendMessageBatch supports a maximum of 10 messages per batch")
    }
    
    var entries []types.SendMessageBatchRequestEntry
    uuid := uuid.New().String()

    for i, message := range messages {
        jsonData, err := json.Marshal(message)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal message: %v", err)
        }

        entries = append(entries, types.SendMessageBatchRequestEntry{
            Id:          aws.String(fmt.Sprintf("%s-%d", uuid, i)),
            MessageBody: aws.String(string(jsonData)),
        })
    }

    input := &sqs.SendMessageBatchInput{
		QueueUrl: aws.String(fmt.Sprintf("%s/send-queue", baseQueueURL)),
		Entries:  entries,
	}

	sendMessages, err := s.client.SendMessageBatch(sendCtx, input)
	if err != nil {
		return nil, fmt.Errorf("failed to send messages: %v", err)
	}
    if len(sendMessages.Failed) > 0 {
		for _, failed := range sendMessages.Failed {
			fmt.Printf("Failed to send message %s: %v\n", *failed.Id, failed.Message)
		}
		return nil, fmt.Errorf("some messages failed to send")
	}
    // ... エラーハンドリング、JSON解析処理
}

ポイント

  • バッチ送信にも制限があり最大10件となる
    件数以外も一度に送信できるサイズもあるので注意が必要。
  • UUIDを使った一意なメッセージID生成
  • 送信に成功と失敗したものが混在する可能性がある。そのため、返却値に失敗がないか確認する必要がある
  • もしFIFOキュー(順序保証)を利用する場合は、MessageGroupIdなども指定する必要がある

メッセージ削除(DeleteMessageBatch)

SQSはメッセージを取得しただけでは、そのキューからメッセージは削除されません。そのため明示的に削除する必要があります。
送信と同様、1件か複数削除するのかで利用するメソッドが変わります。

func (s *SQSClient) DeleteMessages(ctx context.Context, messages []model.Message) (*int, error) {
    var entries []types.DeleteMessageBatchRequestEntry
    
    for i, message := range messages {
        entries = append(entries, types.DeleteMessageBatchRequestEntry{
            Id:            aws.String(fmt.Sprintf("%s-%d", uuid, i)),
            ReceiptHandle: aws.String(message.Metadata.ReceiptHandle),
        })
    }
    
    deleteMessages, err := s.client.DeleteMessageBatch(ctx, input)
	if err != nil {
		return nil, fmt.Errorf("failed to delete messages: %v", err)
	}

	if len(deleteMessages.Failed) > 0 {
		for _, failed := range deleteMessages.Failed {
			fmt.Printf("Failed to delete message %s: %v\n", *failed.Id, failed.Message)
		}
		return nil, fmt.Errorf("some messages failed to delete")
	}
    // ... エラーハンドリング、JSON解析処理
}

ポイント

  • 削除する際はReceiptHandleが必要になる。メッセージ取得時、そのメッセージを削除するためのキー情報が発行される
  • 削除に成功と失敗したものが混在する可能性がある。そのため、返却値に失敗がないか確認する必要がある

DynamoDB

DynamoDBを操作するために以下を利用します。

構造体

DynamoDBの操作を行うために、以下のような構造体を定義しました。

type DynamoDBClient struct {
    client *dynamodb.Client
}

// DynamoDBアイテム用の構造体
type Item struct {
    ID        string `dynamodbav:"ID"`        // パーティションキー
    Name      string `dynamodbav:"Name"`
    City      string `dynamodbav:"City"`      // GSIのパーティションキー
    CreatedAt string `dynamodbav:"CreatedAt"` // GSIのソートキー
    ExpiresAt int64  `dynamodbav:"ExpiresAt"` // TTL(Time To Live)用
}

LocalStack用のクライアント初期化

SQSと同様、LocalStackのエンドポイントを指定してクライアントを初期化します。

func NewDynamoDBClient() *DynamoDBClient {
    cfg := config.GetLocalStackConfig()
    client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
        o.BaseEndpoint = aws.String(config.LocalStackEndpoint)
    })
    return &DynamoDBClient{
        client: client,
    }
}

アイテムの登録 (PutItem)

テーブルにデータを1件登録します。

func (d *DynamoDBClient) PutItem(ctx context.Context) (string, error) {
    jst, err := time.LoadLocation("Asia/Tokyo")
    if err != nil {
        return "", err
    }
    
    item := Item{
        ID:        uuid.New().String(),
        Name:      "Sample Name",
        City:      "Sample City",
        CreatedAt: time.Now().In(jst).Format(time.RFC3339),
        ExpiresAt: time.Now().Add(30 * time.Second).Unix(), // 30秒後にTTLを設定
    }

    itemMap, err := attributevalue.MarshalMap(item)
    if err != nil {
        return "", err
    }

    input := &dynamodb.PutItemInput{
        TableName: aws.String(TableName),
        Item:      itemMap,
        ConditionExpression: aws.String("attribute_not_exists(ID)"),
    }
    
    _, err = d.client.PutItem(ctx, input)
    return item.ID, err
}

ポイント

  • attributevalue.MarshalMapでGoの構造体をDynamoDB形式に変換
  • ConditionExpressionで楽観的ロック(重複防止)を実装
    指定しない場合、既に存在するキーを上書きします。
  • TTLはDynamoDBの特性上期限を超えてもすぐには消えない

一括書き込み (BatchWriteItem)

一括で書き込みを行います。

func (d *DynamoDBClient) BatchWriteItems(ctx context.Context, items []Item) error {

	var writeRequests []types.WriteRequest

	for _, item := range items {
		itemMap, err := attributevalue.MarshalMap(item)
		if err != nil {
			return err
		}

		writeRequests = append(writeRequests, types.WriteRequest{
			PutRequest: &types.PutRequest{
				Item: itemMap,
			},
		})
	}

	input := &dynamodb.BatchWriteItemInput{
		RequestItems: map[string][]types.WriteRequest{
			TableName: writeRequests,
		},
	}

	output, err := d.client.BatchWriteItem(ctx, input)
	if err != nil {
		return err
	}
	if len(output.UnprocessedItems) > 0 {
		return fmt.Errorf("unprocessed items found: %v", output.UnprocessedItems)
	}
	return nil
}

ポイント

  • 最大25件までの一括書き込みが可能でサイズの制限もある
  • 登録に成功と失敗したものが混在する可能性がある
    そのため、返却値に失敗がないか確認する必要がある

IDによる単一アイテム取得 (GetItemByID)

主キーをもとに単一のアイテムを取得します。
登録と同様に複数取得するメソッドも存在します。

func (d *DynamoDBClient) GetItemByID(ctx context.Context, id string) (*model.Item, error) {
    input := &dynamodb.GetItemInput{
        TableName: aws.String(TableName),
        Key: map[string]types.AttributeValue{
            "ID": &types.AttributeValueMemberS{Value: id},
        },
    }

    result, err := d.client.GetItem(ctx, input)
    if err != nil {
		return nil, err
	}

	if result.Item == nil {
		return &model.Item{}, nil
	}
    // ... エラーハンドリング、JSON解析処理
}

ポイント:

  • 主キーを指定した単一アイテムの取得
  • result.Item == nilでアイテムが存在しない場合のハンドリング

GSIを使った一覧取得 (Query)

GSIを利用してクエリを書くことが可能です。
(主キーやGSIを利用しない場合はScan()という重いクエリを発行してアイテムを取得する必要があります。)

func (d *DynamoDBClient) GetList(ctx context.Context, city string) ([]model.Item, error) {
    paginator := dynamodb.NewQueryPaginator(d.client, &dynamodb.QueryInput{
        TableName: aws.String(TableName),
        IndexName: aws.String("City-CreatedAt-index"), // GSI指定
        KeyConditionExpression: aws.String("City = :city"),
        ExpressionAttributeValues: map[string]types.AttributeValue{
            ":city": &types.AttributeValueMemberS{Value: city},
        },
        Limit:            aws.Int32(3),
        ScanIndexForward: aws.Bool(false), // 降順(最新順)
    })

    var items []model.Item
    for paginator.HasMorePages() {
        output, err := paginator.NextPage(ctx)
        if err != nil {
            return nil, err
        }
        
        for _, item := range output.Items {
            var modelItem model.Item
            err := attributevalue.UnmarshalMap(item, &modelItem)
            if err != nil {
                return nil, err
            }
            items = append(items, modelItem)
        }
    }
    return items, nil
}

ポイント:

  • NewQueryPaginatorで1MB制限を自動でハンドル
    一度に取得できるサイズには上限があるため、ページネーションが必要になる
    Client.Queryを利用してもページネーションを実現可能ですが、こちらの方がわかりやすい
  • GSI(City-CreatedAt-index)を使用した効率的な検索
  • ScanIndexForward: falseで降順取得(最新データから順番に)

まとめ

LocalStackを使えばAWSの利用料を気にせず、いろいろなこと試すことができます。

今回は標準のSDKを使いましたが、他のAWSサービスを操作するライブラリもあるため、そちらも試してみたいと思います!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?