5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

S3・SQS・Lambdaで自動マスタリングアプリを作る

Last updated at Posted at 2024-03-17

概要

今回は、AWSのSQS・Lambdaを練習も兼ねて触ってみようということで自動マスタリングアプリを開発しました。
ちなみにマスタリングとは、音楽制作の最終工程であり、最終的に配信やCDプレスするために音質を整える作業のことです。

※ファイルをS3にアップロード・ダウンロードするUI等は実装しません。

音圧爆上げくんとは

AIを使用したマスタリングを、オンラインで自動的に行なってくれるサービスです。
以前は有料機能もあったようですが、2024年3月17日現在は無料で使えます。

APIが提供されており、今回使用させていただきました。

実装には、こちらのチュートリアルが役立ちます。

成果物

今回開発したアプリの使い方の説明です。

1. 音声ファイルをS3バケットに配置する

1-a. マスタリング待ち(音圧爆上げくん開発者向けページ

1-b. マスタリング中

1-c. マスタリング完了!

2. マスタリング済みファイルが別のS3バケットに配置される

構成

アプリの構成は以下のとおりです。

  • ユーザーがS3にファイルをアップロードすると、S3からSQSに向けてイベント通知を行います。
  • SQSのメッセージをトリガーに、Lambda関数が実行されます。
  • Lambda関数の中で音圧爆上げくんAPIを呼び出し、マスタリングを行います。
  • マスタリング済みの音源をS3に配置します。

プロジェクト構成

プロジェクト構成は以下のとおりです。

auto-mastering-app
├── infra
│   ├── fuga
│   │    └── bootstrap.zip // Lambdaにデプロイするzipファイル
│   │
│   ├── .terraform.lock.hcl
│   ├── main.tf
│   ├── lambda.tf
│   ├── s3.tf
│   └── sqs.tf
│
└── lambda
    ├── bin
    │    └──bootstrap // goのbuildファイル
    │
    └── cmd
                ├── mastering
         │    └── mastering.go
                ├── s3_client
         │    └── s3.go
                ├── ssm
         │    └── ssm.go
           ├── go.mod
           ├── go.sum
           ├── main.go
           └── build.sh // goをbuildするshell

インフラの準備

S3

S3の基本的な定義をしています。
パブリックアクセスのブロック、暗号化、イベント通知ですね。
今回は、オリジナルの音源ファイルを配置するバケットと、マスタリング後の音源ファイルを配置するバケットの2種類を用意します。

s3.tf
# マスタリング前のファイルを置くバケット
resource "aws_s3_bucket" "mastering_appbucket" {
  bucket        = "${local.env}-mastering-app-bucket"
  tags          = {}
  force_destroy = true
}

# バケットに対するパブリックアクセスのブロック
resource "aws_s3_bucket_public_access_block" "mastering_appbucket" {
  bucket = aws_s3_bucket.mastering_appbucket.bucket

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# S3のデフォルト暗号化
resource "aws_s3_bucket_server_side_encryption_configuration" "mastering_appbucket" {
  bucket = aws_s3_bucket.mastering_appbucket.bucket

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

# sqsに対してのイベント通知設定
resource "aws_s3_bucket_notification" "mastering_appbucket_notification" {
  bucket = aws_s3_bucket.mastering_appbucket.id

  queue {
    queue_arn     = aws_sqs_queue.mastering_app_queue.arn
    events        = ["s3:ObjectCreated:*"]
  }
}

# マスタリング後のファイルを置くバケット
resource "aws_s3_bucket" "mastering_appbucket_mastered" {
  bucket        = "${local.env}-mastering-app-bucket-mastered"
  tags          = {}
  force_destroy = true
}

resource "aws_s3_bucket_public_access_block" "mastering_appbucket_mastered" {
  bucket = aws_s3_bucket.mastering_appbucket_mastered.bucket

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_server_side_encryption_configuration" "mastering_appbucket_mastered" {
  bucket = aws_s3_bucket.mastering_appbucket_mastered.bucket

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

SQS

次に、SQSです。こちらも基本的な設定です。
料金節約のため、ロングポーリングを設定したり、メッセージ保持時間を短く設定しています。

通常のキューとデッドレターキューを同時に定義すると循環参照エラーが発生してしまうので、
回避のためにデッドレターキュー側のsourceQueueArnsではキューのarnを直接指定しています。

sqs.tf
resource "aws_sqs_queue" "mastering_app_queue" {
  name   = "mastering_app_queue"
  policy = data.aws_iam_policy_document.mastering_app_queue.json
  # ロングポーリングを有効にすることで、空のレスポンスを減らす
  receive_wait_time_seconds = 20
  # メッセージ保持は15分(練習なので短くていい)
  message_retention_seconds = 60 * 15
  # 可視性タイムアウト設定
  visibility_timeout_seconds = 300

    // 再実行に関する設定
  redrive_policy = jsonencode({
    // 失敗したら以下のデッドレターキューにデータをエンキューする
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    // 最大2回まで実行する (同じメッセージを3回目に受け取ったらデッドレターキューに送る)
    maxReceiveCount     = 2
  })
}

# デッドレターキュー
resource "aws_sqs_queue" "dlq" {
  name = "mastering_app_queue_dlq"
  message_retention_seconds = 60 * 60 * 1
  visibility_timeout_seconds = 300
  receive_wait_time_seconds = 20

    redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    # resource "aws_sqs_queue" "mastering_app_queue"に対して循環参照になるので、ARNを直接指定
    sourceQueueArns   = ["arn:aws:sqs:ap-northeast-1:1234567890:mastering_app_queue"]
  })
}

# s3からのイベントを受け取るためのポリシー
data "aws_iam_policy_document" "mastering_app_queue" {
  statement {
    effect = "Allow"

    principals {
      type        = "*"
      identifiers = ["*"]
    }

    actions   = ["sqs:SendMessage"]
    # resource "aws_sqs_queue" "mastering_app_queue"に対して循環参照になるので、ARNを直接指定
    resources = ["arn:aws:sqs:*:*:mastering_app_queue"]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [aws_s3_bucket.mastering_appbucket.arn]
    }
  }
}

Lambda

最後にLambdaです。
SQSからメッセージを受け取ったり、S3へアクセスしたりと、他のサービスを利用するための権限付与や
CloudWatch Logsへのログ出力などを定義しています。

また、data "archive_file"を定義することでterraform plan時に、Lambdaにデプロイするgoのビルドファイルをzip圧縮してくれます。

lambda.tf
# lambda関数のポリシー
data "aws_iam_policy_document" "lambda_sqs_policy" {

  # LambdaがSQSにアクセスするためのポリシー
  statement {
    effect    = "Allow"
    resources = [aws_sqs_queue.mastering_app_queue.arn, aws_sqs_queue.dlq.arn]

    actions = [
      "sqs:ChangeMessageVisibility",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes",
      "sqs:ReceiveMessage",
      "sqs:SendMessage",
    ]
  }

  # LambdaがS3にアクセスするためのポリシー
  statement {
    effect = "Allow"
    resources = [
      aws_s3_bucket.mastering_appbucket.arn,
      "${aws_s3_bucket.mastering_appbucket.arn}/*",
      aws_s3_bucket.mastering_appbucket_mastered.arn,
      "${aws_s3_bucket.mastering_appbucket_mastered.arn}/*"
    ]
    actions = [
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject"
    ]
  }

  # Lambdaがssmにアクセスするためのポリシー
  statement {
    effect    = "Allow"
    resources = ["*"]
    actions = [
      "ssm:*"
    ]
  }
}

resource "aws_iam_policy" "lambda_sqs_policy" {
  name = "lambda_sqs_policy"
  policy = data.aws_iam_policy_document.lambda_sqs_policy.json
}

resource "aws_iam_role" "assume_role_for_lambda" {
  name               = "iam_for_lambda"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

# 信頼ポリシー(Lambda サービスプリンシパルに自分の役割を引き受けるアクセス権限を付与する)
data "aws_iam_policy_document" "lambda_assume_role" {
  statement {
    effect = "Allow"

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

resource "aws_iam_role_policy_attachment" "default" {
  role       = aws_iam_role.assume_role_for_lambda.name
  policy_arn = aws_iam_policy.lambda_sqs_policy.arn
}

// terraform plan実行時にbuildファイルをzipに圧縮する
data "archive_file" "go_function_zip" {
  type        = "zip"
  source_file = "../lambda/bin/bootstrap"
  output_path = "./archive/bootstrap.zip"
}

resource "aws_lambda_function" "mastering_app_function" {
  filename      = data.archive_file.go_function_zip.output_path
  function_name = "mastering_app_function"
  role          = aws_iam_role.assume_role_for_lambda.arn
  handler       = "mastering.handler"
  runtime       = "provided.al2"
  source_code_hash = data.archive_file.go_function_zip.output_base64sha256

  timeout = 600
  reserved_concurrent_executions = 1
  
  environment {
    variables = {
      S3_BUCKET_NAME_MASTERED = aws_s3_bucket.mastering_appbucket_mastered.bucket
    }
  }
  dead_letter_config {
    target_arn = aws_sqs_queue.dlq.arn
  }
}

# SQSをトリガーとしてLambdaを起動する
resource "aws_lambda_event_source_mapping" "mastering_app" {
  event_source_arn = aws_sqs_queue.mastering_app_queue.arn
  function_name    = aws_lambda_function.mastering_app_function.arn
}


# CloudWatch Logs
resource "aws_cloudwatch_log_group" "mastering_app" {
  name              = "/aws/lambda/${aws_lambda_function.mastering_app_function.function_name}"
  # 練習なのでログの保持日数は短くていい
  retention_in_days = 1
}

# LambdaからCloudWatchLogsにログを出力するための権限
data "aws_iam_policy_document" "lambda_logging" {
  statement {
    effect = "Allow"

    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]

    resources = ["arn:aws:logs:*:*:*"]
  }
}

resource "aws_iam_policy" "lambda_logging" {
  name        = "mastering_app_log"
  path        = "/"
  description = "IAM policy for logging from a lambda"
  policy      = data.aws_iam_policy_document.lambda_logging.json
}

resource "aws_iam_role_policy_attachment" "lambda_logs" {
  role       = aws_iam_role.assume_role_for_lambda.name
  policy_arn = aws_iam_policy.lambda_logging.arn
}

Lambda用プログラムの準備

Lambda関数のエントリポイントです。
環境変数を読み込んだり、S3やSSM Parameterを読み込んだりします。

main.go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/auto-mastering-app/mastering"
	"github.com/hogehoge/auto-mastering-app/s3_client"
	"github.com/hogehoge/auto-mastering-app/ssm"

	"github.com/labstack/gommon/log"
)

func main() {
	lambda.Start(Handler)
}

func Handler(event events.SQSEvent) {
	log.Info("s3にファイルが配置されました")
	s3BucketNameMastered := os.Getenv("S3_BUCKET_NAME_MASTERED")

	awsConfig := &aws.Config{
		Region: aws.String("ap-northeast-1"),
	}

	log.Info("s3Clientを初期化します")
	s3Client := s3_client.NewS3Client(s3_client.S3ClientDeps{
		AwsConfig: awsConfig,
	})

	ssmClient := ssm.NewSsmParamClient(ssm.SsmParamClientDeps{
		AwsConfig: awsConfig,
	})
	masteringToken, err := ssmClient.GetParameter("AIMASTERING_ACCESS_TOKEN")
	if err != nil {
		log.Error("Error getting parameter from ssm:", err)
		return
	}

	for _, record := range event.Records {
		fmt.Println(record.Body)

		log.Info("sqsメッセージをパースします")
		bucket, key := getS3InfoBySqsMessage(record)

		if !checkIsAllowedAudioFile(key) {
			log.Info("許可されていないファイル形式です。処理を終了します。")
			return
		}

		log.Info("s3からファイルを取得します")
		srcObj, err := s3Client.GetS3File(key, bucket)
		if err != nil {
			log.Error("Error getting file from s3:", err)
			return
		}

		tmpFileName, err := addSuffixToFileName(key, time.Now().Format("20060102150405"))
		if err != nil {
			log.Error("Error changing file name:", err)
			return
		}
		tmpFilePath := fmt.Sprintf("/tmp/%s", tmpFileName)

		log.Info("s3から取得したファイルをtmpフォルダに保存します")
		_, err = s3Client.ConvertS3ObjectToFile(srcObj, tmpFilePath)
		if err != nil {
			log.Error("Error converting s3 object to file:", err)
			return
		}
		masterdObjectKey, err := addSuffixToFileName(key, "master")
		if err != nil {
			log.Error("Error changing file name:", err)
			return
		}
		masterdFilePath := fmt.Sprintf("/tmp/%s", masterdObjectKey)

		err = mastering.ExecMastering(tmpFilePath, masterdFilePath, masteringToken)
		if err != nil {
			log.Error("Error executing mastering:", err)
			return
		}

		log.Info(fmt.Sprintf("s3にマスタリング済みファイルを配置します。送信先: %s", s3BucketNameMastered))
		err = s3Client.PutS3File(masterdObjectKey, s3BucketNameMastered, masterdFilePath)
		if err != nil {
			log.Error("Error putting file to s3:", err)
			return
		}
		log.Info("完了")
	}

}

func getS3InfoBySqsMessage(record events.SQSMessage) (bucket string, key string) {
	// SQSメッセージの本文(Body)をパースしてS3イベント情報を取得
	var sqsEvents struct {
		Records []struct {
			S3 struct {
				Bucket struct {
					Name string `json:"name"`
				} `json:"bucket"`
				Object struct {
					Key string `json:"key"`
				} `json:"object"`
			} `json:"s3"`
		} `json:"Records"`
	}

	err := json.Unmarshal([]byte(record.Body), &sqsEvents)
	if err != nil {
		log.Error("Error parsing SQS message body:", err)
		return
	}

	// URLデコードする
	decodedKey, err := url.QueryUnescape(sqsEvents.Records[0].S3.Object.Key)
	if err != nil {
		log.Error("Error url decoding key:", err)
		return
	}

	// バケット名とキーを取得
	bucket = sqsEvents.Records[0].S3.Bucket.Name
	key = decodedKey

	fmt.Println("Bucket Name:", bucket)
	fmt.Println("Key:", decodedKey)

	return
}

func addSuffixToFileName(orgFileName string, suffix string) (string, error) {

	// ファイル名と拡張子を分割
	filename := strings.TrimSuffix(orgFileName, filepath.Ext(orgFileName))
	extension := filepath.Ext(orgFileName)

	// 新しいファイル名を作成
	newFilename := fmt.Sprintf("%s-%s%s", filename, suffix, extension)

	return newFilename, nil
}

func checkIsAllowedAudioFile(fileName string) bool {
	// 拡張子がwav, mp3, aiffの場合はtrueを返す
	switch filepath.Ext(fileName) {
	case ".wav", ".mp3":
		return true
	}
	return false
}

S3クライアントです。
オブジェクトのGetおよびPut、また、取得したS3オブジェクトをLambda内で操作するためにos.Fileに変換したりします。

s3.go
package s3_client

import (
	"io"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/labstack/gommon/log"
)

type S3Client interface {
	GetS3File(objectKey string, bucketName string) (*s3.GetObjectOutput, error)
	PutS3File(objectKey string, bucketName string, filePath string) error
	ConvertS3ObjectToFile(srcObj *s3.GetObjectOutput, filePath string) error
}

type S3ClientImpl struct {
	*S3ClientDeps
}

type S3ClientDeps struct {
	AwsConfig *aws.Config
}

func NewS3Client(deps S3ClientDeps) *S3ClientImpl {
	return &S3ClientImpl{
		S3ClientDeps: &deps,
	}
}

func (sc *S3ClientImpl) GetS3File(objectKey string, bucketName string) (*s3.GetObjectOutput, error) {
	sess, err := session.NewSession(sc.AwsConfig)
	if err != nil {
		log.Fatal(err)
		return nil, err
	}

	S3client := s3.New(sess)
	srcObj, err := S3client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	})
	if err != nil {
		log.Fatal(err)
		return nil, err
	}
	return srcObj, nil
}

