はじめに
EC2等からCloudWatchLogsへ格納されたログをS3へ格納する場合はGUIやCLIからCloudWatchLogsのcreate-export-taskを実行する必要がある。
シェルスクリプトを作成してサーバ上からcronで定期実行することで、
日次や月次でS3へのエクスポートが可能であるが、これをEventBridgeとLambdaを使ってサーバレスで実行できるようにPythonでLambdaの中身を作成した。
環境・事前準備
以下の言語・バージョンを使用する。
- Lambdaランタイム: Python 3.12
以下を前提とする。
- EC2からCloudWatchLogsへログが出力されていること
- CloudWatchLogsからエクスポート先となるS3バケットが作成済みであること
以下の仕様とする。
- エクスポートする範囲はLambda実行前日の12:30~当日12:30とする
- エクスポートする対象の定義はSystems Manager(SSM)のパラメータストアに定義する
- エクスポート先S3バケット内に"EC2/ホスト名/yyyymmdd/"という日付別のディレクトリが作成され、その中にエクスポートする
事前環境
ロググループを以下のような命名で出力されるよう、CloudWatchAgentを設定する。
cw-logs-<ホスト名>-<ログ名>
例)cw-logs-gp-dev-ec2-01-messages
SSMパラメータストアに、以下のようなパラメータを用意しておく。
名前: CWlogs-ExportS3-Config
種類: String
値: ホスト名
例)
gp-dev-ec2-01
gp-dev-ec2-02
エクスポート先Bucketポリシー設定
エクスポート先となるS3バケットは、以下のドキュメントに従い、logs.Region.amazonaws.com
からのアクセスを許可しておく。
Lambda実行用IAMポリシー
Lambdaの実行用ロールに付与するポリシーとして、AWSLambdaBasicExecutionRole
と
以下の権限を持つカスタムポリシーを付与する。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowCreateExportTask",
"Effect": "Allow",
"Action": [
"logs:CreateExportTask",
"logs:CancelExportTask",
"logs:DescribeExportTasks",
"logs:DescribeLogStreams",
"logs:DescribeLogGroups"
],
"Resource": "*"
},
{
"Sid": "AllowGetSSMparam",
"Effect": "Allow",
"Action": [
"ssm:DescribeParameters",
"ssm:GetParameter"
],
"Resource": "*"
},
{
"Sid": "AllowGetS3Bucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject"
],
"Resource": "*"
}
]
}
Lambdaコード
以下のコードを作成した。
コメントアウトでテスト用としているnow_unix
に関しては、ログのエクスポート対象を前日12:30から当日のコード実行時刻までを対象とする場合に利用する。
import os
import logging
import boto3
import datetime
import time
# 環境変数取得
TARGET_BUCKET = os.environ["TARGET_BUCKET"]
SSM_PARAM_NAME = os.environ["SSM_PARAM_NAME"]
LOG_LEVEL = os.environ["LOG_LEVEL"]
# logger設定
logger = logging.getLogger()
if LOG_LEVEL == "INFO":
print(f"LOGLEVEL:{LOG_LEVEL}")
logger.setLevel(logging.INFO)
elif LOG_LEVEL == "DEBUG":
print(f"LOGLEVEL:{LOG_LEVEL}")
logger.setLevel(logging.DEBUG)
else:
print("LOG_LEVEL is not defined.")
def func_get_date(mode="today"):
date = datetime.datetime.now(datetime.timezone.utc)
if mode == "today":
# 当日日付取得
date = date.strftime("%Y%m%d")
if mode == "today_full":
# 当日フルタイム取得
date = date.strftime("%Y%m%d-%H%M%S%f")
elif mode == "today_unix":
# 当日UTC 12:30のUNIXタイム取得
date = datetime.datetime.combine(date.date(), datetime.time(12, 30))
date = int(date.timestamp() * 1000)
elif mode == "yesterday_unix":
# 前日UTC 12:30のUNIXタイム取得
yesterday = date - datetime.timedelta(days=1)
yesterday = datetime.datetime.combine(yesterday.date(), datetime.time(12, 30))
date = int(yesterday.timestamp() * 1000)
elif mode == "now_unix":
# テスト用
date = int(date.timestamp() * 1000)
return date
def func_put_s3_object(bucketname, objectkey):
logger.debug("===[function]Start:create_object===")
s3 = boto3.client("s3")
s3.put_object(
Bucket = bucketname,
Key = objectkey
)
logger.debug("===[function]End:create_object===")
def func_get_ssm_param_value(paramnamae):
logger.debug("===[function]Start:get_ssm_param_value===")
ssm = boto3.client("ssm")
ssm_param_value = ssm.get_parameter(
Name = paramnamae
)
ssm_param_value = ssm_param_value["Parameter"]["Value"]
logger.debug("===[function]End:get_ssm_param_value===")
return ssm_param_value
def func_get_logs_loggroups(hostname):
logger.debug("===[function]Start:get_logs_loggroups===")
logs = boto3.client("logs")
logs_loggroups = logs.describe_log_groups(
logGroupNamePattern = hostname
)
logs_loggroups = logs_loggroups["logGroups"]
logger.debug("===[function]End:get_logs_loggroups===")
return logs_loggroups
def func_create_cwlogs_export_task(loggroupname, hostname):
today = func_get_date("today")
today_full = func_get_date("today_full")
yesterday_unix = func_get_date("yesterday_unix")
# now_unix = func_get_date("now_unix") # テスト用
today_unix = func_get_date("today_unix")
logs = boto3.client("logs")
export_task_name = f"{loggroupname}-{today_full}"
# Exportタスク作成
logger.debug("===[function]Start:create_cwlogs_export_task===")
task_id = logs.create_export_task(
taskName = export_task_name,
logGroupName = loggroupname,
fromTime = yesterday_unix,
# to = now_unix, # テスト用
to = today_unix,
destination = TARGET_BUCKET,
destinationPrefix = f"EC2/{hostname}/{today}"
)
logger.info(f"Create task name:{export_task_name}")
logger.info(f"Time range:{yesterday_unix} to {today_unix}")
logger.info(f"Create task ID:{task_id["taskId"]}")
# タスクステータスチェック
task_status_complete = False
MAX_TASK_WAIT_COUNT = 90 # タスクタイムアウト秒数
wati_count = 0
while task_status_complete == False:
time.sleep(1)
task_status = logs.describe_export_tasks(
taskId = task_id["taskId"]
)
task_status = task_status["exportTasks"][0]["status"]["code"]
if task_status == "COMPLETED":
logger.info(f"Task completed:{task_id["taskId"]}")
task_status_complete = True
elif task_status == "PENDING" or "RUNNING":
if wati_count < MAX_TASK_WAIT_COUNT:
logger.info(f"Task pending or running:{task_id["taskId"]}")
logger.info(f"Task status:{task_status}")
wati_count += 1
elif wati_count >= MAX_TASK_WAIT_COUNT:
logger.warning(f"Task time out:{task_id["taskId"]}")
logger.warning(f"Task status:{task_status}")
logs.cancel_export_task(
taskId = task_id["taskId"]
)
logger.warning(f"Task canceled:{task_id["taskId"]}")
break
else:
logger.warning(f"Task status error:{task_id["taskId"]}")
logger.info(f"Task status:{task_status}")
break
logger.debug("===[function]End:create_cwlogs_export_task===")
def lambda_handler(event, context):
print("===START===")
# 対象ホスト名取得
host_list = func_get_ssm_param_value(SSM_PARAM_NAME)
host_list = host_list.split("\n")
# S3 Export先ディレクトリ作成
for item in host_list:
today = func_get_date("today")
log_dir_prefix = f"EC2/{item}/{today}/"
func_put_s3_object(TARGET_BUCKET, log_dir_prefix)
# ロググループ名取得
loggroup_name_list = []
for hostname in host_list:
print(f"HOSTNAME:{hostname}")
loggroups_value = func_get_logs_loggroups(hostname)
if not loggroups_value:
print(f"{hostname}が含まれるロググループが存在しません。処理をスキップします。")
logger.warning("Not exsist loggroup.")
else:
for key in loggroups_value:
value = key["logGroupName"]
loggroup_name_list.append(value)
print(loggroup_name_list)
for loggroup_name in loggroup_name_list:
func_create_cwlogs_export_task(loggroup_name, hostname)
print("===END===")
実行結果
以下のようにエクスポート先バケットが作成され、正常にCloudWatchLogsの中身がエクスポートされた。
> aws s3 ls s3://cwlogs-export-bucket/EC2/ --recursive
2024-06-23 17:03:43 0 EC2/gp-dev-ec2-01/20240623/
2024-06-23 17:03:51 1086 EC2/gp-dev-ec2-01/20240623/12345-12345-12345-12345/gp-dev-ec2-01-secure/000000.gz
2024-06-23 17:03:50 27 EC2/gp-dev-ec2-01/20240623/aws-logs-write-test
2024-06-23 17:03:48 29846 EC2/gp-dev-ec2-01/20240623/11111-11111-11111/gp-dev-ec2-01-messages/000000.gz
2024-06-23 17:03:44 0 EC2/gp-dev-ec2-02/20240623/
エクスポート対象となるロググループをSSMのパラメータストアに指定したホスト名で検索しているため、
対象ホスト名が含まれるロググループ("cw-logs-gp-dev-ec2-01-messages"や"cw-logs-gp-dev-ec2-01-secure")がエクスポートされる。
実行時、動作検証のためにあえてロググループが存在しないホスト名を定義していた。
その場合は"EC2/ホスト名/yyyymmdd/"までは作成されるものの、エクスポートはスキップされ、エクスポートジョブも作成されない仕様とした。
2024-06-23T08:03:39.218Z ===START===
2024-06-23T08:03:39.421Z [INFO] 2024-06-23T08:03:39.421Z Found credentials in environment variables.
2024-06-23T08:03:43.658Z HOSTNAME:gp-dev-ec2-01
2024-06-23T08:03:44.339Z ['cw-logs-gp-dev-ec2-01-messages', 'cw-logs-gp-dev-ec2-01-secure']
2024-06-23T08:03:45.177Z [INFO] 2024-06-23T08:03:45.177Z Create task name:cw-logs-gp-dev-ec2-01-messages-20240623-080344339750
2024-06-23T08:03:45.177Z [INFO] 2024-06-23T08:03:45.177Z Time range:1719059400000 to 1719129824339
2024-06-23T08:03:45.177Z [INFO] 2024-06-23T08:03:45.177Z Create task ID:11111-11111-11111
2024-06-23T08:03:46.199Z [INFO] 2024-06-23T08:03:46.199Z Task pending or running:11111-11111-11111
2024-06-23T08:03:46.199Z [INFO] 2024-06-23T08:03:46.199Z Task status:RUNNING
2024-06-23T08:03:47.219Z [INFO] 2024-06-23T08:03:47.219Z Task pending or running:11111-11111-11111
2024-06-23T08:03:47.219Z [INFO] 2024-06-23T08:03:47.219Z Task status:RUNNING
2024-06-23T08:03:48.228Z [INFO] 2024-06-23T08:03:48.228Z Task completed:11111-11111-11111
2024-06-23T08:03:49.111Z [INFO] 2024-06-23T08:03:49.111Z Create task name:cw-logs-gp-dev-ec2-01-secure-20240623-080348258852
2024-06-23T08:03:49.112Z [INFO] 2024-06-23T08:03:49.112Z Time range:1719059400000 to 1719129828258
2024-06-23T08:03:49.112Z [INFO] 2024-06-23T08:03:49.112Z Create task ID:12345-12345-12345-12345
2024-06-23T08:03:50.120Z [INFO] 2024-06-23T08:03:50.120Z Task pending or running:12345-12345-12345-12345
2024-06-23T08:03:50.120Z [INFO] 2024-06-23T08:03:50.120Z Task status:RUNNING
2024-06-23T08:03:51.139Z [INFO] 2024-06-23T08:03:51.139Z Task completed:12345-12345-12345-12345
2024-06-23T08:03:51.141Z HOSTNAME:gp-dev-ec2-02
2024-06-23T08:03:51.727Z gp-dev-ec2-02が含まれるロググループが存在しません。処理をスキップします。
2024-06-23T08:03:51.727Z [WARNING] 2024-06-23T08:03:51.727Z Not exsist loggroup.
2024-06-23T08:03:51.727Z ===END===
注意点
エクスポートタスクは以下の公式ドキュメントにある通り、並列にエクスポートタスクの実行ができない。
アカウントごとに、一度に 1 つのアクティブ (実行中または保留中) のエクスポートタスクがあります。このクォータは変更できません。
今回実装した処理もタスク作成→ステータスチェック→未完了の場合は1秒待機→再度ステータスチェック
というような1タスクずつ完了を待つ流れをとっている(コード内で最大90秒まで待機設定)。
Lambdaの最長実行時間が15分であることを考えると、エクスポートするロググループが大量にある場合はタイムアウト時間や最長実行時間内に処理が完了しない懸念がある。
あまり規模が大きくない環境であれば15分以内に完了すると思われるが、
MWのログ等も多く取得している場合はLambdaタイムアウト時のリカバリーやEC2上でシェルでの実行に戻すことも考慮しなければならない。