8
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

CodePipelineからDBマイグレーションを自動実行する

Posted at

はじめに

最近CodePipelineでCI/CDを組む機会が増えてきたのですが、悩ましいのがDBのマイグレーションでした。

  • ソースのデプロイはパイプラインで自動化出来るが、DBスキーマ変更は自動化しにくい
    • DBはVPCのprivate subnetにいるので、ビルドサーバからDBに接続できるようにネットワークを整備する必要がある
  • 手動で適用するには踏み台を経由してDB接続を確立して適用するなど、必要な手順が増えがち
    • 「DBスキーマ適用担当」みたいな昔懐かしいDBA的な人が現れたりする

というわけで、マイグレーションを自動実行するために必要なパイプライン構成について少し真面目に考えてみたいと思います。

今回想定する構成

CodePipeline

CodePipelineはCI/CDを実現するAWSのサービスです。
CodeCommit/CodeBuild/CodeDeployなどの各種サービスをパイプラインでつなぐことで継続的なデプロイを実現します。
今回は簡略化のため

  • マイグレーションファイルを管理しているリポジトリがCodeCommitにある
  • デプロイフェーズではマイグレーションの実行のみを行う

という想定とします。

DBのマイグレーションツール

データベースのスキーマを管理するツールです。
SQLやそれに類するDSLをバージョン管理することで、環境ごとのスキーマの適用漏れなどを防ぐために使用します。
ActiveRecordのようなフレームワークに組み込まれているものやFlywayといったスタンドアロンのツールなど様々ありますが、今回はGo製のsql-migrateを使用することを想定しています。

マイグレーションを実行するフェーズの検討

マイグレーションを実行するフェーズは

  1. CodeBuild内で実行する
  2. Lambda呼び出しで実行する

ケースの2案が考えられそうです。

課題

  • CodeBuildでマイグレーションを行う場合はVPCに接続する必要があります
  • Lambda呼び出しによってマイグレーションを実行する場合はLambdaをVPCに接続する必要があります
    • Lambda側からCodePipelineにジョブの終了をCallbackする必要があるため、NATインターフェースやVPCエンドポイントが必要になります

NATゲートウェイを使用しない構成

NATゲートウェイやインターフェース型VPCエンドポイントには維持コストがかかります。
総合的にペイするのであれば問題ないと思いますが、たまに発生するDBマイグレーションのためだけにNATゲートウェイを維持するのはコストに見合わないと感じるケースもあると思います。
その対策として、NATゲートウェイ抜きでマイグレーション実行を実現するために、Lambdaを多段構成にして呼び出します。

  1. 1段目のLambdaは非VPCのLambdaとして構成し、CodePipelineから実行します
  2. Pipeline実行Lambda内で更にマイグレーション実行用のLambdaをInvokeします
  3. マイグレーション実行用LambdaはVPCに接続し、private subnetにいるDBに対してマイグレーションを実行します
  4. S3のみゲートウェイ型のVPCエンドポイントを用意しておけばソースコードは取得可能です
  5. マイグレーション実行Lambdaが正常終了すればPipeline実行Lambda内でパイプラインジョブを完了します

Migration.png

今回はこの構成で組んでみたいと思います。

CDKスタック

Pipeline

ソースステージにCodeCommitを、デプロイステージにPipeline実行Lambdaを指定したパイプラインを構築します

pipeline.ts
const sourceOutput = new pipeline.Artifact()
new pipeline.Pipeline(this, 'Pipeline', {
  artifactBucket: bucket,
  pipelineName: pipelineName,
  stages: [
    {
      stageName: 'Source',
      actions: [
        new pipeline_actions.CodeCommitSourceAction({
          actionName: 'CodeCommit_Source',
          output: sourceOutput,
          repository: repository,
          branch: branchName,
        })
      ]
    },
    {
      stageName: 'Deploy',
      actions: [
        new pipeline_actions.LambdaInvokeAction({
          actionName: 'Migration_Lambda',
          lambda: triggerFunction,
          inputs: [sourceOutput]
        })
      ]
    }
  ]
})

Pipeline実行Lambda

CodePipelineのJobを完了する権限、マイグレーション実行Lambdaを起動する権限を持ったRoleを作成します。

role.ts
const triggerRole = new iam.Role(this, 'TriggerLambdaExecutionRole', {
  assumedBy: new iam.CompositePrincipal(
    new iam.ServicePrincipal('lambda.amazonaws.com')
  ),
  managedPolicies: [
    lambdaBasicExecutionRolePolicy,
  ],
})
triggerRole.addToPolicy(new iam.PolicyStatement({
  resources: ['*'],
  actions: [
    'codepipeline:PutJobSuccessResult',
    'codepipeline:PutJobFailureResult',
    'lambda:InvokeFunction'
  ]
}))

