LoginSignup
0
3

More than 5 years have passed since last update.

slackにawsの料金を通知するpythonスクリプト

Posted at

lamdaで動かすpythonスクリプト。

こんな感じで通知が来る。

image.png

Cost Explorerは料金がめっちゃ高い。1リクエスト1円くらいかかる。開発の為に何回もコールしたらもう2ドル。本末転倒か。
lamdaで毎日チェックするようにして、一定以上の金額増があったら自分にメンションを飛ばす。
S3の全バケットの合計容量とファイル数、料金の合計も通知。

使い方。awsの認証はデフォルト。lamdaでは適切にロールを設定。slackのapiは環境変数SLACK_URLSLACK_MENTION_USER_IDを入れる。

boto3_type_annotationsというモジュールが別途必要なので、lamdaに上げる時はそれも必要。aws lamda python モジュール とかでググると出てきます。type hintingの為に入れているだけなので取り除いてもいい。

直したいところ

  • date型と、datetime型の区別が曖昧で混ざっている箇所がある。
  • 型指定が効いていない。date型を要求している引数にstr型を入れてもIDEがエラー出してくれない。
  • タプルの戻り値で[0]、[1]と指定している箇所があるが、絶対後でわからなくなる。
  • importは言われるがまま追加したのでぐちゃぐちゃ。特に日付関連のimportはもっと綺麗に書けるはず。
  • 値段の単位(ドル)はプロパティなのに、量の単位(回)がメソッドで不一致なので統一したい。プロパティのgetterってあるよね?
  • "Amazon Route 53/A"は単位がreqで、値がintで~という定義がバラバラになっているので一箇所で書けるようにする
  • 昨日までのデータ、今日までのデータの配列から、新しいデータや料金が増えたデータを抽出するリスト内記法?ってやつが長すぎて読みにくい。もっといい書き方があるはず
# -*- coding: utf-8 -*-

import re
import os
import sys
import json
import urllib.request

from datetime import date, timedelta
from typing import List
from datetime import datetime as dt
import boto3
import datetime
from boto3_type_annotations.ce import Client as CEClient
from boto3_type_annotations.cloudwatch import Client as CWClient
from boto3_type_annotations.s3 import Client as S3Client


class CostData(object):
    量の値: float
    価格の単位: str
    価格の値: float
    オペレーション名: str
    サービス名: str
    終了日: date
    開始日: date

    def __init__(self):
        self.開始日 = datetime.date.today()
        self.終了日 = datetime.date.today()
        self.サービス名 = ''
        self.オペレーション名 = ''
        self.価格の値 = 0
        self.価格の単位 = ""
        self.量の値 = 0

    # noinspection SpellCheckingInspection,PyPep8Naming
    def 量の単位(self):
        key = self.サービス名 + "/" + self.オペレーション名
        suketto = {
            "Amazon Route 53/A": "req",
            "Amazon Route 53/AAAA": "req",
            "Amazon Route 53/ANY": "req",
            "Amazon Route 53/CAA": "req",
            "Amazon Route 53/CNAME": "req",
            "Amazon Route 53/HostedZone": "domain",
            "Amazon Route 53/MX": "req",
            "Amazon Route 53/NAPTR": "req",
            "Amazon Route 53/NS": "req",
            "Amazon Route 53/PTR": "req",
            "Amazon Route 53/SOA": "req",
            "Amazon Route 53/SRV": "req",
            "Amazon Route 53/TXT": "req",
            "Amazon Route 53/UNSUPPORTED": "req",
            "Amazon Simple Storage Service/GetObject": "req",
            "Amazon Simple Storage Service/PutObject": "req",
            "Amazon Simple Storage Service/StandardStorage": "GB",
        }
        return suketto.get(key, "")

    # noinspection PyPep8Naming,SpellCheckingInspection
    def 量の単位が整数(self):
        key = self.サービス名 + "/" + self.オペレーション名
        mk = [
            "Amazon Route 53/A",
            "Amazon Route 53/AAAA",
            "Amazon Route 53/ANY",
            "Amazon Route 53/CAA",
            "Amazon Route 53/CNAME",
            "Amazon Route 53/HostedZone",
            "Amazon Route 53/MX",
            "Amazon Route 53/NAPTR",
            "Amazon Route 53/NS",
            "Amazon Route 53/PTR",
            "Amazon Route 53/SOA",
            "Amazon Route 53/SRV",
            "Amazon Route 53/TXT",
            "Amazon Simple Storage Service/PutObject",
        ]
        return key in mk

    def __str__(self):
        価格 = ("{:.2f}".format(self.価格の値) if self.価格の値 != 0.0 else "0") + " " + self.価格の単位
        量の数値 = "{:.2f}".format(self.量の値) if not self.量の単位が整数() else str(int(self.量の値))
         = 量の数値 + " " + self.量の単位()
        return self.開始日.strftime('%Y/%m/%d') + "~" + self.終了日.strftime('%Y/%m/%d') + " " + self.サービス名 + "/" + self.オペレーション名 + " " + 価格 + " " + 


