3
1

CloudWatchLogsのロググループをS3へエクスポートするLambdaの作成

Posted at

はじめに

EC2等からCloudWatchLogsへ格納されたログをS3へ格納する場合はGUIやCLIからCloudWatchLogsのcreate-export-taskを実行する必要がある。
シェルスクリプトを作成してサーバ上からcronで定期実行することで、
日次や月次でS3へのエクスポートが可能であるが、これをEventBridgeとLambdaを使ってサーバレスで実行できるようにPythonでLambdaの中身を作成した。

環境・事前準備

以下の言語・バージョンを使用する。

  • Lambdaランタイム: Python 3.12

以下を前提とする。

  • EC2からCloudWatchLogsへログが出力されていること
  • CloudWatchLogsからエクスポート先となるS3バケットが作成済みであること

以下の仕様とする。

  • エクスポートする範囲はLambda実行前日の12:30~当日12:30とする
  • エクスポートする対象の定義はSystems Manager(SSM)のパラメータストアに定義する
  • エクスポート先S3バケット内に"EC2/ホスト名/yyyymmdd/"という日付別のディレクトリが作成され、その中にエクスポートする

事前環境

ロググループを以下のような命名で出力されるよう、CloudWatchAgentを設定する。

cw-logs-<ホスト名>-<ログ名>
例)cw-logs-gp-dev-ec2-01-messages

SSMパラメータストアに、以下のようなパラメータを用意しておく。

名前: CWlogs-ExportS3-Config
種類: String
値: ホスト名
例)
gp-dev-ec2-01
gp-dev-ec2-02

エクスポート先Bucketポリシー設定

エクスポート先となるS3バケットは、以下のドキュメントに従い、logs.Region.amazonaws.comからのアクセスを許可しておく。

Lambda実行用IAMポリシー

Lambdaの実行用ロールに付与するポリシーとして、AWSLambdaBasicExecutionRole
以下の権限を持つカスタムポリシーを付与する。

ambda用カスタムポリシー
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowCreateExportTask",
            "Effect": "Allow",
            "Action": [
                "logs:CreateExportTask",
                "logs:CancelExportTask",
                "logs:DescribeExportTasks",
                "logs:DescribeLogStreams",
                "logs:DescribeLogGroups"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowGetSSMparam",
            "Effect": "Allow",
            "Action": [
                "ssm:DescribeParameters",
                "ssm:GetParameter"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowGetS3Bucket",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": "*"
        }
    ]
}

Lambdaコード

以下のコードを作成した。
コメントアウトでテスト用としているnow_unixに関しては、ログのエクスポート対象を前日12:30から当日のコード実行時刻までを対象とする場合に利用する。

lambda.py
import os
import logging
import boto3
import datetime
import time

# 環境変数取得
TARGET_BUCKET = os.environ["TARGET_BUCKET"]
SSM_PARAM_NAME = os.environ["SSM_PARAM_NAME"]
LOG_LEVEL = os.environ["LOG_LEVEL"]

# logger設定
logger = logging.getLogger()
if LOG_LEVEL == "INFO":
    print(f"LOGLEVEL:{LOG_LEVEL}")
    logger.setLevel(logging.INFO)
elif LOG_LEVEL == "DEBUG":
    print(f"LOGLEVEL:{LOG_LEVEL}")
    logger.setLevel(logging.DEBUG)
else:
    print("LOG_LEVEL is not defined.")

def func_get_date(mode="today"):
    date = datetime.datetime.now(datetime.timezone.utc)
    if mode == "today":
        # 当日日付取得
        date = date.strftime("%Y%m%d")
    if mode == "today_full":
        # 当日フルタイム取得
        date = date.strftime("%Y%m%d-%H%M%S%f")
    elif mode == "today_unix":
        # 当日UTC 12:30のUNIXタイム取得
        date = datetime.datetime.combine(date.date(), datetime.time(12, 30))
        date = int(date.timestamp() * 1000)
    elif mode == "yesterday_unix":
        # 前日UTC 12:30のUNIXタイム取得
        yesterday = date - datetime.timedelta(days=1)
        yesterday = datetime.datetime.combine(yesterday.date(), datetime.time(12, 30))
        date = int(yesterday.timestamp() * 1000)
    elif mode == "now_unix":
        # テスト用
        date = int(date.timestamp() * 1000)
    return date

def func_put_s3_object(bucketname, objectkey):
    logger.debug("===[function]Start:create_object===")
    s3 = boto3.client("s3")
    s3.put_object(
        Bucket = bucketname,
        Key = objectkey
    )
    logger.debug("===[function]End:create_object===")

