5
Help us understand the problem. What are the problem?

posted at

updated at

Organization

Cost Explorer API でアカウント毎に日別の請求額を取得する

モチベーション

Cost Explorer API を使用してアカウント毎に DAILY の請求額を取得し、以下のようなデータを CSV で出力したい。

Account Id Account Name 2022/4/1 2022/4/2 2022/4/3 ... 2022/4/30
000000000000 account-0000 42.792716528 40.124716527 43.123416527 ... 50.922465287
111111111111 account-1111 32.263379809 30.235379809 31.263353594 ... 22.133798094
222222222222 account-2222 751.71034839 720.51234839 772.62033294 ... 651.71042035
333333333333 account-3333 4.6428 5.1234 7.8765 ... 6.2234
444444444444 account-4444 407.74542211 420.12345211 395.12499518 ... 417.99454118
555555555555 account-5555 386.78950595 400.12500509 352.89924506 ... 370.75102656
...

AWS Cost Explorer コンソールでも同等の CSV をダウンロードできますが、日次で自動取得する方法について紹介します。

API リクエストの例

AWS Lambda を使用して当月の日別請求額を取得する最小限の例です。

import datetime
import boto3

def lambda_handler(event, context):
    today = datetime.date.today()
    start = today.replace(day=1).strftime('%Y-%m-%d')
    end = today.strftime('%Y-%m-%d')
    ce = boto3.client('ce')
    response = ce.get_cost_and_usage(
        TimePeriod={
            'Start': start,
            'End' :  end,
        },
        Granularity='DAILY',
        Metrics=[
            'NetUnblendedCost'
        ]
        GroupBy=[
            {
                'Type': 'DIMENSION',
                'Key': 'LINKED_ACCOUNT'
            }
        ]
    )
    return response['ResultsByTime']

DAILY で取得する際の注意点としては、ある1日のコストが確定するまでにタイムラグがあるという点です。例えば 4/26 に 4/25 分のコストを取得したとすると、実際の請求額より少ないケースがあります。AWS 側のデータが更新されるタイミングは公開されていませんが、翌々日の 4/27 には 4/28 分のコストが確定しているイメージです。

以下の記事でも確定タイミングについて紹介されていました。

Pandas で結果を加工する

前述の例で得られるレスポンスはネストされた json になっているため、Pandas を使用して CSV に加工する例を考えます。AWS Lambda で Pandas を使用する際には、Lambda Layers を使用する必要がある点についてご注意ください。

各要素に日別の請求データが格納されているため、for 文で 1 日ずつ処理します。pandas.json_normalize を使用してフラット化した後に、 pandas.concat で請求額と連結します。更に列名を請求日に変更したあとに Account Id をキーに結果をマージしています。

    merged_cost = pandas.DataFrame(
        index=[],
        columns=['Account Id']
    )

    for index, item in enumerate(response):
        normalized_json = pandas.json_normalize(item['Groups'])
        split_keys = pandas.DataFrame(
            normalized_json['Keys'].tolist(),
            columns=['Account Id']
        )
        cost = pandas.concat(
            [split_keys, normalized_json['Metrics.NetUnblendedCost.Amount']],
            axis=1
        )
        renamed_cost = cost.rename(
            columns={'Metrics.NetUnblendedCost.Amount': item['TimePeriod']['Start']}
        )
        merged_cost = pandas.merge(merged_cost, renamed_cost, on='Account Id', how='right')

   print(merged_cost)
Account Id  ...     2022-04-25
0   000000000000  ...  15.4985752779
1   111111111111  ...         0.2176
2   222222222222  ...   6.5567854795
3   333333333333  ...   6.6300957379
4   444444444444  ...   8.2720868504
..           ...  ...            ...
19  777777777777  ...  10.0121863554
18  888888888888  ...   6.5976412116
20  999999999999  ...    6.493243618
[20 rows x 26 columns]

最終的な Lambda 関数の例

