はじめに
この記事では、CloudWatchメトリクスをJSON形式でエクスポートされたものをCSV形式に変換し、他の監視/ObservabillityツールやBIツールなどに共有する際のサンプルを示したものになります。この記事がどなたかの役に立てば幸いです
実現したいこと
- CloudWatchメトリクスをCSV形式で共有したい
- なるべくコストは押さえたい
アーキテクチャ図
実装(せっかちな人向け)
1. 送信元と送信先のAmazon S3バケットを作成
2. AWS Lambda用のIAMロールを作成(S3の送信元および送信先のGET権限、送信先のPUT権限)
3. Amazon CloudWatch メトリクスストリームを作成し、JSON形式を指定。必要なメトリクスを選択して送信元S3バケットを指定。IAMロールはサービスロールを作成
4. Lambda関数を作成
コード例(Python 3.13)
import json
import csv
import boto3
from datetime import datetime, timedelta
from io import StringIO
s3 = boto3.client('s3')
def flatten_json(y):
out = {}
def flatten(x, name=''):
if isinstance(x, dict):
for a in x:
flatten(x[a], name + a + '_')
else:
out[name[:-1]] = x
flatten(y)
return out
def lambda_handler(event, context):
try:
# 現在のUTC日時からパーティションパスを生成
now = datetime.utcnow()
prefix = f'{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/'
# ソースバケット名を設定
source_bucket = 'your-source-s3-bucket-name'
# 指定のプレフィックスから最新のオブジェクトを取得
response = s3.list_objects_v2(Bucket=source_bucket, Prefix=prefix)
if 'Contents' not in response:
# 現在のパーティションでファイルが見つからない場合、1分前のパーティションをチェック
past_time = now - timedelta(minutes=1)
prefix = f'{past_time.year}/{past_time.month:02d}/{past_time.day:02d}/{past_time.hour:02d}/'
response = s3.list_objects_v2(Bucket=source_bucket, Prefix=prefix)
if 'Contents' not in response:
return {
'statusCode': 404,
'body': json.dumps('No files found in the specified partition paths')
}
# 最新のオブジェクトを取得
latest_file = max(response['Contents'], key=lambda x: x['LastModified'])
source_key = latest_file['Key']
# S3からJSONファイルを取得
try:
response = s3.get_object(Bucket=source_bucket, Key=source_key)
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps(f"Error fetching the object: {str(e)}")
}
json_lines = response['Body'].read().decode('utf-8').strip().split('\n')
# 現在時刻からタイムスタンプを取得
timestamp = now.strftime('%Y%m%d%H%M%S')
# CSVファイル名を作成
csv_file_name = f"metrics_{timestamp}.csv"
# CSVデータを作成
csv_data = []
# ヘッダー行を作成
headers = []
for line in json_lines:
data = json.loads(line)
flat_data = flatten_json(data)
headers = list(flat_data.keys())
break
csv_data.append(headers)
# 各JSON行を解析してCSV行を作成
for line in json_lines:
data = json.loads(line)
flat_data = flatten_json(data)
row = [flat_data.get(header, '') for header in headers]
csv_data.append(row)
# CSVファイルの内容を文字列として保持
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer)
csv_writer.writerows(csv_data)
# 変換されたCSVファイルをパーティションに分けて別のS3バケットにアップロード
destination_bucket = 'your-destination-s3-bucket-name'
partition_prefix = f'{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/'
# パーティションが存在しない場合は作成
try:
s3.head_object(Bucket=destination_bucket, Key=partition_prefix)
except Exception as e:
s3.put_object(Bucket=destination_bucket, Key=partition_prefix)
s3.put_object(Bucket=destination_bucket, Key=f"{partition_prefix}{csv_file_name}", Body=csv_buffer.getvalue())
return {
'statusCode': 200,
'body': json.dumps('CSV file created and uploaded to S3 successfully')
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps(f"An error occurred: {str(e)}")
}
5. 送信元バケットにS3イベント通知を設定し、先ほどのLambda関数を送信先に指定
ユースケース
- JSON形式やOpenTelemetoryに対応していないBIツールを活用したメトリクス分析
- オンプレミスベースの監視ツールとの連携したハイブリッドクラウド環境の監視・運用
- 高解像度メトリクスを利用する必要がなく、分単位での共有で構わない場合(高解像度メトリクスを利用する場合には、CloudWatch メトリクスストリームの送信先をAmazon Data Firehoseストリームを指定して、Data FirehoseとLambda関数を組み合わせたほうが高速に処理できるかと思います)
アーキテクチャの解説
CloudWatchのメトリクスのエクスポートについて
CloudWatchのメトリクスストリームを使うと、サードパーティのツールやS3に対してCloudWatchのメトリクスを共有することができるようになります
対応している送信先は以下の3つになります
- Amazon Data Firehose
Amazon Data Firehoseを利用してOpenSearch等に送信できます。Data Firehoseを利用するので、データをApache Parquetなどの形式に変換することもできます。対応しているデータ形式は下記の通りになります- JSON
- OpenTelemetry 0.7.0
- OpenTelemetry 1.0.0
- AWSパートナー製品
サードパーティの製品にも一部対応しています。本日現在対応しているサードパーティ製品は、以下の通りになります- Datadog
- Dynatrace
- New Relic
- Splunk Observability Cloud
- SumoLogic
- Amazon S3
Amazon S3への共有もできます。Amazon S3に共有する場合も、下記の形式で共有することができます- JSON
- OpenTelemetry 0.7.0
- OpenTelemetry 1.0.0
今回は、Amazon S3への共有を行います
参考文献
Amazon S3へJSON形式で共有された場合のJSONファイルについて
S3バケットにJSONで保管すると、下記のような形式で保管されます
- 保管先のパーティションは/${年}/${月}/${日}/${時}に保管される
- ファイル名は"MetricStreams-${MetricStreamName}-${StreamNumber}-${Number}-${TimeStamp}-${Suffix}"となります
- JSONファイルの中身は改行文字で区切られた複数のJSONオブジェクトが含まれており、各オブジェクトは1つのメトリクスの1つのデータポイントを含んでいます
- オブジェクト単位ではdimensionsキーとvalueキーがネストされているため、CSVにする際にはフラット化する必要があります
参考文献
JSON 形式の CloudWatch メトリクスストリームの出力
Lambda関数について
JSON形式で送信元バケットに保管されたCloudWatchのメトリクスを入力値として使用し、Keyをヘッダー行、Valueを各列に格納します
したがって変換の工程を大きく分けると、3つの工程があります
- 最新のファイルを送信元バケットから検索する
- 最新のタイムスタンプを取得
- 最新のタイムスタンプからパーティションパスを生成し、指定のプレフィックスからの最新オブジェクトを取得する
- CSVデータを作成
- CSVをフラット化して、ヘッダー行を作成する
- Valueを各列に入力
- 送信先バケットへ格納
- 格納先のパーティションを指定。毎時0分などでパーティションが存在ない場合は新たに作成する
- 指定のパーティションへCSVファイルをアップロード
主な注意点
すでにご存じのことも多いかと思いますが、以下に注意点を示しておきます
1. メトリクスを取得し続ける限り無尽蔵にデータが増え続けるので、Amazon S3のライフサイクルルールは事前に決めておきましょう
2. メトリクス数が増えてきた場合にLambdaのメモリ、エフェメラルストレージ、タイムアウトの調整が必要になってきます
3. Lambdaに付与するIAMロールは最小限としましょう(読み取り権限を送信元バケット・読み取り及び書き込み権限を送信先バケットに絞るetc)
4. データ連携先に応じてCSVファイルのKey行は適宜調整が必要です。また、文字エンコードなども適宜合わせる必要があります
5. Lambdaのエラー処理やログメッセージはもう少し改良の余地があるかと思いますので、適宜修正していただければと思います
まとめ
今回CloudWatchのメトリクスストリームのことが調べたかったのと、外部への連携をサードパーティのツールで実装するより、少しアーキテクチャやデータの中身を調べたいと思い、あえてLambdaで変換する形式をとってみました。
実際にCSVで共有するシチュエーションも出てくるかもしれません。その際には、ぜひ参考にしていただければと思います