def func_get_ssm_param_value(paramnamae):
    logger.debug("===[function]Start:get_ssm_param_value===")
    ssm = boto3.client("ssm")
    ssm_param_value = ssm.get_parameter(
        Name = paramnamae
    )
    ssm_param_value = ssm_param_value["Parameter"]["Value"]
    logger.debug("===[function]End:get_ssm_param_value===")
    return ssm_param_value

def func_get_logs_loggroups(hostname):
    logger.debug("===[function]Start:get_logs_loggroups===")
    logs = boto3.client("logs")
    logs_loggroups = logs.describe_log_groups(
        logGroupNamePattern = hostname
    )
    logs_loggroups = logs_loggroups["logGroups"]
    logger.debug("===[function]End:get_logs_loggroups===")
    return logs_loggroups

def func_create_cwlogs_export_task(loggroupname, hostname):
    today = func_get_date("today")
    today_full = func_get_date("today_full")
    yesterday_unix = func_get_date("yesterday_unix")
    # now_unix = func_get_date("now_unix") # テスト用
    today_unix = func_get_date("today_unix")
    logs = boto3.client("logs")
    export_task_name = f"{loggroupname}-{today_full}"

    # Exportタスク作成
    logger.debug("===[function]Start:create_cwlogs_export_task===")
    task_id = logs.create_export_task(
        taskName = export_task_name,
        logGroupName = loggroupname,
        fromTime = yesterday_unix,
        # to = now_unix, # テスト用
        to = today_unix,
        destination = TARGET_BUCKET,
        destinationPrefix = f"EC2/{hostname}/{today}"
    )
    logger.info(f"Create task name:{export_task_name}")
    logger.info(f"Time range:{yesterday_unix} to {today_unix}")
    logger.info(f"Create task ID:{task_id["taskId"]}")

    # タスクステータスチェック
    task_status_complete = False
    MAX_TASK_WAIT_COUNT = 90 # タスクタイムアウト秒数
    wati_count = 0
    while task_status_complete == False:
        time.sleep(1)
        task_status = logs.describe_export_tasks(
            taskId = task_id["taskId"]
        )
        task_status = task_status["exportTasks"][0]["status"]["code"]
        if task_status == "COMPLETED":
            logger.info(f"Task completed:{task_id["taskId"]}")
            task_status_complete = True
        elif task_status == "PENDING" or "RUNNING":
            if wati_count < MAX_TASK_WAIT_COUNT:
                logger.info(f"Task pending or running:{task_id["taskId"]}")
                logger.info(f"Task status:{task_status}")
                wati_count += 1
            elif wati_count >= MAX_TASK_WAIT_COUNT:
                logger.warning(f"Task time out:{task_id["taskId"]}")
                logger.warning(f"Task status:{task_status}")
                logs.cancel_export_task(
                    taskId = task_id["taskId"]
                )
                logger.warning(f"Task canceled:{task_id["taskId"]}")
                break
        else:
            logger.warning(f"Task status error:{task_id["taskId"]}")
            logger.info(f"Task status:{task_status}")
            break
    logger.debug("===[function]End:create_cwlogs_export_task===")

def lambda_handler(event, context):
    print("===START===")
    # 対象ホスト名取得
    host_list = func_get_ssm_param_value(SSM_PARAM_NAME)
    host_list = host_list.split("\n")

    # S3 Export先ディレクトリ作成
    for item in host_list:
        today = func_get_date("today")
        log_dir_prefix = f"EC2/{item}/{today}/"
        func_put_s3_object(TARGET_BUCKET, log_dir_prefix)

    # ロググループ名取得
    loggroup_name_list = []
    for hostname in host_list:
        print(f"HOSTNAME:{hostname}")
        loggroups_value = func_get_logs_loggroups(hostname)

        if not loggroups_value:
            print(f"{hostname}が含まれるロググループが存在しません。処理をスキップします。")
            logger.warning("Not exsist loggroup.")
        else:
            for key in loggroups_value:
                value = key["logGroupName"]
                loggroup_name_list.append(value)
            print(loggroup_name_list)
            for loggroup_name in loggroup_name_list:
                func_create_cwlogs_export_task(loggroup_name, hostname)
    print("===END===")

実行結果

以下のようにエクスポート先バケットが作成され、正常にCloudWatchLogsの中身がエクスポートされた。