LambdaFunctionを作成します。

function.ts
const triggerFunction = new lambda.Function(this, 'TriggerLambdaFunction', {
  code: triggerCode,
  handler: 'main',
  runtime: lambda.Runtime.GO_1_X,
  logRetention: 1,
  environment: {
    'FunctionName': functionName
  },
  functionName: 'trigger-lambda',
  memorySize: 256,
  role: triggerRole,
})

マイグレーション実行Lambda

接続するVPC情報(VPC,サブネット,ルートテーブル)を取得します。

vpc.ts
const vpc = ec2.Vpc.fromVpcAttributes(this, 'VPC', {
  vpcId: vpcId,
  availabilityZones: ['ap-northeast-1a', 'ap-northeast-1c', 'ap-northeast-1d'],
})
const selectSubnets = vpc.selectSubnets({
  subnets: [
    ec2.Subnet.fromSubnetAttributes(this, 'Subnet', {
      availabilityZone: availabilityZone,
      subnetId: subnetId,
      routeTableId: routeTableId,
    }),
  ]
})
const securityGroup = ec2.SecurityGroup.fromSecurityGroupId(this, 'SecurityGroup', securityGroup)

VPCへのアクセス、S3へのアクセス権限を持ったRoleを作成します。

role.ts
const migrationRole = new iam.Role(this, 'MigrationLambdaExecutionRole', {
  assumedBy: new iam.CompositePrincipal(
    new iam.ServicePrincipal('lambda.amazonaws.com')
  ),
  managedPolicies: [
    lambdaBasicExecutionRolePolicy,
    iam.ManagedPolicy.fromManagedPolicyArn(this, 'AWSLambdaVPCAccessExecutionRole', 'arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole'),
    iam.ManagedPolicy.fromManagedPolicyArn(this, 'S3ReadOnly', 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'),         
  ],
})

LambdaFunctionを作成します。

function.ts
const migrationFunction = new lambda.Function(this, 'MigrationLambdaFunction', {
  code: migrationCode,
  handler: 'main',
  runtime: lambda.Runtime.GO_1_X,
  allowPublicSubnet: true,
  logRetention: 1,
  functionName: 'migration-lambda',
  memorySize: 256,
  role: migrationRole,
  vpc: vpc,
  vpcSubnets: selectSubnets,
  securityGroups: [securityGroup]
})

参考: スタックサンプル

参考CDKスタック
migration-lambda-stack.ts
import * as cdk from '@aws-cdk/core'
import * as lambda from '@aws-cdk/aws-lambda'
import * as iam from '@aws-cdk/aws-iam'
import * as ec2 from '@aws-cdk/aws-ec2'
import * as pipeline from '@aws-cdk/aws-codepipeline'
import * as pipeline_actions from '@aws-cdk/aws-codepipeline-actions'
import * as codecommit from '@aws-cdk/aws-codecommit'
import * as s3 from '@aws-cdk/aws-s3'
import * as path from 'path'

export class MigrationLambdaStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    // 環境情報を取得
    const repositoryName = this.node.tryGetContext('repositoryName')
    const branchName= this.node.tryGetContext('branchName')
    const bucketName = this.node.tryGetContext('bucketName')
    const subnetId = this.node.tryGetContext('subnetId')
    const routeTableId= this.node.tryGetContext('routeTableId')
    const vpcId= this.node.tryGetContext('vpcId')
    const availabilityZone= this.node.tryGetContext('availabilityZone')
    const securityGroup= this.node.tryGetContext('securityGroup')

    // デプロイ先VPC環境の取得
    const vpc = ec2.Vpc.fromVpcAttributes(this, 'VPC', {
      vpcId: vpcId,
      availabilityZones: ['ap-northeast-1a', 'ap-northeast-1c', 'ap-northeast-1d'],
    })

    const selectSubnets = vpc.selectSubnets({
      subnets: [
        ec2.Subnet.fromSubnetAttributes(this, 'Subnet', {
          availabilityZone: availabilityZone,
          subnetId: subnetId,
          routeTableId: routeTableId,
        }),
      ]
    })

    const securityGroup = ec2.SecurityGroup.fromSecurityGroupId(this, 'SecurityGroup', securityGroup)

    // Lambda構築
    const deployCode = lambda.Code.fromAsset(path.resolve(__dirname, 'bin/main.zip'))
    const migrationCode = lambda.Code.fromAsset(path.resolve(__dirname, 'bin/migration.zip'))
    const lambdaBasicExecutionRolePolicy = iam.ManagedPolicy.fromManagedPolicyArn(this, 'LambdaBasicExecutionRole', 'arn:aws:iam::aws:policy/AWSLambdaExecute')
    const triggerRole = new iam.Role(this, 'TriggerLambdaExecutionRole', {
      assumedBy: new iam.CompositePrincipal(
        new iam.ServicePrincipal('lambda.amazonaws.com')
      ),
      managedPolicies: [
        lambdaBasicExecutionRolePolicy,
      ],
    })
    triggerRole.addToPolicy(new iam.PolicyStatement({
      resources: ['*'],
      actions: [
        'codepipeline:PutJobSuccessResult',
        'codepipeline:PutJobFailureResult',
        'lambda:InvokeFunction'
      ]
    }))
    const migrationRole = new iam.Role(this, 'MigrationLambdaExecutionRole', {
      assumedBy: new iam.CompositePrincipal(
        new iam.ServicePrincipal('lambda.amazonaws.com')
      ),
      managedPolicies: [
        lambdaBasicExecutionRolePolicy,
        iam.ManagedPolicy.fromManagedPolicyArn(this, 'AWSLambdaVPCAccessExecutionRole', 'arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole'),
        iam.ManagedPolicy.fromManagedPolicyArn(this, 'S3ReadOnly', 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'),         
      ],
    })

    const migrationFunction = new lambda.Function(this, 'MigrationLambdaFunction', {
      code: migrationCode,
      handler: 'main',
      runtime: lambda.Runtime.GO_1_X,
      allowPublicSubnet: true,
      logRetention: 1,
      functionName: 'migration-lambda',
      memorySize: 256,
      role: migrationRole,
      vpc: vpc,
      vpcSubnets: selectSubnets,
      securityGroups: [securityGroup]
    })
    const triggerFunction = new lambda.Function(this, 'TriggerLambdaFunction', {
      code: deployCode,
      handler: 'main',
      runtime: lambda.Runtime.GO_1_X,
      logRetention: 1,
      environment: {
        'FunctionName': migrationFunction.functionName
      },
      functionName: 'trigger-lambda',
      memorySize: 256,
      role: triggerRole,
    })

    const bucket = s3.Bucket.fromBucketName(this, 'ArtifactBucket', bucketName)
    const source = codecommit.Repository.fromRepositoryName(this, 'Repository', repositoryName)
    const sourceOutput = new pipeline.Artifact()

    // pipeline作成
    new pipeline.Pipeline(this, 'Pipeline', {
      artifactBucket: bucket,
      pipelineName: 'db-migration-pipeline',
      stages: [
        {
          stageName: 'Source',
          actions: [
            new pipeline_actions.CodeCommitSourceAction({
              actionName: 'CodeCommit_Source',
              output: sourceOutput,
              repository: source,
              branch: branchName,
            })
          ]
        },
        {
          stageName: 'Deploy',
          actions: [
            new pipeline_actions.LambdaInvokeAction({
              actionName: 'Migration_Lambda',
              lambda: triggerFunction,
              inputs: [sourceOutput]
            })
          ]
        }
      ]
    })
  }
}

