概要
インスタンスを立ち上げてテストしたあと、よく停止し忘れるのでlambdaでEC2/RDS/ElastiCacheの状態を確認し、不要なインスタンスが起動中であればslackで通知する処理を作る
(強制停止だと不味い場合があるので通知だけ)
フロー
CloudWatchEventのスケジュール -> lambda実行
前準備
lambda実行用に以下のポリシーがついたロールを用意
・AmazonEC2ReadOnlyAccess
・AmazonElastiCacheReadOnlyAccess
・AmazonRDSReadOnlyAccess
コード
import boto3
import json
import urllib.request
ALLWAYS_RUNNING_EC2_INSTANCE_ID_LIST = [
'i-hogehoge', # hogehoge
]
AS_INSTANCE_NAME_LIST = [
'auto-scale-instance-name',
]
ALLWAYS_AVAILABLE_RDS_INSTANCE_NAME_LIST = [
'hogehoge',
]
ALLWAYS_AVAILABLE_CACHE_CLUSTER_ID_LIST = [
'hogehoge',
]
def get_needless_rds_instance_list(region):
client = boto3.client('rds', region)
tmp = client.describe_db_instances().get('DBInstances')
ret = []
for instance in tmp:
if instance['DBInstanceStatus'] != 'available':
continue
if instance['DBClusterIdentifier'] in ALLWAYS_AVAILABLE_RDS_INSTANCE_NAME_LIST:
continue
ret.append(instance['DBClusterIdentifier'])
return ret
def get_needless_cache_instance_list(region):
client = boto3.client('elasticache', region)
tmp = client.describe_cache_clusters().get('CacheClusters')
ret = []
for cluster in tmp:
if cluster['CacheClusterStatus'] != 'available':
continue
if cluster['CacheClusterId'] in ALLWAYS_AVAILABLE_CACHE_CLUSTER_ID_LIST:
continue
ret.append(cluster['CacheClusterId'])
return ret
def get_needless_ec2_instance_list(region):
client = boto3.client('ec2', region)
tmp = client.describe_instances()
ret = []
for reservation in tmp['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'running':
continue
if instance['InstanceId'] in ALLWAYS_RUNNING_EC2_INSTANCE_ID_LIST:
continue
if 'Tags' not in instance:
ret.append('名前なし : ' + instance['InstanceId'])
continue
for tag in instance['Tags']:
if tag['Key'] == 'Name' and tag['Value'] not in AS_INSTANCE_NAME_LIST:
ret.append(tag['Value'] + ' : ' + instance['InstanceId'])
return ret
def post_slack(message):
send_data = {
"text": message,
}
send_text = json.dumps(send_data)
request = urllib.request.Request(
'https://hooks.slack.com/HOGEHOGE',
data = send_text.encode('utf-8'),
method = "POST"
)
with urllib.request.urlopen(request) as response:
response_body = response.read().decode('utf-8')
def lambda_handler(event, context):
message = ''
try:
needless_ec2_instance_list = get_needless_ec2_instance_list('ap-northeast-1')
needless_rds_instance_list = get_needless_rds_instance_list('ap-northeast-1')
needless_cache_instance_list = get_needless_cache_instance_list('ap-northeast-1')
if needless_ec2_instance_list:
message += '下記EC2インスタンスが起動中です\n' + '\n'.join(needless_ec2_instance_list) + '\n'
if needless_rds_instance_list:
message += '下記RDSインスタンスが起動中です\n' + '\n'.join(needless_rds_instance_list) + '\n'
if needless_cache_instance_list:
message += '下記redisクラスターが起動中です\n' + '\n'.join(needless_cache_instance_list) + '\n'
except Exception as e:
message = '\n'.join(e.args)
if message:
post_slack(message)
改善点
・オートスケールインスタンスはインスタンスIDが一定ではないので名前で弾いていますが、もっと良い方法を探したいところ
-> タグでやるのはありかも