Replacing ECS instances used to mean repeating these steps by hand:
- Build a new AMI
- Swap the launch configuration over to the new AMI
- Increase the desired count and wait for the new instance to join the ECS cluster
- Set the instance being replaced to draining and wait until its task count drops to 0
- Restore the desired count and terminate the old instance
We don't replace instances that often, so I had kept putting this off, but I finally found the time to automate it with CloudFormation.
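For reference, the drain-and-terminate part of the manual loop above can be sketched with boto3. This is a rough sketch, not the automation this post builds: the cluster and instance identifiers are placeholders, and the clients are passed in as arguments so the control flow can be exercised against stubs.

```python
import time


def drain_and_terminate(ecs, autoscaling, cluster: str,
                        container_instance_arn: str, instance_id: str,
                        poll_sec: int = 10) -> None:
    """Drain one container instance, wait until its tasks are gone, then terminate it."""
    # 1) DRAINING tells ECS to reschedule this instance's service tasks elsewhere.
    ecs.update_container_instances_state(
        cluster=cluster,
        containerInstances=[container_instance_arn],
        status='DRAINING',
    )
    # 2) Poll until no tasks remain on the instance.
    while ecs.list_tasks(cluster=cluster,
                         containerInstance=container_instance_arn)['taskArns']:
        time.sleep(poll_sec)
    # 3) Terminate and shrink the group back to its previous desired count.
    autoscaling.terminate_instance_in_auto_scaling_group(
        InstanceId=instance_id,
        ShouldDecrementDesiredCapacity=True,
    )
```

In real use `ecs` and `autoscaling` would be `boto3.client('ecs')` and `boto3.client('autoscaling')`; the point of the automation below is that nobody has to babysit this loop anymore.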
This is based on the AWS blog post below.
Prerequisites
This post assumes working knowledge of:
- Terraform
- CloudFormation
- Lambda
- SNS
- ECS
- EC2 Auto Scaling
Overview
Almost everything here comes down to using Auto Scaling Lifecycle Hooks to guarantee that a new instance has joined the ECS cluster before its launch completes, and that a terminating instance has been drained and has no ECS tasks left on it before it is terminated.
To quote from the docs: lifecycle hooks let you intercept scale-out and scale-in events, holding an instance before it transitions to InService or Terminated until you send a complete-lifecycle-action. A hook's notification target can be SNS, SQS, or a CloudWatch Event. Here we invoke a Lambda via SNS, and the Lambda calls complete-lifecycle-action once its work is done, which is what makes fully automatic ECS instance replacement possible.
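Concretely, the Lambda receives the lifecycle notification wrapped in an SNS envelope. The sketch below shows the shape of that event and a minimal parse of it; the field names are the real ones, but the ARNs and IDs are made up. This unwrapping is essentially the first thing the Lambda later in this post does.

```python
import json

# Shape of the event delivered to the Lambda (ARNs/IDs are placeholders).
SAMPLE_EVENT = {
    'Records': [{
        'EventSubscriptionArn': 'arn:aws:sns:region:account:topic:subscription-id',
        'Sns': {
            'TopicArn': 'arn:aws:sns:region:account:topic',
            # The lifecycle hook message itself is a JSON string inside the envelope.
            'Message': json.dumps({
                'LifecycleTransition': 'autoscaling:EC2_INSTANCE_TERMINATING',
                'LifecycleHookName': 'AppTerminatingLifecycleHook',
                'AutoScalingGroupName': 'app-asg',
                'EC2InstanceId': 'i-0123456789abcdef0',
            }),
        },
    }]
}


def parse_hook_message(event: dict) -> dict:
    """Unwrap the SNS envelope and decode the lifecycle hook message."""
    return json.loads(event['Records'][0]['Sns']['Message'])
```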
CloudFormation handles the instance replacement itself. The rest of the environment is managed with Terraform, but Terraform cannot replace Auto Scaling instances one at a time, so CloudFormation it is. I still wanted to keep as much as possible in Terraform, so I drew the boundary at the Lambda deployment, which Terraform alone cannot build, and embedded the CloudFormation template inside the Terraform configuration.
Details
Lambda
Differences from the AWS blog sample:
- The sample deploys by replacing a file in S3; here https://github.com/aws/chalice builds the zip as a deployment package so it can be deployed with terraform apply
- The sample only handled terminating; this one also reacts to launching
- Added a retry limit
- The sample detected the ECS cluster from user_data; this reads it from an instance tag (a matter of taste)
- Notifies Slack on launch, termination, and errors
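The retry limit mentioned above works by republishing the SNS message with a retry counter bumped each time; once the counter exceeds the limit, the Lambda gives up and alerts Slack instead. The counter logic by itself is just this (a sketch; in the real code below the counter travels in the 'retry' field of the republished SNS message):

```python
WAIT_TIMEOUT_COUNT = 90  # matches the constant used by the Lambda below


def next_retry_message(message: dict, limit: int = WAIT_TIMEOUT_COUNT):
    """Return the message to republish for another retry, or None to give up."""
    retries = message.get('retry', 0)
    if retries > limit:
        return None  # exceeded the limit: stop retrying and alert instead
    return {**message, 'retry': retries + 1}
```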
Running chalice package [path] writes a deployment.zip into path that can be deployed straight to Lambda. It also generates a sam.json, but since we don't use API Gateway we don't need it; anything that builds a package with the dependencies included would do. The environment variables are supplied from Terraform.
requirements.txt:
chalice
boto3
requests
# -*- coding: utf-8 -*-
import logging
import json
import boto3
import requests
import os
import enum
import time
from chalice import Chalice
app = Chalice(app_name='lifecycle-hook-ecs-instance')
logging.basicConfig()
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
WAIT_TIMEOUT_COUNT = 90
WAIT_TIME_SEC = 10
@app.lambda_function()
def lambda_function(event: dict, context: dict):
    logger.info(f"Lambda received the event {event}")
    lifecycle_hook_ecs = LifecycleHookEcs()
    return lifecycle_hook_ecs.run(event=event)
class Slack(object):
    def __init__(self):
        # Use .get() so notifications silently no-op when the web hook is not
        # configured (the None checks in __notify_slack rely on this).
        self.slack_incoming_web_hook_url = os.environ.get("SLACK_INCOMING_WEB_HOOK")
        self.slack_channel = os.environ.get("SLACK_CHANNEL")

    def launched_instance(self, cluster_name: str, instance_id: str):
        data = {
            'attachments': [
                {
                    'title': 'Autoscaling Instance Launched',
                    'fields': [
                        {'title': 'EcsCluster', 'value': cluster_name, 'short': True},
                        {'title': 'InstanceId', 'value': instance_id, 'short': True},
                    ]
                }
            ]
        }
        self.__notify_slack(data=data)

    def terminated_instance(self, cluster_name: str, instance_id: str):
        data = {
            'attachments': [
                {
                    'title': 'Autoscaling Instance Terminated',
                    'fields': [
                        {'title': 'EcsCluster', 'value': cluster_name, 'short': True},
                        {'title': 'InstanceId', 'value': instance_id, 'short': True},
                    ]
                }
            ],
        }
        self.__notify_slack(data=data)

    def timeout(self, cluster_name: str, instance_id: str):
        data = {
            'attachments': [
                {
                    'color': 'danger',
                    'title': 'Autoscaling Instance wait timeout',
                    'fields': [
                        {'title': 'EcsCluster', 'value': cluster_name, 'short': True},
                        {'title': 'InstanceId', 'value': instance_id, 'short': True},
                    ]
                }
            ],
        }
        self.__notify_slack(data=data)

    def error(self, message):
        data = {
            'attachments': [
                {
                    'color': 'danger',
                    'title': 'Autoscaling Instance Error',
                    'fields': [
                        {'title': 'Message', 'value': message},
                    ]
                }
            ]
        }
        # __notify_slack sets the channel; no need to duplicate that here.
        self.__notify_slack(data=data)

    def __notify_slack(self, data: dict):
        if self.slack_channel is not None:
            data['channel'] = self.slack_channel
        if self.slack_incoming_web_hook_url is not None:
            requests.post(url=self.slack_incoming_web_hook_url, data=json.dumps(data))
class LifecycleTransition(enum.Enum):
    TERMINATING = 'autoscaling:EC2_INSTANCE_TERMINATING'
    LAUNCHING = 'autoscaling:EC2_INSTANCE_LAUNCHING'
    TEST_NOTIFICATION = 'autoscaling:TEST_NOTIFICATION'


class EcsContainerInstanceStatus(enum.Enum):
    ACTIVE = 'ACTIVE'
    DRAINING = 'DRAINING'
    INACTIVE = 'INACTIVE'
    NONE = ''
class EcsInstance(object):
    def __init__(self, ec2_instance_id: str, asg_group_name: str, sns_arn: str, topic_arn: str,
                 cluster_name: str, container_instance_arn: str, retry: int,
                 lifecycle_transition: LifecycleTransition, lifecycle_hook_name: str):
        self.ec2_instance_id = ec2_instance_id
        self.asg_group_name = asg_group_name
        self.sns_arn = sns_arn
        self.topic_arn = topic_arn
        self.cluster_name = cluster_name
        self.container_instance_arn = container_instance_arn
        self.retry = retry
        self.lifecycle_transition = lifecycle_transition
        self.lifecycle_hook_name = lifecycle_hook_name

    @staticmethod
    def from_event(event: dict):
        line = event['Records'][0]['Sns']['Message']
        message = json.loads(line)
        lifecycle_transition = None
        try:
            lifecycle_transition_str = message.get('LifecycleTransition')
            if lifecycle_transition_str is not None:
                lifecycle_transition = LifecycleTransition(lifecycle_transition_str)
        except ValueError:
            logger.error("There is no such Lifecycle Transition")
            return None
        if lifecycle_transition is None:
            logger.error("Lifecycle Transition not found")
            return None
        if lifecycle_transition == LifecycleTransition.TEST_NOTIFICATION:
            logger.info("Lifecycle Transition is autoscaling:TEST_NOTIFICATION")
            return None
        lifecycle_hook_name = message.get('LifecycleHookName')
        if lifecycle_hook_name is None:
            raise Exception("lifecycle hook name not found in message.")
        ec2_instance_id = message['EC2InstanceId']
        asg_group_name = message['AutoScalingGroupName']
        sns_arn = event['Records'][0]['EventSubscriptionArn']
        topic_arn = event['Records'][0]['Sns']['TopicArn']
        # These fields are only present on retry messages we published ourselves.
        container_instance_arn = message.get('ContainerInstanceArn')
        cluster_name = message.get('ClusterName')
        retry = message.get('retry')
        if retry is None:
            retry = 0
        return EcsInstance(
            ec2_instance_id=ec2_instance_id, asg_group_name=asg_group_name, sns_arn=sns_arn, topic_arn=topic_arn,
            cluster_name=cluster_name, container_instance_arn=container_instance_arn, retry=retry,
            lifecycle_transition=lifecycle_transition, lifecycle_hook_name=lifecycle_hook_name
        )

    def dump_retry_sns_message(self) -> str:
        message = {
            'EC2InstanceId': self.ec2_instance_id,
            'AutoScalingGroupName': self.asg_group_name,
            'LifecycleHookName': self.lifecycle_hook_name,
            'ContainerInstanceArn': self.container_instance_arn,
            'ClusterName': self.cluster_name,
            'retry': self.retry + 1,
            'LifecycleTransition': self.lifecycle_transition.value
        }
        return json.dumps(message)
class LifecycleHookEcs(object):
    def __init__(self):
        # Establish boto3 session
        session = boto3.session.Session()
        logger.debug("Session is in region %s", session.region_name)
        self.ec2_client = session.client(service_name='ec2')
        self.ecs_client = session.client(service_name='ecs')
        self.asg_client = session.client('autoscaling')
        self.sns_client = session.client('sns')
        self.lambda_client = session.client('lambda')

    def __get_container_instance_arn(self, cluster_name: str, ec2_instance_id: str) -> str:
        container_instance_arn = None
        # Get the list of container instance ARNs in the cluster
        paginator = self.ecs_client.get_paginator('list_container_instances')
        cluster_list_pages = paginator.paginate(cluster=cluster_name)
        for cluster_list_response in cluster_list_pages:
            container_instance_arns = cluster_list_response.get('containerInstanceArns')
            if not container_instance_arns:
                # describe_container_instances rejects an empty list
                continue
            describe_container_response = self.ecs_client.describe_container_instances(
                cluster=cluster_name,
                containerInstances=container_instance_arns
            )
            logger.debug(f"describe container instances response {describe_container_response}")
            for container_instance in describe_container_response['containerInstances']:
                logger.debug(f"Container Instance ARN: {container_instance.get('containerInstanceArn')}"
                             f" and ec2 Instance ID {container_instance.get('ec2InstanceId')}")
                if container_instance['ec2InstanceId'] == ec2_instance_id:
                    logger.info(f"Container instance ID of interest : {container_instance['containerInstanceArn']}")
                    container_instance_arn = container_instance['containerInstanceArn']
        return container_instance_arn

    def __get_container_instance_status(self, ecs_instance: EcsInstance) -> EcsContainerInstanceStatus:
        if ecs_instance.container_instance_arn is None:
            return EcsContainerInstanceStatus.NONE
        describe_container_response = self.ecs_client.describe_container_instances(
            cluster=ecs_instance.cluster_name, containerInstances=[ecs_instance.container_instance_arn]
        )
        container_instance = describe_container_response['containerInstances'][0]
        logger.info(f"Container instance ID of interest : {container_instance['containerInstanceArn']}")
        ecs_instance.container_instance_arn = container_instance['containerInstanceArn']
        return EcsContainerInstanceStatus(container_instance['status'])

    def __get_instance_cluster(self, ec2_instance_id: str) -> str:
        cluster_name = None
        # Read the cluster name from the instance's ecsCluster tag
        describe_tags = self.ec2_client.describe_tags(
            Filters=[{'Name': 'resource-id', 'Values': [ec2_instance_id]}],
            MaxResults=100
        )
        logger.debug("Describe instance tags %s", describe_tags)
        for tag in describe_tags.get('Tags'):
            if tag.get('Key') == 'ecsCluster':
                cluster_name = tag.get('Value')
                logger.debug("Cluster name %s", cluster_name)
        if cluster_name is None:
            raise Exception('ClusterName Not Found.')
        return cluster_name

    def __retry(self, ecs_instance: EcsInstance):
        logger.info("Publish to SNS topic %s", ecs_instance.topic_arn)
        sns_response = self.sns_client.publish(
            TopicArn=ecs_instance.topic_arn,
            Message=ecs_instance.dump_retry_sns_message(),
            Subject='Publishing SNS message to invoke lambda again..'
        )
        logger.debug(f"sns response: {sns_response}")

    def __complete_lifecycle(self, ecs_instance: EcsInstance):
        response = self.asg_client.complete_lifecycle_action(
            LifecycleHookName=ecs_instance.lifecycle_hook_name,
            AutoScalingGroupName=ecs_instance.asg_group_name,
            LifecycleActionResult='CONTINUE',
            InstanceId=ecs_instance.ec2_instance_id)
        logger.info("Response received from complete_lifecycle_action %s", response)
        logger.info("Completed lifecycle hook action")

    def __is_container_instance_has_tasks(self, ecs_instance: EcsInstance) -> bool:
        if ecs_instance.container_instance_arn is None:
            logger.info("NO tasks are on this instance...%s", ecs_instance.ec2_instance_id)
            return False
        # List tasks on the container instance to get the task ARNs
        list_tasks_response = self.ecs_client.list_tasks(
            cluster=ecs_instance.cluster_name, containerInstance=ecs_instance.container_instance_arn)
        logger.debug("Container instance task response: %s", list_tasks_response)
        task_arns = list_tasks_response['taskArns']
        if len(task_arns) > 0:
            logger.info("Tasks are on this instance...%s", ecs_instance.ec2_instance_id)
            return True
        logger.info("NO tasks are on this instance...%s", ecs_instance.ec2_instance_id)
        return False

    def __drain_ecs_instance(self, ecs_instance: EcsInstance):
        ecs_response = self.ecs_client.update_container_instances_state(
            cluster=ecs_instance.cluster_name,
            containerInstances=[ecs_instance.container_instance_arn],
            status=EcsContainerInstanceStatus.DRAINING.value
        )
        logger.debug(f"Ecs Container Draining response: {ecs_response}")

    def run(self, event: dict):
        slack = Slack()
        try:
            # parse event
            ecs_instance = EcsInstance.from_event(event=event)
            if ecs_instance is None:
                return
            if ecs_instance.retry > WAIT_TIMEOUT_COUNT:
                logger.error("wait timeout.")
                slack.timeout(cluster_name=ecs_instance.cluster_name, instance_id=ecs_instance.ec2_instance_id)
                return
            # Get cluster_name
            if ecs_instance.cluster_name is None:
                ecs_instance.cluster_name = self.__get_instance_cluster(ec2_instance_id=ecs_instance.ec2_instance_id)
            # Get container_instance_arn
            if ecs_instance.container_instance_arn is None:
                ecs_instance.container_instance_arn = self.__get_container_instance_arn(
                    cluster_name=ecs_instance.cluster_name, ec2_instance_id=ecs_instance.ec2_instance_id)
            # Look up the current container instance status
            container_instance_status = self.__get_container_instance_status(ecs_instance=ecs_instance)
            if ecs_instance.lifecycle_transition == LifecycleTransition.TERMINATING:
                # If the container instance is still ACTIVE, start draining it
                if container_instance_status == EcsContainerInstanceStatus.ACTIVE:
                    self.__drain_ecs_instance(ecs_instance=ecs_instance)
                if self.__is_container_instance_has_tasks(ecs_instance=ecs_instance):
                    time.sleep(WAIT_TIME_SEC)
                    self.__retry(ecs_instance=ecs_instance)
                else:
                    self.__complete_lifecycle(ecs_instance=ecs_instance)
                    slack.terminated_instance(
                        cluster_name=ecs_instance.cluster_name, instance_id=ecs_instance.ec2_instance_id)
            elif ecs_instance.lifecycle_transition == LifecycleTransition.LAUNCHING:
                # Not registered with the cluster yet: wait and retry
                if container_instance_status == EcsContainerInstanceStatus.NONE:
                    time.sleep(WAIT_TIME_SEC)
                    self.__retry(ecs_instance=ecs_instance)
                else:
                    self.__complete_lifecycle(ecs_instance=ecs_instance)
                    slack.launched_instance(
                        cluster_name=ecs_instance.cluster_name, instance_id=ecs_instance.ec2_instance_id)
        except Exception as e:
            logger.error(str(e), exc_info=True)
            slack.error(str(e))
Terraform configuration
The deployment.zip built earlier for the Lambda is referenced here so it can be deployed with terraform apply. Because the Lambda determines which cluster an instance belongs to from its ecsCluster tag, the AWS::AutoScaling::AutoScalingGroup applies that tag. The AWS::AutoScaling::LifecycleHook resources notify LifecycleHookSNSTopic for both launching and terminating, and that AWS::SNS::Topic in turn invokes the Lambda.
When the AMI changes, the Auto Scaling AutoScalingRollingUpdate policy replaces instances MaxBatchSize at a time. This doesn't have to be 1: as long as minimumHealthyPercent and maximumPercent are set sensibly on the ECS services, ECS should handle larger batches gracefully.
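To make those service settings concrete: per the ECS docs, during a deployment the scheduler keeps the number of running tasks between desiredCount x minimumHealthyPercent / 100 (rounded up) and desiredCount x maximumPercent / 100 (rounded down). A quick sketch of those bounds:

```python
import math


def deployment_task_bounds(desired_count: int, minimum_healthy_percent: int,
                           maximum_percent: int):
    """Lower/upper task counts ECS stays within while replacing tasks.

    Per the ECS docs, the lower bound rounds up and the upper bound rounds down.
    """
    lower = math.ceil(desired_count * minimum_healthy_percent / 100)
    upper = math.floor(desired_count * maximum_percent / 100)
    return lower, upper
```

With desiredCount=4, minimumHealthyPercent=50, and maximumPercent=200, ECS may temporarily run anywhere between 2 and 8 tasks, which is what lets it shuffle tasks off a draining instance without downtime.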
For CloudFormation inside Terraform, template_body lets you pass Terraform variables into the CloudFormation template.
Note that everything else (the ELB, the VPC, the policies referenced via file(), and so on) is omitted here to keep this short.
#--------------------------------------------------------------
# Lambda
#--------------------------------------------------------------
resource "aws_lambda_function" "lifecycle-hook-ecs-instance" {
  filename         = "../lambda/lifecycle-hook-ecs-instance/deployment.zip"
  function_name    = "lifecycle-hook-ecs-instance"
  description      = "lifecycle-hook-ecs-instance"
  role             = "${aws_iam_role.lifecycle-hook-ecs-instance.arn}"
  handler          = "app.lambda_function"
  source_code_hash = "${base64sha256(file("../lambda/lifecycle-hook-ecs-instance/deployment.zip"))}"
  runtime          = "python3.6"
  memory_size      = 128
  timeout          = 300

  environment {
    variables = {
      SLACK_INCOMING_WEB_HOOK = "${var.slack_web_hook}"
      SLACK_CHANNEL           = "${var.slack_channel}"
    }
  }
}

#--------------------------------------------------------------
# iam
#--------------------------------------------------------------
resource "aws_iam_policy" "lifecycle-hook-ecs-instance" {
  name        = "lifecycle-hook-ecs-instance"
  description = "lifecycle-hook-ecs-instance"
  policy      = "${file("./iam/lifecycle-hook-ecs-instance.policy")}"
}

resource "aws_iam_role" "lifecycle-hook-ecs-instance" {
  name               = "lifecycle-hook-ecs-instance"
  assume_role_policy = "${file("./iam/lambda_assume.policy")}"
}

resource "aws_iam_role_policy_attachment" "lifecycle-hook-ecs-instance" {
  role       = "${aws_iam_role.lifecycle-hook-ecs-instance.name}"
  policy_arn = "${aws_iam_policy.lifecycle-hook-ecs-instance.arn}"
}

#--------------------------------------------------------------
# Artifact
#--------------------------------------------------------------
data "aws_ami" "app" {
  filter {
    name   = "state"
    values = ["available"]
  }

  filter {
    name   = "tag:role"
    values = ["app"]
  }

  most_recent = true
}

resource "aws_ecs_cluster" "app" {
  name = "app"
}

#--------------------------------------------------------------
# iam
#--------------------------------------------------------------
resource "aws_iam_role" "app" {
  name               = "app"
  assume_role_policy = "${file("./iam/assume.policy")}"
}

resource "aws_iam_policy" "app" {
  name        = "app"
  path        = "/"
  description = "for app policy"
  policy      = "${file("./iam/app_role.policy")}"
}

resource "aws_iam_policy_attachment" "app_role_attachment" {
  name       = "app_role_attachment"
  roles      = ["${aws_iam_role.app.name}"]
  policy_arn = "${aws_iam_policy.app.arn}"
}

resource "aws_iam_instance_profile" "app" {
  name = "app"
  role = "${aws_iam_role.app.name}"
}

#--------------------------------------------------------------
# Cloudformation for app cluster
#--------------------------------------------------------------
resource "aws_cloudformation_stack" "ecs_app_cluster" {
  name         = "ecsAppCluster"
  capabilities = ["CAPABILITY_IAM"]
  depends_on   = ["aws_lambda_function.lifecycle-hook-ecs-instance", "data.aws_ami.app", "aws_security_group.sg1", "aws_security_group.sg2", "aws_key_pair.developer", "aws_subnet.main", "aws_ecs_cluster.app", "aws_elb.app"]

  template_body = <<EOF
AWSTemplateFormatVersion: 2010-09-09
Resources:
  AppLaunchConfiguration:
    Type: 'AWS::AutoScaling::LaunchConfiguration'
    Properties:
      AssociatePublicIpAddress: true
      EbsOptimized: false
      IamInstanceProfile: "${aws_iam_instance_profile.app.id}"
      ImageId: "${data.aws_ami.app.id}"
      InstanceMonitoring: false
      InstanceType: "${var.app_instance_type}"
      KeyName: "${aws_key_pair.developer.id}"
      SecurityGroups:
        - "${aws_security_group.sg1.id}"
        - "${aws_security_group.sg2.id}"
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          echo ECS_CLUSTER=${aws_ecs_cluster.app.name} >> /etc/ecs/ecs.config
  AppAutoscalingGroup:
    Type: 'AWS::AutoScaling::AutoScalingGroup'
    UpdatePolicy:
      AutoScalingRollingUpdate:
        MaxBatchSize: 1
        MinInstancesInService: ${var.app_min_server_count}
        MinSuccessfulInstancesPercent: 100
        PauseTime: PT0S
    Properties:
      LaunchConfigurationName: !Ref AppLaunchConfiguration
      Cooldown: 300
      HealthCheckGracePeriod: 300
      HealthCheckType: ELB
      LoadBalancerNames:
        - "${aws_elb.app.name}"
      MaxSize: ${var.app_max_server_count}
      MinSize: ${var.app_min_server_count}
      Tags:
        - Key: Name
          Value: dev-app
          PropagateAtLaunch: true
        - Key: role
          Value: app
          PropagateAtLaunch: true
        - Key: environment
          Value: dev
          PropagateAtLaunch: true
        - Key: ecsCluster
          Value: ${aws_ecs_cluster.app.name}
          PropagateAtLaunch: true
      VPCZoneIdentifier:
        - "${aws_subnet.main.0.id}"
        - "${aws_subnet.main.1.id}"
  LifecycleHookSNSTopic:
    Type: "AWS::SNS::Topic"
    Properties:
      Subscription:
        - Endpoint: "${aws_lambda_function.lifecycle-hook-ecs-instance.arn}"
          Protocol: "lambda"
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      FunctionName: "${aws_lambda_function.lifecycle-hook-ecs-instance.arn}"
      Action: lambda:InvokeFunction
      Principal: "sns.amazonaws.com"
      SourceArn: !Ref LifecycleHookSNSTopic
  LambdaSubscriptionToSNSTopic:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: "${aws_lambda_function.lifecycle-hook-ecs-instance.arn}"
      Protocol: 'lambda'
      TopicArn: !Ref LifecycleHookSNSTopic
  SNSLambdaRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "autoscaling.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole
      Path: "/"
  AppTerminatingLifecycleHook:
    Type: "AWS::AutoScaling::LifecycleHook"
    Properties:
      AutoScalingGroupName: !Ref AppAutoscalingGroup
      DefaultResult: "ABANDON"
      HeartbeatTimeout: "900"
      LifecycleTransition: "autoscaling:EC2_INSTANCE_TERMINATING"
      NotificationTargetARN: !Ref LifecycleHookSNSTopic
      RoleARN:
        Fn::GetAtt:
          - "SNSLambdaRole"
          - "Arn"
    DependsOn: "LifecycleHookSNSTopic"
  AppLaunchingLifecycleHook:
    Type: "AWS::AutoScaling::LifecycleHook"
    Properties:
      AutoScalingGroupName: !Ref AppAutoscalingGroup
      DefaultResult: "ABANDON"
      HeartbeatTimeout: "600"
      LifecycleTransition: "autoscaling:EC2_INSTANCE_LAUNCHING"
      NotificationTargetARN: !Ref LifecycleHookSNSTopic
      RoleARN:
        Fn::GetAtt:
          - "SNSLambdaRole"
          - "Arn"
    DependsOn: "LifecycleHookSNSTopic"
EOF
}
Miscellaneous
How instances are replaced
AMIs are built with Packer and fed into the CloudFormation definition inside Terraform as a variable, so building an AMI with Packer and running terraform apply is all it takes to replace the instances. As long as the AMI tags are consistent, Terraform's data.aws_ami can look up the latest AMI, so this works just as easily without Packer.
Spot Fleet
Another ECS cluster also runs on Spot Fleet, but since it only runs batch jobs, I simply defined the spot fleet in Terraform and set create_before_destroy in its lifecycle, so a new fleet is created before the old one is torn down in full.
Afterword
At first I told myself this wasn't worth automating, and later that we would move to Kubernetes anyway, so I kept doing it by hand. But since AWS EKS still has you run the worker nodes yourself, the replacement problem wasn't going away, so I finally automated it.
I had never seriously worked with Auto Scaling lifecycle hooks before, so this was a good learning exercise.
Lately our Docker images have been getting big, and pulling an image after an instance joins the cluster is painfully slow. The launching hook could pre-pull the required images before declaring the instance launched, so I'm thinking of building that in.
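As a sketch of that idea: inside the launching hook, the Lambda could collect the images referenced by the cluster's task definitions, diff them against what is already cached on the instance, and pull the remainder before sending complete-lifecycle-action. The set computation might look like this (the dict shape follows ECS describe_task_definition output; the actual pulling is left out):

```python
def images_to_prepull(task_definitions: list, cached_images: set) -> list:
    """Images referenced by the task definitions but not yet on the instance."""
    needed = {
        container['image']
        for td in task_definitions
        for container in td['taskDefinition']['containerDefinitions']
    }
    return sorted(needed - cached_images)
```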