Exporting CloudWatch Logs to S3 on a Daily Schedule

Posted at 2024-03-28
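This Lambda function reads a CSV file on S3 that maps CloudWatch Logs log group names to destination S3 buckets, then exports the previous 24 hours of each log group with create_export_task. Because CloudWatch Logs allows only one active export task at a time per account and region, the function waits for each task to finish before starting the next.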
import boto3
import datetime
import time
import csv
import logging

# Standard Lambda logging setup (basicConfig has no effect in Lambda,
# because the runtime already attaches a handler to the root logger)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # Execution time (now)
    to_time = datetime.datetime.now()
    # One day earlier
    from_time = to_time - datetime.timedelta(days=1)
    # Epoch time in seconds (float)
    epoc_from_time = from_time.timestamp()
    epoc_to_time = to_time.timestamp()
    # Convert to milliseconds and cast to int, as the CloudWatch Logs API expects
    m_epoc_from_time = int(epoc_from_time * 1000)
    m_epoc_to_time = int(epoc_to_time * 1000)

    # Read the CSV file from S3 (each row: log group name, destination bucket)
    s3 = boto3.client('s3')
    bucket_name = 'bucket-name'  # placeholder: the bucket that holds the CSV
    key = 'example.csv'
    obj = s3.get_object(Bucket=bucket_name, Key=key)
    rows = csv.reader(obj['Body'].read().decode('utf-8').splitlines())

    client = boto3.client('logs')
    # Export the logs stored in each log group to S3
    for row in rows:
        log_group_name = row[0]
        s3_bucket = row[1]

        client.create_export_task(
            logGroupName=log_group_name,
            fromTime=m_epoc_from_time,
            to=m_epoc_to_time,
            destination=s3_bucket,
            destinationPrefix=log_group_name
        )

        # Only one export task can be active at a time per account and region,
        # so wait for the task just created to finish before starting the next.
        # Without this wait, the next create_export_task call raises an error.
        time.sleep(5)
        export_tasks = client.describe_export_tasks(statusCode='RUNNING')
        logger.info(export_tasks)
        while len(export_tasks['exportTasks']) >= 1:
            time.sleep(5)  # poll every 5 seconds instead of busy-waiting
            export_tasks = client.describe_export_tasks(statusCode='RUNNING')
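For reference, example.csv holds one log group and its destination bucket per row, matching row[0] and row[1] above. The values here are placeholders:

/aws/lambda/app-a,log-export-bucket-a
/ecs/app-b,log-export-bucket-b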
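As a variant, create_export_task returns a taskId, and describe_export_tasks accepts it as a filter, so each task can be polled directly instead of listing all RUNNING tasks. A minimal sketch; wait_for_export is a hypothetical helper name, not part of the original script:

import time
import boto3

client = boto3.client('logs')

def wait_for_export(task_id, interval=5):
    """Poll a single export task by ID until it leaves the active states."""
    while True:
        resp = client.describe_export_tasks(taskId=task_id)
        status = resp['exportTasks'][0]['status']['code']
        if status not in ('PENDING', 'RUNNING'):
            return status
        time.sleep(interval)

# Usage: capture the taskId when creating the export task
# task = client.create_export_task(...)
# wait_for_export(task['taskId'])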