34
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【実務でもよく使う】SQS × Lamda × EventBridgeで実現する非同期&バッチ処理の基本

Last updated at Posted at 2023-08-11

概要

応答速度の改善を目的にシステムに非同期処理を組み込むことはよくあることかと思います。AWSでもSQSを利用することで比較的簡単にジョブキュー処理を実現することができます。一方で、バッチ処理との組み合わせも踏まえると、AWSのサービスの選択肢は複数存在するため分かりにくいです。

そこで、本記事では以下を整理してみました。

本記事で得られる内容

  • 非同期処理のメリット・デメリット
  • AWSで非同期処理・バッチ処理を実現する方法
  • 実際にSQS × lamda × EventBridgeの組み合わせでサンプルアプリを作成

知識整理編

まず、前提となる基礎知識や非同期処理・バッチ処理を実現するために必要なAWSリソースを簡単に整理してみます。ここで紹介する内容は概要に留めるため、より詳細を知りたい方はAWSの公式ドキュメントやBlackBelt等の資料をご確認ください。

非同期処理

非同期処理とは、処理の結果を待つ必要がないサービスの呼び出し方法を指します。非同期処理は時間がかかる処理を切り分け、システム間を疎結合に取り扱えるためにいくつかのメリットがあります。

項目 内容
応答性の改善 ユーザーのリクエストに迅速に対応するため、応答時間が短縮。これにより、ユーザーの満足度向上や業務効率の増加が期待されます。
耐障害性・可用性の向上 システムの一部が障害に見舞われても、他の部分は正常に動作継続できます。これにより、システム全体の信頼性や稼働率が向上します。
スループットの向上 複数のタスクを並行して処理することで、一定時間内に完了できるタスクの数が増加。全体の生産性がアップします。
コスト削減 高い効率での処理により、必要なリソースや時間が削減。長期的には運用コストの大幅な削減が期待されます。

以下の記事が非常に参考になるのでおすすめです。

SQSの何が凄いのか

SQS(Simple Queue Service)は、AWSが提供するフルマネージド型のメッセージキューサービスで、その機能と性能により多くの企業や開発者から高く評価されています。以下の点が特筆されます。

項目 内容
フルマネージド AWSのSQSのようなフルマネージドサービスを利用することで、ユーザーはサーバーの設定や運用管理から解放され、ビジネスロジックに集中することができます。
ほぼ無制限のTP 高いTPSを持つサービスは、大量のトランザクションを秒単位で処理する能力を持つ。
従量課金 従量課金制(API 実行回数 + データ転送量)を採用することで、実際に利用したリソースのみを課金の対象とする。
分散キューモデルによる高い可用性を提供 分散モデルを採用することで、システムのダウンタイムを低減し、高いサービスの可用性を実現します。

SQS×Lamdaによって開発者が楽になること

非同期処理においては、受け手側の処理が煩雑になることが多いです。特にキューに対して、メッセージを取得するReceiveMessageや、処理済みのメッセージをキューから削除するDeleteMessageの扱いを実装することは非常に面倒です。一方で、Lamdaはソース元をSQSに指定することで、以下を自動で実行できます。

項目 内容
自動ポーリング AWS Lambdaは、SQSキューからのメッセージを自動的にポーリングします。これにより、手動でのメッセージ取得やキューの監視の手間が省かれ、システムの効率性が向上します。
スケーリング メッセージ量に応じてLambdaの実行環境がスケーリングされるため、大量のメッセージでも迅速に処理が可能です。動的なトラフィックや急増するアクセスにも柔軟に対応します。
メッセージ削除 Lambda関数がメッセージを処理した後、該当メッセージは自動的にSQSから削除されます。これにより、メッセージの重複処理のリスクが低減され、効率的なメッセージ管理が可能となります。

SQS × lamdaの組み合わせにおいて注意すべきこと

便利なこの組み合わせですが、設定時にはいくつか注意すべき点があります。1回のLambda実行で処理するSQSメッセージの数を指定するバッチサイズや、メッセージが消費されている間、他の消費者がそのメッセージを見ることができない時間可視性タイムアウト等は、安定的な非同期処理を持続させるためには、適切に設定する必要があります。さらに、失敗したメッセージをDLQ(Dead Letter Queue)で取り扱うなど、エラーハンドリングも重要です。

