概要
表題のバッチをGoで作りました。経緯としては今担当しているサービスが特定の曜日だけ負荷が非常に高まるというサービス特性がありました。Readerが重いだけならスケーリングを時限操作するだけで良いのですが、書き込みが捌けずWriterのスペックを上げる必要があります。Writerのスペックを上げるにはダウンタイムが発生するので人の少ない深夜帯に対応を行う必要があり、毎週深夜対応を行うのは現実的ではなく自動化する運びとなりました。RDSはAuroraのMySQLを使っていて、Writer1台,Reader2台の構成になっています。
処理の流れ
まずWriterのスペックを上げる流れとしてはReaderをスペックアップし、フェイルオーバー、その後もともとWriter立ったものをスペックアップするという流れになります。本質的にはこれだけなのですが、フェイルオーバーする際にサービスダウンが発生し、それをCloudWatchAlertで拾ってしまうのを避けたく、Alertのアクションを止めるという対応も入れます。
実装方針
DBのスペックアップもフェイルオーバーもAWS SDK for Goでよしなになってくれるので非常に楽です。ただ、実行してはするものの命令はするだけで処理が完了するかまでは追えないので、そこはポーリングしつつ正しく設定がされたら次のステップに移るという対応を入れています。AWS SDK for Goを利用した部分は他にも再利用するためライブラリ化し、メインのコントロール処理でそれらのライブラリを利用します。
メインのコントロール処理
import (
"fmt"
"time"
"github.com/hogehogeorg/hogehogeprojecct/library/aws/cloudwatch"
"github.com/hogehogeorg/hogehogeprojecct/library/aws/rds"
"github.com/hogehogeorg/hogehogeprojecct/library/env"
"github.com/hogehogeorg/hogehogeprojecct/library/log"
"go.uber.org/zap"
)
// ControlDbScale DBのスケール調整
func ControlDbScale(mode string) error {
// 設定とクラスターのインスタンス情報を取得
clusterName := env.GetAWSRDSClusterName()
writerInstance, readerInstances, err := rds.GetSpecUpInstance(clusterName)
if err != nil {
return err
}
instanceClass := env.GetAWSRDSUpClass()
if mode == "down" {
instanceClass = env.GetAWSRDSDownClass()
}
alarmPrefix := env.GetAWSCloudWatchAlarmPrefix()
stopActionArn := env.GetAWSCloudWatchStopActionArn()
// 状態チェック
log.Logger.Info("", zap.String("cluster", clusterName))
log.Logger.Info("", zap.String("writerInstance name", writerInstance.Name))
log.Logger.Info("", zap.String("writerInstance class", writerInstance.Class))
log.Logger.Info("", zap.String("writerInstance status", writerInstance.Status))
log.Logger.Info("", zap.String("readerInstance_0 name", readerInstances[0].Name))
log.Logger.Info("", zap.String("readerInstance_0 class", readerInstances[0].Class))
log.Logger.Info("", zap.String("readerInstance_0 status", readerInstances[0].Status))
log.Logger.Info("", zap.String("readerInstance_1 name", readerInstances[1].Name))
log.Logger.Info("", zap.String("readerInstance_1 class", readerInstances[1].Class))
log.Logger.Info("", zap.String("readerInstance_1 status", readerInstances[1].Status))
log.Logger.Info("", zap.String("alarmPrefix ", alarmPrefix))
if !writerInstance.IsAvailable() {
return fmt.Errorf("writer is not available")
}
if !readerInstances[0].IsAvailable() {
return fmt.Errorf("reader_0 is not available")
}
if !readerInstances[1].IsAvailable() {
return fmt.Errorf("reader_1 is not available")
}
// readerを全てスペック変更
if readerInstances[0].IsModifyInstanceClassFinished(instanceClass) {
log.Logger.Info("reader_0 modify already finished")
} else {
if err := rds.ModifyInstanceClass(readerInstances[0].Name, instanceClass); err != nil {
return err
}
}
if readerInstances[1].IsModifyInstanceClassFinished(instanceClass) {
log.Logger.Info("reader_1 modify already finished")
} else {
if err := rds.ModifyInstanceClass(readerInstances[1].Name, instanceClass); err != nil {
return err
}
}
// writerが既にスペック変更されているかの確認
if writerInstance.IsModifyInstanceClassFinished(instanceClass) {
log.Logger.Info("writer modify already finished")
return nil
}
// 監視を止める
if err := cloudwatch.DisableAlarmAction(alarmPrefix, stopActionArn); err != nil {
return err
}
log.Logger.Info("stop alarm")
// 中断された時も監視を再開する
defer func() {
if err := cloudwatch.EnableAlarmAction(alarmPrefix, stopActionArn); err != nil {
log.Logger.Error("監視再開失敗", zap.Error(err))
}
log.Logger.Info("restart alarm")
}()
// フェイルオーバ
if err := rds.DoFailover(clusterName); err != nil {
return err
}
log.Logger.Info("failover success")
// フェイルオーバ後多少安定してから監視再開、およびスペックアップ実行
time.Sleep(time.Second * 300)
if err := cloudwatch.EnableAlarmAction(alarmPrefix, stopActionArn); err != nil {
log.Logger.Error("監視再開失敗", zap.Error(err))
}
log.Logger.Info("restart alarm")
// writer(フェイルオーバしているので実行時点ではreader)をスペック変更
if err := rds.ModifyInstanceClass(writerInstance.Name, instanceClass); err != nil {
return err
}
return nil
}
読んでいただければ大体の流れがわかるかと思います。フェイルセーフにするため細かな処理を入れたり、ログをはいたりいますが、処理の流れにそったコードになっているかと思います。ロガーはlog用のライブライブラリを用意し、内部ではzapを使ってます。監視については処理的に途中でエラーになっても監視を再開するためdeferを使ってもう一回呼んでいます。害はないのですが二回監視再会を行うことになるので別関数に分けたほうが綺麗かもしれません。監視を止める対象については止めたいActionとAlarmの名前を前方一致させて絞り込んだものを止めるようにしています。
RDSを処理するライブラリ
RDS周りでできること、関数の利用方法は以下に全てまとめられているので、これを見ながら対応をしていけば大丈夫です。
https://docs.aws.amazon.com/sdk-for-go/api/service/rds/
接続処理、タイムアウト処理の設定
接続処理はAWSのセッションを作り、RDSのセッションを作るだけです。設定としてポーリングのチェック間隔とタイムアウトする時間を持っています。
package rds
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/rds"
)
func connect() *rds.RDS {
sess := session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Region: aws.String("ap-northeast-1"),
},
SharedConfigState: session.SharedConfigEnable,
}))
return rds.New(sess)
}
package rds
// checkIntervalSecond チェック間隔秒数
const checkIntervalSecond = 30
// timeOutSecond タイムアウトにする秒数
const timeOutSecond = 30 * 60
インスタンス用関数
SDKから返されるインスタンスの情報は非常に多くの情報を含んでいるため、シンプルに必要な情報だけを利用すべくInstanceという構造体を作ってまとめています。スペックアップは処理実行後、正常な状態になったかを定期的にポーリングしてチェックするという方法で行ってます。
package rds
import (
"fmt"
"time"
"github.com/hogehogeorg/hogehogeprojecct/library/log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
)
// Instance RDSインスタンス
type Instance struct {
Name string
Class string
Status string
}
// NewInstance 生成
func NewInstance(name, class, status string) *Instance {
return &Instance{
Name: name,
Class: class,
Status: status,
}
}
// IsAvailable Availableになっているか?
func (i *Instance) IsAvailable() bool {
return i.Status == "available"
}
// IsModifyInstanceClassFinished インスタンスのクラス変更が完了しているか
func (i *Instance) IsModifyInstanceClassFinished(instanceClass string) bool {
return i.IsAvailable() && i.Class == instanceClass
}
// ModifyInstanceClass インスタンスのクラス変更
func ModifyInstanceClass(instanceName string, instanceClass string) error {
svr := connect()
_, err := svr.ModifyDBInstance(&rds.ModifyDBInstanceInput{
ApplyImmediately: aws.Bool(true),
DBInstanceClass: aws.String(instanceClass),
DBInstanceIdentifier: aws.String(instanceName),
})
if err != nil {
return err
}
roopBraker := timeOutSecond / checkIntervalSecond
for i := 0; i < roopBraker; i++ {
time.Sleep(time.Second * time.Duration(checkIntervalSecond))
instance, err := GetInstance(instanceName)
if err != nil {
return err
}
if instance.IsModifyInstanceClassFinished(instanceClass) {
return nil
}
sec := checkIntervalSecond * i
log.Logger.Info(fmt.Sprintf("%v changing spec to %v [%vm%vs]", instanceName, instanceClass, sec/60, sec%60))
}
return fmt.Errorf("spec change failed")
}
// GetInstance インスタンス取得
func GetInstance(instanceName string) (*Instance, error) {
svr := connect()
result, err := svr.DescribeDBInstances(&rds.DescribeDBInstancesInput{
DBInstanceIdentifier: aws.String(instanceName),
})
if err != nil {
return nil, err
}
if len(result.DBInstances) != 1 {
return nil, fmt.Errorf("invalid rds instance")
}
return NewInstance(
instanceName,
*result.DBInstances[0].DBInstanceClass,
*result.DBInstances[0].DBInstanceStatus,
), nil
}
クラスタ用関数
フェイルオーバもインスタンスのスペックアップと同様にポーリングしてチェックします。フェイルオーバはスペックアップと比較してすぐに(1,2分ほど)終わるかと思いますが、確実に変更されたのを確認してから次のステップに進めることができます。
package rds
import (
"fmt"
"strings"
"time"
"github.com/hogehogeorg/hogehogeprojecct/library/log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
)
// GetSpecUpInstance スペックアップを行うインスタンスを取得
func GetSpecUpInstance(clusterName string) (*Instance, []*Instance, error) {
svr := connect()
result, err := svr.DescribeDBClusters(&rds.DescribeDBClustersInput{
DBClusterIdentifier: aws.String(clusterName),
})
if err != nil {
return nil, nil, err
}
if len(result.DBClusters) != 1 {
return nil, nil, fmt.Errorf("invalid rds cluster")
}
var writer *Instance
var reader []*Instance
for _, v := range result.DBClusters[0].DBClusterMembers {
if *v.IsClusterWriter {
writer, err = GetInstance(*v.DBInstanceIdentifier)
if err != nil {
return nil, nil, err
}
} else {
instance, err := GetInstance(*v.DBInstanceIdentifier)
if err != nil {
return nil, nil, err
}
reader = append(reader, instance)
}
}
return writer, reader, nil
}
// DoFailover フェイルオーバ実行
func DoFailover(clusterName string) error {
svr := connect()
_, err := svr.FailoverDBCluster(&rds.FailoverDBClusterInput{
DBClusterIdentifier: aws.String(clusterName),
})
if err != nil {
return err
}
roopBraker := timeOutSecond / checkIntervalSecond
for i := 0; i < roopBraker; i++ {
time.Sleep(time.Second * time.Duration(checkIntervalSecond))
svr := connect()
result, err := svr.DescribeDBClusters(&rds.DescribeDBClustersInput{
DBClusterIdentifier: aws.String(clusterName),
})
if err != nil {
return err
}
if len(result.DBClusters) != 1 {
return fmt.Errorf("invalid rds cluster")
}
if *result.DBClusters[0].Status == "available" {
return nil
}
sec := checkIntervalSecond * i
log.Logger.Info(fmt.Sprintf("%v failover [%vm%vs]", clusterName, sec/60, sec%60))
}
return fmt.Errorf("fail failver")
}
CloudWatchを処理するライブラリ
RDSに比べてシンプルです。止めたいアラートを探して、止めたいアクションなら止めるという事をやっています。
処理の詳細はここで確認ください。
https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatch/
package cloudwatch
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
func connect() *cloudwatch.CloudWatch {
sess := session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Region: aws.String("ap-northeast-1"),
},
SharedConfigState: session.SharedConfigEnable,
}))
return cloudwatch.New(sess)
}
package cloudwatch
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
// EnableAlarmAction アラームアクションを開始
func EnableAlarmAction(alarmPrefix, stopAction string) error {
svc := connect()
result, err := svc.DescribeAlarms(&cloudwatch.DescribeAlarmsInput{
AlarmNamePrefix: aws.String(alarmPrefix),
})
if err != nil {
return err
}
alerms := cloudwatch.EnableAlarmActionsInput{}
for _, v := range result.MetricAlarms {
if len(v.AlarmActions) == 0 || *v.AlarmActions[0] != stopAction {
continue
}
alerms.AlarmNames = append(alerms.AlarmNames, v.AlarmName)
}
_, err = svc.EnableAlarmActions(&alerms)
if err != nil {
return err
}
return nil
}
// DisableAlarmAction アラームアクションを停止
func DisableAlarmAction(alarmPrefix, stopAction string) error {
svc := connect()
result, err := svc.DescribeAlarms(&cloudwatch.DescribeAlarmsInput{
AlarmNamePrefix: aws.String(alarmPrefix),
})
if err != nil {
return err
}
alerms := cloudwatch.DisableAlarmActionsInput{}
for _, v := range result.MetricAlarms {
if len(v.AlarmActions) == 0 || *v.AlarmActions[0] != stopAction {
continue
}
alerms.AlarmNames = append(alerms.AlarmNames, v.AlarmName)
}
_, err = svc.DisableAlarmActions(&alerms)
if err != nil {
return err
}
return nil
}
まとめ
そこそこのコード量になってしまいましたが、これでやりたいことができたかと思います。今回は各変更処理を愚直にポーリングして一つずつ進めていくという方法にしましたが、Goであれば並行処理をしつつ、時間がかかる処理を同時に走らせるみたいな書き方もできるかと思います。是非同様のことをやるケースがあればアレンジして活用していただければと思います。