1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CloudWatchメトリクスをCSV形式で保管する

Posted at

はじめに

この記事では、CloudWatchメトリクスをJSON形式でエクスポートされたものをCSV形式に変換し、他の監視/ObservabillityツールやBIツールなどに共有する際のサンプルを示したものになります。この記事がどなたかの役に立てば幸いです

実現したいこと

  • CloudWatchメトリクスをCSV形式で共有したい
  • なるべくコストは押さえたい

アーキテクチャ図

image.png

実装(せっかちな人向け)

1. 送信元と送信先のAmazon S3バケットを作成
2. AWS Lambda用のIAMロールを作成(S3の送信元および送信先のGET権限、送信先のPUT権限)
3. Amazon CloudWatch メトリクスストリームを作成し、JSON形式を指定。必要なメトリクスを選択して送信元S3バケットを指定。IAMロールはサービスロールを作成
4. Lambda関数を作成

コード例(Python 3.13)

CloudWatchJSONToCSV.py
import json
import csv
import boto3
from datetime import datetime, timedelta
from io import StringIO

s3 = boto3.client('s3')

def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], name + a + '_')
        else:
            out[name[:-1]] = x
    flatten(y)
    return out

def lambda_handler(event, context):
    try:
        # 現在のUTC日時からパーティションパスを生成
        now = datetime.utcnow()
        prefix = f'{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/'
        
        # ソースバケット名を設定
        source_bucket = 'your-source-s3-bucket-name'

        # 指定のプレフィックスから最新のオブジェクトを取得
        response = s3.list_objects_v2(Bucket=source_bucket, Prefix=prefix)
        if 'Contents' not in response:
            # 現在のパーティションでファイルが見つからない場合、1分前のパーティションをチェック
            past_time = now - timedelta(minutes=1)
            prefix = f'{past_time.year}/{past_time.month:02d}/{past_time.day:02d}/{past_time.hour:02d}/'
            response = s3.list_objects_v2(Bucket=source_bucket, Prefix=prefix)
            if 'Contents' not in response:
                return {
                    'statusCode': 404,
                    'body': json.dumps('No files found in the specified partition paths')
                }
        
        # 最新のオブジェクトを取得
        latest_file = max(response['Contents'], key=lambda x: x['LastModified'])
        source_key = latest_file['Key']
        
        # S3からJSONファイルを取得
        try:
            response = s3.get_object(Bucket=source_bucket, Key=source_key)
        except Exception as e:
            return {
                'statusCode': 500,
                'body': json.dumps(f"Error fetching the object: {str(e)}")
            }
        json_lines = response['Body'].read().decode('utf-8').strip().split('\n')
        
        # 現在時刻からタイムスタンプを取得
        timestamp = now.strftime('%Y%m%d%H%M%S')
        
        # CSVファイル名を作成
        csv_file_name = f"metrics_{timestamp}.csv"
        
        # CSVデータを作成
        csv_data = []
        
        # ヘッダー行を作成
        headers = []
        for line in json_lines:
            data = json.loads(line)
            flat_data = flatten_json(data)
            headers = list(flat_data.keys())
            break
        
        csv_data.append(headers)
        
        # 各JSON行を解析してCSV行を作成
        for line in json_lines:
            data = json.loads(line)
            flat_data = flatten_json(data)
            row = [flat_data.get(header, '') for header in headers]
            csv_data.append(row)
        
        # CSVファイルの内容を文字列として保持
        csv_buffer = StringIO()
        csv_writer = csv.writer(csv_buffer)
        csv_writer.writerows(csv_data)
        
        # 変換されたCSVファイルをパーティションに分けて別のS3バケットにアップロード
        destination_bucket = 'your-destination-s3-bucket-name'
        partition_prefix = f'{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/'
        
        # パーティションが存在しない場合は作成
        try:
            s3.head_object(Bucket=destination_bucket, Key=partition_prefix)
        except Exception as e:
            s3.put_object(Bucket=destination_bucket, Key=partition_prefix)
        
        s3.put_object(Bucket=destination_bucket, Key=f"{partition_prefix}{csv_file_name}", Body=csv_buffer.getvalue())
        
        return {
            'statusCode': 200,
            'body': json.dumps('CSV file created and uploaded to S3 successfully')
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f"An error occurred: {str(e)}")
        }

5. 送信元バケットにS3イベント通知を設定し、先ほどのLambda関数を送信先に指定

