モチベーション
Cost Explorer API を使用してアカウント毎に DAILY の請求額を取得し、以下のようなデータを CSV で出力したい。
Account Id | Account Name | 2022/4/1 | 2022/4/2 | 2022/4/3 | ... | 2022/4/30 |
---|---|---|---|---|---|---|
000000000000 | account-0000 | 42.792716528 | 40.124716527 | 43.123416527 | ... | 50.922465287 |
111111111111 | account-1111 | 32.263379809 | 30.235379809 | 31.263353594 | ... | 22.133798094 |
222222222222 | account-2222 | 751.71034839 | 720.51234839 | 772.62033294 | ... | 651.71042035 |
333333333333 | account-3333 | 4.6428 | 5.1234 | 7.8765 | ... | 6.2234 |
444444444444 | account-4444 | 407.74542211 | 420.12345211 | 395.12499518 | ... | 417.99454118 |
555555555555 | account-5555 | 386.78950595 | 400.12500509 | 352.89924506 | ... | 370.75102656 |
... |
AWS Cost Explorer コンソールでも同等の CSV をダウンロードできますが、日次で自動取得する方法について紹介します。
API リクエストの例
AWS Lambda を使用して当月の日別請求額を取得する最小限の例です。
import datetime
import boto3
def lambda_handler(event, context):
today = datetime.date.today()
start = today.replace(day=1).strftime('%Y-%m-%d')
end = today.strftime('%Y-%m-%d')
ce = boto3.client('ce')
response = ce.get_cost_and_usage(
TimePeriod={
'Start': start,
'End' : end,
},
Granularity='DAILY',
Metrics=[
'NetUnblendedCost'
]
GroupBy=[
{
'Type': 'DIMENSION',
'Key': 'LINKED_ACCOUNT'
}
]
)
return response['ResultsByTime']
DAILY で取得する際の注意点としては、ある1日のコストが確定するまでにタイムラグがあるという点です。例えば 4/26 に 4/25 分のコストを取得したとすると、実際の請求額より少ないケースがあります。AWS 側のデータが更新されるタイミングは公開されていませんが、翌々日の 4/27 には 4/28 分のコストが確定しているイメージです。
以下の記事でも確定タイミングについて紹介されていました。
Pandas で結果を加工する
前述の例で得られるレスポンスはネストされた json になっているため、Pandas を使用して CSV に加工する例を考えます。AWS Lambda で Pandas を使用する際には、Lambda Layers を使用する必要がある点についてご注意ください。
各要素に日別の請求データが格納されているため、for 文で 1 日ずつ処理します。pandas.json_normalize を使用してフラット化した後に、 pandas.concat で請求額と連結します。更に列名を請求日に変更したあとに Account Id をキーに結果をマージしています。
merged_cost = pandas.DataFrame(
index=[],
columns=['Account Id']
)
for index, item in enumerate(response):
normalized_json = pandas.json_normalize(item['Groups'])
split_keys = pandas.DataFrame(
normalized_json['Keys'].tolist(),
columns=['Account Id']
)
cost = pandas.concat(
[split_keys, normalized_json['Metrics.NetUnblendedCost.Amount']],
axis=1
)
renamed_cost = cost.rename(
columns={'Metrics.NetUnblendedCost.Amount': item['TimePeriod']['Start']}
)
merged_cost = pandas.merge(merged_cost, renamed_cost, on='Account Id', how='right')
print(merged_cost)
Account Id ... 2022-04-25
0 000000000000 ... 15.4985752779
1 111111111111 ... 0.2176
2 222222222222 ... 6.5567854795
3 333333333333 ... 6.6300957379
4 444444444444 ... 8.2720868504
.. ... ... ...
19 777777777777 ... 10.0121863554
18 888888888888 ... 6.5976412116
20 999999999999 ... 6.493243618
[20 rows x 26 columns]
最終的な Lambda 関数の例
for 文で処理した後に、AWS Organizations の API から取得したアカウント名のリストをマージし、冒頭に紹介した形式で CSV 出力しています。
from logging import getLogger, INFO
import os
import datetime
import boto3
import pandas
from botocore.exceptions import ClientError
logger = getLogger()
logger.setLevel(INFO)
def upload_s3(output, key, bucket):
try:
s3_resource = boto3.resource('s3')
s3_bucket = s3_resource.Bucket(bucket)
s3_bucket.upload_file(output, key, ExtraArgs={'ACL': 'bucket-owner-full-control'})
except ClientError as err:
logger.error(err.response['Error']['Message'])
raise
def get_ou_ids(org, parent_id):
ou_ids = []
try:
paginator = org.get_paginator('list_children')
iterator = paginator.paginate(
ParentId=parent_id,
ChildType='ORGANIZATIONAL_UNIT'
)
for page in iterator:
for ou in page['Children']:
ou_ids.append(ou['Id'])
ou_ids.extend(get_ou_ids(org, ou['Id']))
except ClientError as err:
logger.error(err.response['Error']['Message'])
raise
else:
return ou_ids
def list_accounts():
org = boto3.client('organizations')
root_id = 'r-xxxx'
ou_id_list = [root_id]
ou_id_list.extend(get_ou_ids(org, root_id))
accounts = []
try:
for ou_id in ou_id_list:
paginator = org.get_paginator('list_accounts_for_parent')
page_iterator = paginator.paginate(ParentId=ou_id)
for page in page_iterator:
for account in page['Accounts']:
item = [
account['Id'],
account['Name'],
]
accounts.append(item)
except ClientError as err:
logger.error(err.response['Error']['Message'])
raise
else:
return accounts
def get_cost_json(start, end):
ce = boto3.client('ce')
response = ce.get_cost_and_usage(
TimePeriod={
'Start': start,
'End' : end,
},
Granularity='DAILY',
Metrics=[
'NetUnblendedCost'
],
GroupBy=[
{
'Type': 'DIMENSION',
'Key': 'LINKED_ACCOUNT'
}
]
)
return response['ResultsByTime']
def lambda_handler(event, context):
today = datetime.date.today()
start = today.replace(day=1).strftime('%Y-%m-%d')
end = today.strftime('%Y-%m-%d')
key = 'daily-cost-' + today.strftime('%Y-%m') + '.csv'
output_file = '/tmp/output.csv'
bucket = os.environ['BUCKET']
account_list = pandas.DataFrame(list_accounts(), columns=['Account Id', 'Account Name'])
daily_cost_list = get_cost_json(start, end)
merged_cost = pandas.DataFrame(
index=[],
columns=['Account Id']
)
for index, item in enumerate(daily_cost_list):
normalized_json = pandas.json_normalize(item['Groups'])
split_keys = pandas.DataFrame(
normalized_json['Keys'].tolist(),
columns=['Account Id']
)
cost = pandas.concat(
[split_keys, normalized_json['Metrics.NetUnblendedCost.Amount']],
axis=1
)
renamed_cost = cost.rename(
columns={'Metrics.NetUnblendedCost.Amount': item['TimePeriod']['Start']}
)
merged_cost = pandas.merge(merged_cost, renamed_cost, on='Account Id', how='outer')
daily_cost = pandas.merge(account_list, merged_cost, on='Account Id', how='right')
daily_cost.to_csv(output_file, index=False)
upload_s3(output_file, key, bucket)
あとは EventBridge で任意の起動スケジュールを設定し、ターゲットに上記 Lambda 関数を設定すれば OK です。
過去の Cost Exploerer API ネタ
以上です。
参考になれば幸いです。