Lambdaソース

Pipeline実行Lambda

  1. CodePipelineから実行される
  2. Sourceフェーズで取得したソースコードのアップロード先を取得する
  3. マイグレーション実行Lambdaを同期的に呼び出す
  4. パイプラインジョブを完了させる
参考コード
main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/codepipeline"
	lambdaSdk "github.com/aws/aws-sdk-go/service/lambda"
)

func handle(context context.Context, e events.CodePipelineEvent) (string, error) {
	sess, err := session.NewSession()
	if err != nil {
		log.Println("Error creating session ", err)
		return "Error creating session", err
	}
	service := codepipeline.New(sess)
	job := e.CodePipelineJob

	// CodePipelineから渡されたソースコードのパスを取得し、マイグレーション実行Lambdaへ渡す
	payload, _ := json.Marshal(map[string]interface{}{
		"BucketName": job.Data.InputArtifacts[0].Location.S3Location.BucketName,
		"ObjectKey":  job.Data.InputArtifacts[0].Location.S3Location.ObjectKey,
	})
	log.Printf("Invoke Payload: %s", payload)
	lambdaService := lambdaSdk.New(sess, &aws.Config{
		Region: aws.String("ap-northeast-1"),
	})
	functionName := os.Getenv("FunctionName")

	// マイグレーション実行Lambdaを同期的に実行
	migrationResult, err := lambdaService.Invoke(&lambdaSdk.InvokeInput{
		FunctionName: &functionName,
		Payload:      payload,
	})
	// エラーが返ってきた場合はジョブを異常終了する
	if err != nil {
		log.Println("Error migration execution ", err)
		service.PutJobFailureResult(&codepipeline.PutJobFailureResultInput{
			JobId: &job.ID,
			FailureDetails: &codepipeline.FailureDetails{
				Type:    aws.String(codepipeline.FailureTypeJobFailed),
				Message: aws.String(err.Error()),
			},
		})
		return "Error migration execution", err
	}
	log.Printf("Migration Success!: %s", migrationResult.String())

	// ジョブを正常終了する
	output, err := service.PutJobSuccessResult(&codepipeline.PutJobSuccessResultInput{
		JobId: aws.String(job.ID),
	})
	if err != nil {
		log.Println("Error put job success", err)
	}
	log.Println(output.String())
	return output.GoString(), err
}

