LoginSignup
1
1

More than 1 year has passed since last update.

Lambda (Python) で EBS ボリュームのスナップショットを削除してみる

Posted at

■ はじめに

963f9ca9395c096c234f891ad8877b28.png

ずいぶん昔に作成したものですが…

以下の処理に伴って、イメージを削除したものの、元の EC2 インスタンスの設定によっては、スナップショットが残ってしまうことがあるのでこちらで対応します。

■ プログラムの流れ

  1. EC2 インスタンスのイメージの一覧情報を取得
  2. EBS ボリュームのスナップショットの一覧情報を取得
  3. 上記二つの一覧情報から、削除されたイメージにも関わらず、残っているスナップショットを抽出
  4. 抽出したスナップショットを削除

■ コード

コードは、以下。

MaintenanceEC2_Snapshot.py
#!/usr/bin/env python
# -*- coding: utf-8-unix; -*-
"""AWS Lambda Function - Maintenance EC2 Snapshots
"""
from __future__ import print_function

import logging
import boto3
import os

# 「Python 3 で少しだけ便利になった datetime の新機能 - Qiita」
# <https://qiita.com/methane/items/d7ac3c9af5a2c659bc51>
from datetime import datetime, timezone, timedelta
TimeZone = timezone(timedelta(hours=+9), 'JST')

# 「Lambdaの本番業務利用を考える① - ログ出力とエラーハンドリング | ナレコムAWSレシピ」
# <https://recipe.kc-cloud.jp/archives/9968>
logger = logging.getLogger()
logLevelTable={'DEBUG':logging.DEBUG,'INFO':logging.INFO,'WARNING':logging.WARNING,'ERROR':logging.ERROR,'CRITICAL':logging.CRITICAL}
if os.environ.get('logging_level') in logLevelTable :
    logger.setLevel(logLevelTable[os.environ['logging_level']])
else:
    logger.setLevel(logging.WARNING)

Snapshot_Description_Head = "Created by CreateImage("
logger.info("Snapshot_Description_Head:[%s]", Snapshot_Description_Head)
Snapshot_Description_Head_1 = ") for ami-"
logger.info("Snapshot_Description_Head_1:[%s]", Snapshot_Description_Head_1)
Snapshot_Description_Head_2 = " from vol-"
logger.info("Snapshot_Description_Head_2:[%s]", Snapshot_Description_Head_2)
Snapshot_Description_Head_3 = ") for "
logger.info("Snapshot_Description_Head_3:[%s]", Snapshot_Description_Head_3)

#
StartDateTime = datetime.now(TimeZone)