def run(fromDataObj, toDateObj, granularity):
    # fromDataStr = 2019-02-01
    # toDateStr = 2019-03-01
    # granularity = DAILY MONTHLY
    ce: CEClient = boto3.client('ce')
    ceR = ce.get_cost_and_usage(TimePeriod={
        'Start': date.strftime(fromDataObj, "%Y-%m-%d"),
        'End': date.strftime(toDateObj, "%Y-%m-%d")
    }, Granularity=granularity,
        Metrics=["AmortizedCost", "UsageQuantity"],
        GroupBy=[{
            'Type': 'DIMENSION',
            'Key': 'SERVICE'
        }, {
            'Type': 'DIMENSION',
            'Key': 'OPERATION'
        }]
    )
    result: List[CostData] = []
    for oneTimeData in ceR["ResultsByTime"]:
        start_date_str: str = oneTimeData["TimePeriod"]["Start"]
        end_date_str: str = oneTimeData["TimePeriod"]["End"]
        for one_time_item in oneTimeData["Groups"]:
            data = CostData()
            data.開始日 = dt.now().strptime(start_date_str, '%Y-%m-%d')
            data.終了日 = dt.now().strptime(end_date_str, '%Y-%m-%d')
            data.サービス名 = one_time_item["Keys"][0]
            data.オペレーション名 = one_time_item["Keys"][1]
            data.価格の値 = float(one_time_item["Metrics"]["AmortizedCost"]["Amount"])
            data.価格の単位 = one_time_item["Metrics"]["AmortizedCost"]["Unit"]
            data.量の値 = float(one_time_item["Metrics"]["UsageQuantity"]["Amount"])
            data.originalData = one_time_item
            result.append(data)
    return result


def 日付取得():
    today = dt(dt.now().year, dt.now().month, dt.now().day, 0, 0, 0)
    if today.day == 1 or today.day == 2:
        return None
    this_first_day = date(today.year, today.month, 1)
    yesterday = today - timedelta(days=1)
    return this_first_day, yesterday, today


def getS3TotalSize(start_date, end_date):
    # noinspection SpellCheckingInspection
    ce: CWClient = boto3.client('cloudwatch')
    s3Client: S3Client = boto3.client("s3")
    buckets = s3Client.list_buckets()
    totalFileSize = 0.0
    totalFileCount = 0
    for bucket in buckets["Buckets"]:
        bucket_name = bucket["Name"]
        r = ce.get_metric_statistics(Namespace="AWS/S3", MetricName="BucketSizeBytes", StartTime=start_date, EndTime=end_date, Period=86400, Statistics=["Average"], Dimensions=[
            {
                'Name': 'BucketName',
                'Value': bucket_name
            },
            {
                'Name': 'StorageType',
                'Value': 'StandardStorage'
            }
        ])
        # noinspection SpellCheckingInspection
        if 0 < len(r["Datapoints"]):
            # noinspection SpellCheckingInspection
            totalFileSize += r["Datapoints"][0]["Average"]
        r = ce.get_metric_statistics(Namespace="AWS/S3", MetricName="NumberOfObjects", StartTime=start_date, EndTime=end_date, Period=86400, Statistics=["Average"], Dimensions=[
            {
                'Name': 'BucketName',
                'Value': bucket_name
            },
            {
                'Name': 'StorageType',
                'Value': 'AllStorageTypes'
            }
        ])
        # noinspection SpellCheckingInspection
        if 0 < len(r["Datapoints"]):
            # noinspection SpellCheckingInspection
            totalFileCount += r["Datapoints"][0]["Average"]
    return totalFileSize, totalFileCount