詳細は以下の記事が参考になるので確認してみてください。

AWSでバッチ処理を実現するための技術候補

AWSでバッチ処理を実現する方法はいくつかあります。ここでは代表的なサービスと組み合わせをご紹介します。なお、この内容は以下の記事を参考にさせて頂きました。内容も細かく、かつ非常にわかりやすいのでぜひおすすめです。

No ユースケース サービス 構築難易度
1 実行時間が15分を超えない + 少量リクエスト Lambda
2 実行時間が15分を超えない + 大量リクエスト SQS + Lambda やや易
3 実行時間が15分を超える + 少量リクエスト ECS(Fargate) やや易
4 実行時間が15分を超える + 大量リクエスト AWS Batch(Fargate) やや難

上記における"15分"というのは、Lamdaにおける実行時間の制限時間となります。また、上記に加えて、"スケジューラー"や複雑なジョブの"依存関係の制御"を踏まえると以下のリソースなどをさらにかけ合わせる必要があります。

No ユースケース サービス
1 スケジューラー EventBridge .etc
ジョブ制御 Step Functions .etc

上記はそれぞれのユースケースを実現する上で必須なリソースではありません。上記以外の方法でも、スケジューリングを行うことは可能です。

ハンズオン編

ここからは実際にサンプルアプリを使って実際に動かしてみます。以下のリポジトリをクローンからお願いします。

サンプルアプリ概要

今回のハンズオンでは以下のユースケースを想定してみます。

  • EventBridgeのスケジューラーが定刻にバッチ処理を起動
  • 前段のLamda処理でDynamoDBから対処となるデータを取得
  • SQSに上記データのキーを含めてタスクメッセージをキューイング
  • 後段のLamda処理で非同期処理を行い完了次第SESでメール送信
    ※画像では後段のLamda処理でもDynamoDBへ矢印を向けていますが、コード上では割愛しています。

名称未設定ファイル-Lamda×SQS.drawio (2).png

動作確認

(1)前段のGo用のLamdaのパッケージ作成

# デプロイパッケージを作成
GOOS=linux GOARCH=amd64 go build -o main
zip -r ./.aws/lamda/sqs_sender.zip main
  • こちらの通り、Go1.x系は標準ランタイムサポートが終了しています。
  • したがって本記事ではカスタムランタイムをイメージを用いて実現しています。

Lambda の Go 1.x マネージドランタイムは非奨励になりました。Go 1.x ランタイムを使用する関数がある場合は、関数を provided.al2023 または provided.al2 に移行する必要があります。

ECRへカスタムランタイムをアップロード

docker build --platform linux/amd64 -t go-custom-runtime-lambda .

aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com

docker tag go-custom-runtime-lambda:latest xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/go-custom-runtime-lambda:latest

docker push xxxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/go-custom-runtime-lambda:latest

(2)後段のNode.js用のLamdaのパッケージ作成

# デプロイパッケージを作成
zip -r ./.aws/lamda/ses_email_sender.zip index.mjs

(3)Terraformを使ってリソース構築を行う

事前に以下を参考にインストールをお願いします。

リポジトリに格納されているvariables.tfファイルのうち<ご自身のメールアドレスを入力>の箇所に正常に受信できるご自身のメールアドレスを入力してください。

variables.tf
// ...割愛
variable "your_email" {
  type    = string
  default = "<ご自身のメールアドレスを入力>"
}
# コンテナを立ち上げる
docker run \
  -v ~/.aws:/root/.aws \
  -v $(pwd)/.aws:/terraform \
  -w /terraform \
  -it \
  --entrypoint=ash \
  hashicorp/terraform:1.5

# 初期化
terraform init
# 差分検出
terraform plan
# コードを適用する
terraform apply -auto-approve

(4)DynamoDBに登録

# テスト用レコード作成
aws dynamodb put-item \
    --table-name users \
    --item '{
        "user_id": {"S": "1"},
        "timestamp": {"S": "2023-08-03T10:00:00Z"},
        "status": {"S": "a"},
        "mailAddress": {"S": "<ご自身のメールアドレスを入力>"},
        "name": {"S": "Jone"}
    }' \
    --region ap-northeast-1