func main() {
	lambda.Start(handle)
}

Migraiont実行Lambda

  1. Pipeline実行Lambdaから起動させる
  2. ソースコードのアップロード先を引数に受け取り、S3からソースをダウンロードする
  3. マイグレーションを実行する
参考コード
main.go
package main

import (
	"archive/zip"
	"context"
	"database/sql"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	_ "github.com/go-sql-driver/mysql"
	migrate "github.com/rubenv/sql-migrate"
)

// MigrationEvent マイグレーションに利用するソースオブジェクト
type MigrationEvent struct {
	BucketName string `json:"BucketName"`
	ObjectKey  string `json:"ObjectKey"`
}

// S3からダウンロードしたZipソースを解凍する
func unzip(src, dest string) error {
	r, err := zip.OpenReader(src)
	if err != nil {
		return err
	}
	defer r.Close()

	for _, f := range r.File {
		rc, err := f.Open()
		if err != nil {
			return err
		}
		defer rc.Close()

		if f.FileInfo().IsDir() {
			path := filepath.Join(dest, f.Name)
			os.MkdirAll(path, f.Mode())
		} else {
			buf := make([]byte, f.UncompressedSize)
			_, err = io.ReadFull(rc, buf)
			if err != nil {
				return err
			}
			if strings.Contains(f.Name, "/") {
				dir := filepath.Join(dest, f.Name[:strings.LastIndex(f.Name, "/")])
				os.MkdirAll(dir, 0755)
			}

			path := filepath.Join(dest, f.Name)
			err := ioutil.WriteFile(path, buf, f.Mode())
			if err != nil {
				return err
			}
		}
	}

	return nil
}

func handle(context context.Context, e MigrationEvent) (string, error) {
	sess, err := session.NewSession()
	if err != nil {
		log.Println("Error creating session ", err)
		return "Error creating session", err
	}

	tmpfile, err := ioutil.TempFile("/tmp", "srctmp_")
	if err != nil {
		log.Println("Error create temp file", err)
		return "Error create temp file", err
	}
	defer os.Remove(tmpfile.Name())
	log.Printf("tmpfilename: %s", tmpfile.Name())

	// S3からソースをダウンロード
	downloader := s3manager.NewDownloader(sess, func(d *s3manager.Downloader) {
		d.PartSize = 64 * 1024 * 1024
		d.Concurrency = 2
	})
	n, err := downloader.Download(
		tmpfile,
		&s3.GetObjectInput{
			Bucket: aws.String(e.BucketName),
			Key:    aws.String(e.ObjectKey),
		})
	if err != nil {
		log.Println("Error download file", err)
		return "Error download file", err
	}
	log.Printf("Downloaded: %d bytes", n)
	// ソースコードを解凍
	unzipError := unzip(tmpfile.Name(), "/tmp")
	if unzipError != nil {
		log.Println("Error Unzip", unzipError)
		return "Error Unzip", unzipError
	}

	// sql-migrateをライブラリとしてロードして実行
	migrations := migrate.FileMigrationSource(migrate.FileMigrationSource{
		Dir: "/tmp/migrations",
	})
	dataSource := fmt.Sprintf("%s:%s@tcp(%s:3306)/%s?charset=utf8mb4",
		os.Getenv("DB_USER"), os.Getenv("DB_PASSWORD"), os.Getenv("DB_HOST"), os.Getenv("DB_NAME"))
	db, err := sql.Open("mysql", dataSource)
	if err != nil {
		log.Println("Error connect database", err)
		return "Error connect database", err
	}
	result, err := migrate.Exec(db, "mysql", migrations, migrate.Up)
	if err != nil {
		log.Println("Error migration exec", err)
		return "Error migration exec", err
	}
	log.Printf("Applied %d Migrations!", result)

	return "Success", nil
}

func main() {
	lambda.Start(handle)
}

Lambdaのビルド(参考)

今回はLambdaをGoで書いたため、事前にビルドしてZipに固めたものをCDKからアップロードする

zipを作るスクリプトは事前に準備しておくと便利

参考ビルドスクリプト
build.sh
#!/bin/bash
GOOS=linux go build -x -ldflags '-d -s -w' -a -tags netgo -installsuffix netgo -o ../bin/main main.go
zip -j ../bin/main.zip ../bin/main

まとめ

構成としてはやや複雑になりましたが、VPC内のprivate subnetにいるDBに対してCI/CDでマイグレーション実行が出来るようになりました。

参考リンク

8
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?