for 文で処理した後に、AWS Organizations の API から取得したアカウント名のリストをマージし、冒頭に紹介した形式で CSV 出力しています。

lambda_function.py
from logging import getLogger, INFO
import os
import datetime
import boto3
import pandas
from botocore.exceptions import ClientError

logger = getLogger()
logger.setLevel(INFO)

def upload_s3(output, key, bucket):
    try:
        s3_resource = boto3.resource('s3')
        s3_bucket = s3_resource.Bucket(bucket)
        s3_bucket.upload_file(output, key, ExtraArgs={'ACL': 'bucket-owner-full-control'})
    except ClientError as err:
        logger.error(err.response['Error']['Message'])
        raise

def get_ou_ids(org, parent_id):
    ou_ids = []

    try:
        paginator = org.get_paginator('list_children')
        iterator = paginator.paginate(
            ParentId=parent_id,
            ChildType='ORGANIZATIONAL_UNIT'
        )
        for page in iterator:
            for ou in page['Children']:
                ou_ids.append(ou['Id'])
                ou_ids.extend(get_ou_ids(org, ou['Id']))
    except ClientError as err:
        logger.error(err.response['Error']['Message'])
        raise
    else:
        return ou_ids

def list_accounts():
    org = boto3.client('organizations')
    root_id = 'r-xxxx'
    ou_id_list = [root_id]
    ou_id_list.extend(get_ou_ids(org, root_id))
    accounts = []

    try:
        for ou_id in ou_id_list:
            paginator = org.get_paginator('list_accounts_for_parent')
            page_iterator = paginator.paginate(ParentId=ou_id)
            for page in page_iterator:
                for account in page['Accounts']:
                    item = [
                        account['Id'],
                        account['Name'],
                    ]
                    accounts.append(item)
    except ClientError as err:
        logger.error(err.response['Error']['Message'])
        raise
    else:
        return accounts

def get_cost_json(start, end):
    ce = boto3.client('ce')
    response = ce.get_cost_and_usage(
        TimePeriod={
            'Start': start,
            'End' :  end,
        },
        Granularity='DAILY',
        Metrics=[
            'NetUnblendedCost'
        ],
        GroupBy=[
            {
                'Type': 'DIMENSION',
                'Key': 'LINKED_ACCOUNT'
            }
        ]
    )
    return response['ResultsByTime']

def lambda_handler(event, context):
    today = datetime.date.today()
    start = today.replace(day=1).strftime('%Y-%m-%d')
    end = today.strftime('%Y-%m-%d')
    key = 'daily-cost-' + today.strftime('%Y-%m') + '.csv'
    output_file = '/tmp/output.csv'
    bucket = os.environ['BUCKET']
    account_list = pandas.DataFrame(list_accounts(), columns=['Account Id', 'Account Name'])
    daily_cost_list = get_cost_json(start, end)

    merged_cost = pandas.DataFrame(
        index=[],
        columns=['Account Id']
    )

    for index, item in enumerate(daily_cost_list):
        normalized_json = pandas.json_normalize(item['Groups'])
        split_keys = pandas.DataFrame(
            normalized_json['Keys'].tolist(),
            columns=['Account Id']
        )
        cost = pandas.concat(
            [split_keys, normalized_json['Metrics.NetUnblendedCost.Amount']],
            axis=1
        )
        renamed_cost = cost.rename(
            columns={'Metrics.NetUnblendedCost.Amount': item['TimePeriod']['Start']}
        )
        merged_cost = pandas.merge(merged_cost, renamed_cost, on='Account Id', how='outer')

    daily_cost = pandas.merge(account_list, merged_cost, on='Account Id', how='right')
    daily_cost.to_csv(output_file, index=False)
    upload_s3(output_file, key, bucket)

あとは EventBridge で任意の起動スケジュールを設定し、ターゲットに上記 Lambda 関数を設定すれば OK です。

過去の Cost Exploerer API ネタ

以上です。
参考になれば幸いです。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
5
Help us understand the problem. What are the problem?