func (sc *S3ClientImpl) PutS3File(objectKey string, bucketName string, filePath string) error {
	sess, err := session.NewSession(&aws.Config{})
	if err != nil {
		log.Fatal(err)
		return err
	}

	file, err := os.Open(filePath)
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer file.Close()

	S3client := s3.New(sess)
	_, err = S3client.PutObject(&s3.PutObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
		Body:   file,
	})
	if err != nil {
		log.Fatal(err)
		return err
	}
	return nil
}

func (s3 *S3ClientImpl) ConvertS3ObjectToFile(srcObj *s3.GetObjectOutput, filePath string) (*os.File, error) {
	// s3のファイルをローカルに保存する
	file, err := os.Create(filePath)
	if err != nil {
		log.Error(err)
		return nil, err
	}

	defer func() {
		if closeErr := file.Close(); closeErr != nil {
			if err == nil {
				err = closeErr
			} else {
				log.Error(closeErr)
			}
		}
	}()

	_, err = io.Copy(file, srcObj.Body)
	if err != nil {
		log.Error(err)
		return nil, err
	}
	return file, nil
}

SSM Parameterを取得します。
今回は音圧爆上げくんのAPI Keyを取得するために使用します。

ssm.go
package ssm

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ssm"
	"github.com/labstack/gommon/log"
)