(5)SESで送信されるメール送信を認証

指定したメールアドレスにAWSから認証用のメールアドアレスが送信されてくるので確認します。

image.png

(6)動作確認

EventBridgeで一分単位で指定しているので以下のメールが送信されていることが確認できるかと思います。

スクリーンショット 2023-08-09 7.18.35.png

(7)リソース削除

検証が終わったら以下のコマンドでリソースを削除してください。

# 削除
terraform destroy

Terraformを使ったリソース構築

本記事ではIaCとしてTerraformを採用しています。コードの詳細は割愛しますが、参考までにコードを掲載します。

(1)main.tf

main.tf
provider "aws" {
  region = "ap-northeast-1"
}

(2)eventBridge.tf

eventBridge.tf
# EventBridgeルールの作成
resource "aws_cloudwatch_event_rule" "every_minute" {
  name                = "every-minute"
  schedule_expression = "rate(1 minute)"
}

# EventBridgeルールに対するターゲットの作成
resource "aws_cloudwatch_event_target" "run_lambda_every_minute" {
  rule      = aws_cloudwatch_event_rule.every_minute.name
  target_id = "runLambdaFunction"
  arn       = aws_lambda_function.sqs_sender.arn
}

# ターゲットとなるLambda関数に対する実行権限の付与
resource "aws_lambda_permission" "allow_cloudwatch_to_call" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.sqs_sender.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.every_minute.arn
}

(3)lamda.tf

lamda.tf
/*
後段のバッチ処理とSESへメッセージ送信
*/

resource "aws_lambda_function" "ses_email_sender" {
  function_name = "ses_email_sender"
  handler       = "index.handler"
  runtime       = "nodejs18.x"
  role          = aws_iam_role.lambda_role_for_ses.arn
  filename      = "lamda/ses_email_sender.zip"
  environment {
    variables = {
      EMAIL_SOURCE = var.your_email
    }
  }
}

resource "aws_lambda_event_source_mapping" "sqs_mapping" {
  event_source_arn = aws_sqs_queue.queue.arn
  function_name    = aws_lambda_function.ses_email_sender.arn
  batch_size       = 10
}

resource "aws_iam_role" "lambda_role_for_ses" {
  name = "lambda_role_for_ses"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      },
    ]
  })
}

resource "aws_iam_role_policy" "lambda_policy_for_ses" {
  name = "lambda_policy_for_ses"
  role = aws_iam_role.lambda_role_for_ses.id

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "sqs:DeleteMessage",
          "sqs:ReceiveMessage",
          "sqs:GetQueueAttributes",
        ],
        Resource = aws_sqs_queue.queue.arn
      },
      {
        Effect = "Allow",
        Action = [
          "ses:SendEmail",
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ],
        Resource = "*"
      },
    ]
  })
}

/*
前段のSQSにキューイングするLamda
*/

resource "aws_lambda_function" "sqs_sender" {
  function_name = "sqs_sender"
  handler       = "main"
  runtime       = "go1.x"
  role          = aws_iam_role.lambda_role_for_sqs.arn
  filename      = "lamda/sqs_sender.zip"
  environment {
    variables = {
      SQS_QUEUE_URL = aws_sqs_queue.queue.url
    }
  }
}

# LambdaのIAMロール
resource "aws_iam_role" "lambda_role_for_sqs" {
  name = "lambda_role_for_sqs"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      },
    ]
  })
}

# LambdaのIAMポリシー
resource "aws_iam_role_policy" "lambda_policy_for_sqs" {
  name = "lambda_policy_for_sqs"
  role = aws_iam_role.lambda_role_for_sqs.id

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "dynamodb:GetItem"
        ],
        Resource = aws_dynamodb_table.users.arn
      },
      {
        Effect = "Allow",
        Action = [
          "sqs:SendMessage",
        ],
        Resource = aws_sqs_queue.queue.arn
      },
      {
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ],
        Resource = "*"
      },
    ]
  })
}

(4)dynamodb.tf

dynamodb.tf
resource "aws_dynamodb_table" "users" {
  name         = "users"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "user_id"

  attribute {
    name = "user_id"
    type = "S"
  }
}

