0
0

More than 3 years have passed since last update.

【lambda】インスタンスの状態を取得してlambdaでslackへ通知する

Last updated at Posted at 2020-08-11

概要

インスタンスを立ち上げてテストしたあと、よく停止し忘れるのでlambdaでEC2/RDS/ElastiCacheの状態を確認し、不要なインスタンスが起動中であればslackで通知する処理を作る
(強制停止だと不味い場合があるので通知だけ)

フロー

CloudWatchEventのスケジュール -> lambda実行

前準備

lambda実行用に以下のポリシーがついたロールを用意
・AmazonEC2ReadOnlyAccess
・AmazonElastiCacheReadOnlyAccess
・AmazonRDSReadOnlyAccess

コード

import boto3
import json
import urllib.request

ALLWAYS_RUNNING_EC2_INSTANCE_ID_LIST = [
    'i-hogehoge', # hogehoge
]
AS_INSTANCE_NAME_LIST = [
    'auto-scale-instance-name',
]
ALLWAYS_AVAILABLE_RDS_INSTANCE_NAME_LIST = [
    'hogehoge',
]
ALLWAYS_AVAILABLE_CACHE_CLUSTER_ID_LIST = [
    'hogehoge',
]

def get_needless_rds_instance_list(region):
    client = boto3.client('rds', region)
    tmp = client.describe_db_instances().get('DBInstances')
    ret = []
    for instance in tmp:
        if instance['DBInstanceStatus'] != 'available':
             continue

        if instance['DBClusterIdentifier'] in ALLWAYS_AVAILABLE_RDS_INSTANCE_NAME_LIST:
            continue

        ret.append(instance['DBClusterIdentifier'])

    return ret

def get_needless_cache_instance_list(region):
    client = boto3.client('elasticache', region)
    tmp = client.describe_cache_clusters().get('CacheClusters')
    ret = []
    for cluster in tmp:
        if cluster['CacheClusterStatus'] != 'available':
             continue

        if cluster['CacheClusterId'] in ALLWAYS_AVAILABLE_CACHE_CLUSTER_ID_LIST:
            continue

        ret.append(cluster['CacheClusterId'])

    return ret

def get_needless_ec2_instance_list(region):
    client = boto3.client('ec2', region)
    tmp = client.describe_instances()
    ret = []
    for reservation in tmp['Reservations']:
        for instance in reservation['Instances']:
            if instance['State']['Name'] != 'running':
                continue

            if instance['InstanceId'] in ALLWAYS_RUNNING_EC2_INSTANCE_ID_LIST:
                continue

            if 'Tags' not in instance:
                ret.append('名前なし : ' + instance['InstanceId'])
                continue

            for tag in instance['Tags']:
                if tag['Key'] == 'Name' and tag['Value'] not in AS_INSTANCE_NAME_LIST:
                    ret.append(tag['Value'] + ' : ' + instance['InstanceId'])
    return ret

def post_slack(message):
    send_data = {
        "text": message,
    }
    send_text = json.dumps(send_data)
    request = urllib.request.Request(
        'https://hooks.slack.com/HOGEHOGE',
        data = send_text.encode('utf-8'), 
        method = "POST"
    )
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode('utf-8')

def lambda_handler(event, context):
    message = ''
    try:
        needless_ec2_instance_list = get_needless_ec2_instance_list('ap-northeast-1')
        needless_rds_instance_list = get_needless_rds_instance_list('ap-northeast-1')
        needless_cache_instance_list = get_needless_cache_instance_list('ap-northeast-1')

        if needless_ec2_instance_list:
            message += '下記EC2インスタンスが起動中です\n' + '\n'.join(needless_ec2_instance_list) + '\n'
        if needless_rds_instance_list:
            message += '下記RDSインスタンスが起動中です\n' + '\n'.join(needless_rds_instance_list) + '\n'
        if needless_cache_instance_list:
            message += '下記redisクラスターが起動中です\n' + '\n'.join(needless_cache_instance_list) + '\n'
    except Exception as e:
        message = '\n'.join(e.args)

    if message:
        post_slack(message)

改善点

・オートスケールインスタンスはインスタンスIDが一定ではないので名前で弾いていますが、もっと良い方法を探したいところ
 -> タグでやるのはありかも

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0