type SsmParamClient interface {
	GetParameter() (string, error)
}

type SsmParamClientImpl struct {
	*SsmParamClientDeps
}

type SsmParamClientDeps struct {
	AwsConfig *aws.Config
}

func NewSsmParamClient(deps SsmParamClientDeps) *SsmParamClientImpl {
	return &SsmParamClientImpl{
		SsmParamClientDeps: &deps,
	}
}

func (ssmi *SsmParamClientImpl) GetParameter(parameterName string) (string, error) {

	sess, err := session.NewSessionWithOptions(session.Options{
		Config: *ssmi.AwsConfig,
	})
	if err != nil {
		log.Error(err)
		return "", err
	}
	svc := ssm.New(sess)

	res, err := svc.GetParameter(&ssm.GetParameterInput{
		Name: aws.String(parameterName),
		// パラメータが暗号化している場合は復号して取得
		WithDecryption: aws.Bool(true),
	})
	if err != nil {
		log.Error(err)
		return "", err
	}
	return *res.Parameter.Value, nil
}

音圧爆上げくんAPIを呼び出し、マスタリングを実行します。

mastering.go
package mastering

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"

	"github.com/ai-mastering/aimastering-go"
	"github.com/labstack/gommon/log"
)

func ExecMastering(sourceFilePath, masterdFilePath, apiToken string) error {

	log.Info("masteringを実行します")

	client := aimastering.NewAPIClient(aimastering.NewConfiguration())
	auth := context.WithValue(context.Background(), aimastering.ContextAPIKey, aimastering.APIKey{
		Key: apiToken,
	})

	inputAudioFile, err := os.Open(sourceFilePath)
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer inputAudioFile.Close()

	inputAudio, _, err := client.AudioApi.CreateAudio(auth, map[string]interface{}{
		"file": inputAudioFile,
	})
	if err != nil {
		log.Fatal(err)
		return err
	}
	fmt.Fprintf(os.Stderr, "The input audio was uploaded id %d\n", inputAudio.Id)

	// マスタリング開始
	mastering, _, err := client.MasteringApi.CreateMastering(auth, inputAudio.Id, map[string]interface{}{
		"mode": "default",
	})
	if err != nil {
		log.Fatal(err)
		return err
	}
	fmt.Fprintf(os.Stderr, "The mastering started id %d\n", mastering.Id)

	// マスタリングが完了するまで待機
	for mastering.Status == "processing" || mastering.Status == "waiting" {
		mastering, _, err = client.MasteringApi.GetMastering(auth, mastering.Id)
		if err != nil {
			log.Fatal(err)
			return err
		}
		fmt.Fprintf(os.Stderr,
			"waiting for the mastering completion %d%%\n", int(100*mastering.Progression))
		time.Sleep(5 * time.Second)
	}

	// download output audio
	// notes
	// - client.AudioApi.DownloadAudio cannot be used because swagger-codegen doesn't support binary string response in golang
	// - instead use GetAudioDownloadToken (to get signed url) + HTTP Get
	fmt.Printf("mastering.OutputAudioId: %d\n", mastering.OutputAudioId)
	audioDownloadToken, resp, _ := client.AudioApi.GetAudioDownloadToken(auth, mastering.OutputAudioId)

	defer resp.Body.Close()

	// マスタリング済みファイルをダウンロード
	resp, err = http.Get(audioDownloadToken.DownloadUrl)
	if err != nil {
		log.Fatal(err)
		return err
	}

	fmt.Println("outputファイルを生成")
	outputAudioFile, err := os.Create(masterdFilePath)
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer outputAudioFile.Close()

	fmt.Println("outputファイルに書き込み")
	_, err = io.Copy(outputAudioFile, resp.Body)
	if err != nil {
		log.Fatal(err)
		return err
	}

	fmt.Fprintf(os.Stderr,
		"The Output audio was saved to %s\n", masterdFilePath)

	log.Info("masteringが完了しました")
	return nil
}

まとめ

最近AWSのSAAを取得しましたが、やはり実際に試してみることは重要ですね。

ただ、土台となる知識が付けられた分、以前に比べ実装の中で何をやるべきかが理解しやすかったです。
付けた知識を実践で定着させる、これからもやっていきたいですね!

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?