ユースケース

  • JSON形式やOpenTelemetoryに対応していないBIツールを活用したメトリクス分析
  • オンプレミスベースの監視ツールとの連携したハイブリッドクラウド環境の監視・運用
  • 高解像度メトリクスを利用する必要がなく、分単位での共有で構わない場合(高解像度メトリクスを利用する場合には、CloudWatch メトリクスストリームの送信先をAmazon Data Firehoseストリームを指定して、Data FirehoseとLambda関数を組み合わせたほうが高速に処理できるかと思います)

アーキテクチャの解説

CloudWatchのメトリクスのエクスポートについて

CloudWatchのメトリクスストリームを使うと、サードパーティのツールやS3に対してCloudWatchのメトリクスを共有することができるようになります
対応している送信先は以下の3つになります

  • Amazon Data Firehose
    Amazon Data Firehoseを利用してOpenSearch等に送信できます。Data Firehoseを利用するので、データをApache Parquetなどの形式に変換することもできます。対応しているデータ形式は下記の通りになります
    • JSON
    • OpenTelemetry 0.7.0
    • OpenTelemetry 1.0.0
  • AWSパートナー製品
    サードパーティの製品にも一部対応しています。本日現在対応しているサードパーティ製品は、以下の通りになります
    • Datadog
    • Dynatrace
    • New Relic
    • Splunk Observability Cloud
    • SumoLogic
  • Amazon S3
    Amazon S3への共有もできます。Amazon S3に共有する場合も、下記の形式で共有することができます
    • JSON
    • OpenTelemetry 0.7.0
    • OpenTelemetry 1.0.0
      今回は、Amazon S3への共有を行います

参考文献

メトリクスストリームを使用する

Amazon S3へJSON形式で共有された場合のJSONファイルについて

S3バケットにJSONで保管すると、下記のような形式で保管されます

  • 保管先のパーティションは/${年}/${月}/${日}/${時}に保管される
  • ファイル名は"MetricStreams-${MetricStreamName}-${StreamNumber}-${Number}-${TimeStamp}-${Suffix}"となります
  • JSONファイルの中身は改行文字で区切られた複数のJSONオブジェクトが含まれており、各オブジェクトは1つのメトリクスの1つのデータポイントを含んでいます
  • オブジェクト単位ではdimensionsキーとvalueキーがネストされているため、CSVにする際にはフラット化する必要があります

参考文献

JSON 形式の CloudWatch メトリクスストリームの出力

Lambda関数について

JSON形式で送信元バケットに保管されたCloudWatchのメトリクスを入力値として使用し、Keyをヘッダー行、Valueを各列に格納します
したがって変換の工程を大きく分けると、3つの工程があります

  • 最新のファイルを送信元バケットから検索する
    • 最新のタイムスタンプを取得
    • 最新のタイムスタンプからパーティションパスを生成し、指定のプレフィックスからの最新オブジェクトを取得する
  • CSVデータを作成
    • CSVをフラット化して、ヘッダー行を作成する
    • Valueを各列に入力
  • 送信先バケットへ格納
    • 格納先のパーティションを指定。毎時0分などでパーティションが存在ない場合は新たに作成する
    • 指定のパーティションへCSVファイルをアップロード

主な注意点

すでにご存じのことも多いかと思いますが、以下に注意点を示しておきます
1. メトリクスを取得し続ける限り無尽蔵にデータが増え続けるので、Amazon S3のライフサイクルルールは事前に決めておきましょう
2. メトリクス数が増えてきた場合にLambdaのメモリ、エフェメラルストレージ、タイムアウトの調整が必要になってきます
3. Lambdaに付与するIAMロールは最小限としましょう(読み取り権限を送信元バケット・読み取り及び書き込み権限を送信先バケットに絞るetc)
4. データ連携先に応じてCSVファイルのKey行は適宜調整が必要です。また、文字エンコードなども適宜合わせる必要があります
5. Lambdaのエラー処理やログメッセージはもう少し改良の余地があるかと思いますので、適宜修正していただければと思います

まとめ

今回CloudWatchのメトリクスストリームのことが調べたかったのと、外部への連携をサードパーティのツールで実装するより、少しアーキテクチャやデータの中身を調べたいと思い、あえてLambdaで変換する形式をとってみました。
実際にCSVで共有するシチュエーションも出てくるかもしれません。その際には、ぜひ参考にしていただければと思います

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?