def 数字に日本語の単位を付ける(val):
    v = str(int(val))
    p1 = "^(\\d+)(\\d{4})$"
    r1 = re.match(p1, v)
    if r1:
        return r1.group(1) + "万" + r1.group(2)
    else:
        return v


def get():
    日付 = 日付取得()
    if 日付 is None:
        return
    if os.getenv("SLACK_URL") is None:
        sys.exit(1)
    slask_url = os.getenv("SLACK_URL")
    if os.getenv("SLACK_MENTION_USER_ID") is None:
        sys.exit(1)
    slack_mention_user_id = os.getenv("SLACK_MENTION_USER_ID")

    今日のS3情報 = getS3TotalSize(日付[2] - timedelta(days=2), 日付[2] - timedelta(days=1))
    昨日のS3情報 = getS3TotalSize(日付[2] - timedelta(days=3), 日付[2] - timedelta(days=2))
    # 今日までのデータs = run("2019-02-01", "2019-03-01", "MONTHLY", "Amazon Simple Storage Service/StandardStorage")
    # 昨日までのデータs = run("2019-02-01", "2019-02-28", "MONTHLY", "Amazon Simple Storage Service/StandardStorage")
    今日までのデータs = run(日付[0], 日付[2], "MONTHLY")
    昨日までのデータs = run(日付[0], 日付[1], "MONTHLY")
    新規のデータ = [今日までのデータ for 今日までのデータ in 今日までのデータs if len([昨日までのデータ for 昨日までのデータ in 昨日までのデータs if 今日までのデータ.オペレーション名 == 昨日までのデータ.オペレーション名 and 今日までのデータ.サービス名 == 昨日までのデータ.サービス名]) == 0]
    昨日と全く同じデータ = [今日までのデータ for 今日までのデータ in 今日までのデータs if len([昨日までのデータ for 昨日までのデータ in 昨日までのデータs if 今日までのデータ.オペレーション名 == 昨日までのデータ.オペレーション名 and 今日までのデータ.サービス名 == 昨日までのデータ.サービス名 and 今日までのデータ.価格の値 == 昨日までのデータ.価格の値 and 今日までのデータ.量の値 == 昨日までのデータ.量の値]) == 1]
    昨日と異なるデータ = [今日までのデータ for 今日までのデータ in 今日までのデータs if len([昨日までのデータ for 昨日までのデータ in 昨日までのデータs if 今日までのデータ.オペレーション名 == 昨日までのデータ.オペレーション名 and 今日までのデータ.サービス名 == 昨日までのデータ.サービス名 and (今日までのデータ.価格の値 != 昨日までのデータ.価格の値 or 今日までのデータ.量の値 != 昨日までのデータ.量の値)]) == 1]
    昨日までの料金 = sum([i.価格の値 for i in 昨日までのデータs])
    今日までの料金 = sum([i.価格の値 for i in 今日までのデータs])
    logs: List[str] = []
    logs.append("料金は {0:.2f} ドルです(昨日から {1:.2f} ドルの増加)".format(今日までの料金, 今日までの料金 - 昨日までの料金))
    if len(昨日と異なるデータ) != 0:
        price_logs_zero: List[str] = []
        price_logs_price: List[str] = []
        has_high_price_data = False
        for item in 昨日と異なるデータ:
            今日のデータ = item
            昨日のデータ = [i for i in 昨日までのデータs if i.サービス名 == 今日のデータ.サービス名 and i.オペレーション名 == 今日のデータ.オペレーション名][0]
            値段の単位 = 今日のデータ.価格の単位
            量の単位 = 今日のデータ.量の単位()
            今日の値段 = 今日のデータ.価格の値
            今日の量 = 今日のデータ.量の値
            昨日との差分の値段 = 今日の値段 - 昨日のデータ.価格の値
            昨日との差分の量 = 今日の量 - 昨日のデータ.量の値
            量が整数 = 今日のデータ.量の単位が整数()
            bold_text = ""
            key = 今日のデータ.サービス名 + "/" + 今日のデータ.オペレーション名
            if key == "Amazon Simple Storage Service/StandardStorage":
                if 昨日との差分の値段 >= 5.0:
                    has_high_price_data = True
                    bold_text = "*"
            elif 昨日との差分の値段 >= 1.0:
                has_high_price_data = True
                bold_text = "*"
            print_message = (bold_text + "  " +
                             今日のデータ.サービス名 +
                             "/" +
                             今日のデータ.オペレーション名 +
                             " " +
                             "{:.2f}".format(今日の値段) +
                             " " +
                             値段の単位 +
                             " / " +
                             ("{:.2f}".format(今日の量) if not 量が整数 else str(int(今日の量))) +
                             " " +
                             量の単位 +
                             " ( " +
                             ("+ " if 昨日との差分の値段 >= 0 else "- ") +
                             ("0.0" if "{:.2f}".format(昨日との差分の値段) == "0.00" else "{:.2f}".format(昨日との差分の値段)) +
                             " " +
                             値段の単位 +
                             " / " +
                             ("+ " if 昨日との差分の量 >= 0 else "- ") +
                             ("{:.2f}".format(昨日との差分の量) if not 量が整数 else str(int(昨日との差分の量))) +
                             " " +
                             量の単位 +
                             " )" + bold_text
                             )
            if "{:.2f}".format(今日の値段) == "0.00":
                price_logs_zero.append(print_message)
            else:
                price_logs_price.append(print_message)
        logs.append("料金に変更ありのデータは以下の通りです。")
        if has_high_price_data:
            logs.append("注意! 1ドル以上の増加があります! <@" + slack_mention_user_id + ">")
        for item in price_logs_price:
            logs.append(item)
        if len(price_logs_price) > 0 and len(price_logs_zero) > 0:
            logs.append("")
        for item in price_logs_zero:
            logs.append(item)
    else:
        logs.append("料金に変更ありのデータはありませんでした。")
    if len(昨日と全く同じデータ) != 0:
        # logs.append("昨日から変化の無いデータは以下の通りです。")
        for item in 昨日と全く同じデータ:
            price_log = ("  " +
                         item.サービス名 +
                         "/" +
                         item.オペレーション名 +
                         " " +
                         "{:.2f}".format(item.価格の値) +
                         " " +
                         item.価格の単位 +
                         " / " +
                         ("{:.2f}".format(item.量の値) if not item.量の単位が整数() else str(int(item.量の値))) +
                         " " +
                         item.量の単位()
                         )
            # logs.append(price_log)
    else:
        logs.append("昨日から変化の無いデータはありませんでした。")
    # s3 のファイルの個数
    logs.append("S3には合計 {0} 個のファイルがあり、容量は {1:.3f} GBです。".format(数字に日本語の単位を付ける(今日のS3情報[1]), 今日のS3情報[0] / 1024 / 1024 / 1024))
    logs.append("前日比で + {0} 個、 {1:.3f} GB です。".format(数字に日本語の単位を付ける(今日のS3情報[1] - 昨日のS3情報[1]), (今日のS3情報[0] - 昨日のS3情報[0]) / 1024 / 1024 / 1024))
    url = slask_url
    data = {
        'text': "\n".join(logs),
    }
    headers = {
        'Content-Type': 'application/json',
    }
    req = urllib.request.Request(url, json.dumps(data).encode(), headers)
    with urllib.request.urlopen(req) as res:
        body = res.read()


if __name__ == "__main__":
    get()

0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3