def AmazonEC2_DescribeDeleteSnapshots(response_images, response_snapshots):
    try:
        delete_snapshots = []
        image_ids = {}
        for n, k in enumerate(response_images["Images"]):
            id = response_images["Images"][n].get("ImageId")
            name = response_images["Images"][n].get("Name")
            description = response_images["Images"][n].get("Description")
            state = response_images["Images"][n].get("State")
            creationdate = response_images["Images"][n].get("CreationDate")
            image_ids[id] = {
                "ImageId" : id,
                "Name" : name,
                "Description" : description,
                "State" : state,
                "CreationDate" : creationdate,
            }
        for n, k in enumerate(response_snapshots["Snapshots"]):
            id = response_snapshots["Snapshots"][n].get("SnapshotId")
            name = response_snapshots["Snapshots"][n].get("Name")
            description = k.get("Description")
            state = response_snapshots["Snapshots"][n].get("State")
            progress = response_snapshots["Snapshots"][n].get("Progress")
            starttime = response_snapshots["Snapshots"][n].get("StartTime")
            ownerid = response_snapshots["Snapshots"][n].get("OwnerId")
            volumeid = response_snapshots["Snapshots"][n].get("VolumeId")
            volumesize = response_snapshots["Snapshots"][n].get("VolumeSize")
            instanceid = ""
            imageid = ""
            index = description.find(Snapshot_Description_Head)
            if index >= 0:
                index2 = description.find(Snapshot_Description_Head_1)
                if index2 >= 0:
                    instanceid = description[(index+len(Snapshot_Description_Head)):index2]
                    index3 = description.find(Snapshot_Description_Head_2)
                    if index3 >= 0:
                        imageid = description[(index2+len(Snapshot_Description_Head_3)):index3]
            logger.debug(("(%04d/%04d) EBS Snapshot | [  ] "
                "ID:[%s] Name:[%s] Description:[%s] State:[%s] Progress:[%s] StartTime:[%s] OwnerId:[%s] VolumeId:[%s] VolumeSize:[%s] InstanceId:[%s] ImageId:[%s]"),
                n + 1, len(response_snapshots["Snapshots"]),
                id, name, description, state, progress, starttime, ownerid, volumeid, volumesize, instanceid, imageid)
            if not instanceid:
                continue
            if not imageid:
                continue
            if not state == "completed":
                continue
            if imageid in image_ids:
                continue
            delete_snapshots.append({
                "SnapshotId" : id,
                "Name" : name,
                "Description" : description,
                "State" : state,
                "Progress" : progress,
                "StartTime" : starttime,
                "OwnerId" : ownerid,
                "VolumeId" : volumeid,
                "VolumeSize" : volumesize,
                "InstanceId" : instanceid,
                "ImageId" : imageid,
            })
    except Exception as e:
        logger.exception('Error dosomething: %s', e)
    else:
        pass
    finally:
        pass
    return delete_snapshots

def lambda_handler(event, context):
    try:
        global StartDateTime
        StartDateTime = datetime.now(TimeZone)
        logger.info("StartDateTime:[%s]", StartDateTime)
        #
        logger.info("event:[%s]", event)
        logger.info("context:[%s]", context)
        #
        DryRun = event.get("DryRun")
        if not DryRun:
            DryRun = False
        logger.info("DryRun:[%s]", DryRun)
        if not isinstance(DryRun, bool):
            return "[ERROR] Parameter DryRun Format is Bool"
        #
        ec2_client = boto3.client("ec2")
        response_snapshots = ec2_client.describe_snapshots(OwnerIds = ['self'])
        response_images = ec2_client.describe_images(Owners=['self'])
        # Amazon EBS - Delete Snapshots
        delete_snapshots = AmazonEC2_DescribeDeleteSnapshots(response_images, response_snapshots)
        for n, k in enumerate(delete_snapshots):
            logger.info(("(%04d/%04d) Delete Snapshot | [Do] "
                "ID:[%s] Name:[%s] Description:[%s] State:[%s] Progress:[%s] StartTime:[%s] OwnerId:[%s] VolumeId:[%s] VolumeSize:[%s] InstanceId:[%s] ImageId:[%s]"),
                n + 1, len(delete_snapshots),
                k["SnapshotId"], k["Name"], k["Description"], k["State"], k["Progress"], k["StartTime"], k["OwnerId"], k["VolumeId"], k["VolumeSize"], k["InstanceId"], k["ImageId"])
            if DryRun:
                logger.info("Delete Snapshot - Dry Run. (DryRun is %s)", DryRun)
                continue
            response_delete_snapshot = ec2_client.delete_snapshot(
                SnapshotId=k["SnapshotId"])
            logger.info("Delete Snapshot Response | [%s] | %s", k["SnapshotId"], response_delete_snapshot)
    except Exception as e:
        logger.exception('Error dosomething: %s', e)
    else:
        pass
    finally:
        pass
    #
    return "normal end"

■ Lambda への引数

  • DryRun
    • ドライランの設定。これが、false になっていないと、削除しない。
{
    "DryRun": false
}

■ Lambda に設定する環境変数

  • logging_level : [ログレベル]

■ ハマりポイント

  • 昔のことなので、覚えていません…

🤔🤔🤔

■ まとめ

参考になれば ♪♪♪

👋👋👋

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1