(5)sqs.tf

sqs.tf
resource "aws_sqs_queue" "queue" {
  name = "my-queue"
}

(6)ses.tf

eventBridge.tf
resource "aws_ses_email_identity" "email" {
  email = var.your_email
}

(7)variables.tf

variables.tf
variable "project" {
  type    = string
  default = "go-api"
}

variable "environment" {
  type    = string
  default = "dev"
}

variable "your_email" {
  type    = string
  default = "<ご自身のメールアドレスを入力>"
}

Lamda関数

本サンプルアプリでは二つのLamdaを利用・定義しています。前段のLamdaはGoで、後段のLamdaはNode.jsで記述しています。※言語の選定理由は特にないです。

main.go

main.go
package main

import (
	"encoding/json"
	"fmt"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
	"github.com/aws/aws-sdk-go/service/sqs"
	"log"
	"os"
)

type User struct {
	UserID      string `json:"user_id"`
	Timestamp   string `json:"timestamp"`
	Status      string `json:"status"`
	MailAddress string `json:"mailAddress"`
	Name        string `json:"name"`
}

func HandleRequest() (string, error) {
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("ap-northeast-1"),
	}))

	db := dynamodb.New(sess)
	sqsSvc := sqs.New(sess)

	// バッチ処理の対象となるキーを固定値で指定
	// ex. DynamoDBからデータを取得する

	tableName := "users"
	userID := "1"

	params := &dynamodb.GetItemInput{
		TableName: aws.String(tableName),
		Key: map[string]*dynamodb.AttributeValue{
			"user_id": {
				S: aws.String(userID),
			},
		},
	}

	result, err := db.GetItem(params)
	if err != nil {
		errorMessage := fmt.Sprintf("Error getting item from DynamoDB: %v", err)
		log.Println(errorMessage)
		return "", fmt.Errorf(errorMessage)
	}

	user := User{}
	err = dynamodbattribute.UnmarshalMap(result.Item, &user)
	if err != nil {
		errorMessage := fmt.Sprintf("Error unmarshalling result from DynamoDB: %v", err)
		log.Println(errorMessage)
		return "", fmt.Errorf(errorMessage)
	}

	messageBody, err := json.Marshal(map[string]string{
		"email": user.MailAddress,
		"name":  user.Name,
	})
	if err != nil {
		errorMessage := fmt.Sprintf("Error marshalling message body: %v", err)
		log.Println(errorMessage)
		return "", fmt.Errorf(errorMessage)
	}

	sqsQueueURL := os.Getenv("SQS_QUEUE_URL")
	sqsParams := &sqs.SendMessageInput{
		QueueUrl:    aws.String(sqsQueueURL),
		MessageBody: aws.String(string(messageBody)),
	}

	_, err = sqsSvc.SendMessage(sqsParams)
	if err != nil {
		errorMessage := fmt.Sprintf("Error sending message to SQS: %v", err)
		log.Println(errorMessage)
		return "", fmt.Errorf(errorMessage)
	}

	return "Success", nil
}

func main() {
	lambda.Start(HandleRequest)
}

index.mjs

index.mjs
import { SESClient, SendEmailCommand } from "@aws-sdk/client-ses";
const ses = new SESClient({ region: "ap-northeast-1" });

// ハンドラ関数をエクスポート
export const handler = async function(event) {
  for (const record of event.Records) {
    try {
      const body = JSON.parse(record.body);
      const email = body.email;
      const name = body.name;

      // ここで何かしらの処理を行う想定(ex.DynamoDBのデータ)

      if (!email || !name) {
        console.error('Email or name missing from the record:', record);
        continue;
      }

      const sourceEmail = process.env.EMAIL_SOURCE || 'null';

      const params = {
        Destination: {
          ToAddresses: [email],
        },
        Message: {
          Body: {
            Text: { Data: `${name}様への本文` },
          },
          Subject: { Data: `${name}様へのタイトル` },
        },
        Source: sourceEmail,
      };

      const command = new SendEmailCommand(params);
      await ses.send(command);
      console.log(`Email sent successfully to ${email}`);
    } catch (error) {
      console.error('Error processing record:', record, 'Error:', error);
    }
  }
};
34
24
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?