実行結果
> aws s3 ls s3://cwlogs-export-bucket/EC2/ --recursive
2024-06-23 17:03:43          0 EC2/gp-dev-ec2-01/20240623/
2024-06-23 17:03:51       1086 EC2/gp-dev-ec2-01/20240623/12345-12345-12345-12345/gp-dev-ec2-01-secure/000000.gz
2024-06-23 17:03:50         27 EC2/gp-dev-ec2-01/20240623/aws-logs-write-test
2024-06-23 17:03:48      29846 EC2/gp-dev-ec2-01/20240623/11111-11111-11111/gp-dev-ec2-01-messages/000000.gz
2024-06-23 17:03:44          0 EC2/gp-dev-ec2-02/20240623/

エクスポート対象となるロググループをSSMのパラメータストアに指定したホスト名で検索しているため、
対象ホスト名が含まれるロググループ("cw-logs-gp-dev-ec2-01-messages"や"cw-logs-gp-dev-ec2-01-secure")がエクスポートされる。

実行時、動作検証のためにあえてロググループが存在しないホスト名を定義していた。
その場合は"EC2/ホスト名/yyyymmdd/"までは作成されるものの、エクスポートはスキップされ、エクスポートジョブも作成されない仕様とした。

Lambda出力ログ
2024-06-23T08:03:39.218Z	===START===
2024-06-23T08:03:39.421Z	[INFO] 2024-06-23T08:03:39.421Z Found credentials in environment variables.
2024-06-23T08:03:43.658Z	HOSTNAME:gp-dev-ec2-01
2024-06-23T08:03:44.339Z	['cw-logs-gp-dev-ec2-01-messages', 'cw-logs-gp-dev-ec2-01-secure']
2024-06-23T08:03:45.177Z	[INFO] 2024-06-23T08:03:45.177Z Create task name:cw-logs-gp-dev-ec2-01-messages-20240623-080344339750
2024-06-23T08:03:45.177Z	[INFO] 2024-06-23T08:03:45.177Z Time range:1719059400000 to 1719129824339
2024-06-23T08:03:45.177Z	[INFO] 2024-06-23T08:03:45.177Z Create task ID:11111-11111-11111
2024-06-23T08:03:46.199Z	[INFO] 2024-06-23T08:03:46.199Z Task pending or running:11111-11111-11111
2024-06-23T08:03:46.199Z	[INFO] 2024-06-23T08:03:46.199Z Task status:RUNNING
2024-06-23T08:03:47.219Z	[INFO] 2024-06-23T08:03:47.219Z Task pending or running:11111-11111-11111
2024-06-23T08:03:47.219Z	[INFO] 2024-06-23T08:03:47.219Z Task status:RUNNING
2024-06-23T08:03:48.228Z	[INFO] 2024-06-23T08:03:48.228Z Task completed:11111-11111-11111
2024-06-23T08:03:49.111Z	[INFO] 2024-06-23T08:03:49.111Z Create task name:cw-logs-gp-dev-ec2-01-secure-20240623-080348258852
2024-06-23T08:03:49.112Z	[INFO] 2024-06-23T08:03:49.112Z Time range:1719059400000 to 1719129828258
2024-06-23T08:03:49.112Z	[INFO] 2024-06-23T08:03:49.112Z Create task ID:12345-12345-12345-12345
2024-06-23T08:03:50.120Z	[INFO] 2024-06-23T08:03:50.120Z Task pending or running:12345-12345-12345-12345
2024-06-23T08:03:50.120Z	[INFO] 2024-06-23T08:03:50.120Z Task status:RUNNING
2024-06-23T08:03:51.139Z	[INFO] 2024-06-23T08:03:51.139Z Task completed:12345-12345-12345-12345
2024-06-23T08:03:51.141Z	HOSTNAME:gp-dev-ec2-02
2024-06-23T08:03:51.727Z	gp-dev-ec2-02が含まれるロググループが存在しません。処理をスキップします。
2024-06-23T08:03:51.727Z	[WARNING] 2024-06-23T08:03:51.727Z Not exsist loggroup.
2024-06-23T08:03:51.727Z	===END===

注意点

エクスポートタスクは以下の公式ドキュメントにある通り、並列にエクスポートタスクの実行ができない。

アカウントごとに、一度に 1 つのアクティブ (実行中または保留中) のエクスポートタスクがあります。このクォータは変更できません。

今回実装した処理もタスク作成→ステータスチェック→未完了の場合は1秒待機→再度ステータスチェックというような1タスクずつ完了を待つ流れをとっている(コード内で最大90秒まで待機設定)。
Lambdaの最長実行時間が15分であることを考えると、エクスポートするロググループが大量にある場合はタイムアウト時間や最長実行時間内に処理が完了しない懸念がある。
あまり規模が大きくない環境であれば15分以内に完了すると思われるが、
MWのログ等も多く取得している場合はLambdaタイムアウト時のリカバリーやEC2上でシェルでの実行に戻すことも考慮しなければならない。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1