【AWS】CloudFormationでS3バケット作成とライフサイクルルールを設定する で作成したS3にCloudWatch Logsに保存しているログを定期的にエクスポートするLambdaをPythonで開発したお話です。
簡単に試せるようにGitHubリポジトリを作成しておきました -> homoluctus/lambda-cwlogs-s3
要件
- 毎日JST14:00に前日のログをエクスポート
- 複数のロググループをエクスポート可能
開発
言語
- Python3.8
開発ライブラリ
- isort
- mypy
- flake8
- autopep8
本番ライブラリ
- boto3
- boto3-stubs (type annotation用)
デプロイメントツール
- Serverless Framework
- GitHub Actions
AWSサービス
- Lambda
- CloudWatch Events
- CloudWatch Logs
- S3
コード
コード全部を載せるのは無理なので、メインとなるコードだけ解説用に載せます
完全なソースコードはリポジトリをみてね
エクスポートするロググループ設定用のクラス
TypeScriptのInterface風にエクスポートしたいロググループを複数設定できるようにしました。
エクスポートするロググループを追加する場合はLogGroupクラスを継承するだけでOKです。
エクスポートするS3のオブジェクトキーがどのロググループのログでいつのかわかりやすくなるようにしています。
docstringにも書いていますがdest_bucket/dest_obj_first_prefix or log_group/dest_obj_final_prefix/*
というような階層構造になります。dest_obj_first_prefixが指定してなければ、log_groupの名前が入ります。*
以降はエクスポートタスクのID/ログストリーム/ファイル
みたいな感じになります。これは自動で付加されるのでコントロールできません.
class LogGroup(object, metaclass=ABCMeta):
"""CloudWatch LogsをS3へエクスポートするための設定用基底クラス
エクスポートするロググループの追加方法
class Example(LogGroup):
log_group = 'test'
"""
# log_groupは必須でそれ以外はオプション
log_group: ClassVar[str]
log_stream: ClassVar[str] = ''
start_time: ClassVar[int] = get_specific_time_on_yesterday(
hour=0, minute=0, second=0)
end_time: ClassVar[int] = get_specific_time_on_yesterday(
hour=23, minute=59, second=59)
dest_bucket: ClassVar[str] = 'lambda-cwlogs-s3'
dest_obj_first_prefix: ClassVar[str] = ''
dest_obj_final_prefix: ClassVar[str] = get_yesterday('%Y-%m-%d')
@classmethod
def get_dest_obj_prefix(cls) -> str:
"""完全なS3のobject prefixを取得
S3の階層構造
dest_bucket/dest_obj_first_prefix/dest_obj_final_prefix/*
Returns:
str
"""
first_prefix = cls.dest_obj_first_prefix or cls.log_group
return f'{first_prefix}/{cls.dest_obj_final_prefix}'
@classmethod
def to_args(cls) -> Dict[str, Union[str, int]]:
args: Dict[str, Union[str, int]] = {
'logGroupName': cls.log_group,
'fromTime': cls.start_time,
'to': cls.end_time,
'destination': cls.dest_bucket,
'destinationPrefix': cls.get_dest_obj_prefix()
}
if cls.log_stream:
args['logStreamNamePrefix'] = cls.log_stream
return args
CloudWatch Logsのクライアント
CloudWatch LogsのAPIで使用するのは以下の2つ
- create_export_task
- S3へエクスポートするタスクを作成
- タスクを作成するとCloudWatch LogsがS3へエクスポートしてくれる
- レスポンスには作成したタスクのIDが含まれている
- describe_export_tasks
- 作成したタスクのtaskIdを引数としてタスクの情報を取得
- タスクの進捗を確認するために使用する
- ステータスコード
-
COMPLETED
ならエクスポート完了 -
CANCELLED
とFAILED
はエラー扱いにする - それ以外ならまだ未完了とする
-
@dataclass
class Exporter:
region: InitVar[str]
client: CloudWatchLogsClient = field(init=False)
def __post_init__(self, region: str):
self.client = boto3.client('logs', region_name=region)
def export(self, target: Type[LogGroup]) -> str:
"""CloudWatch Logsの任意のロググループをS3へエクスポート
Args:
target (Type[LogGroup])
Raises:
ExportToS3Error
Returns:
str: CloudWatch Logs APIからのレスポンスに含まれるtaskId
"""
try:
response = self.client.create_export_task(
**target.to_args()) # type: ignore
return response['taskId']
except Exception as err:
raise ExportToS3Error(err)
def get_export_progress(self, task_id: str) -> str:
try:
response = self.client.describe_export_tasks(taskId=task_id)
status = response['exportTasks'][0]['status']['code']
return status
except Exception as err:
raise GetExportTaskError(err)
@classmethod
def finishes(cls, status_code: str) -> bool:
"""エクスポートタスクが終了したかをステータスコードから判別する
Args:
status_code (str):
describe_export_tasksのレスポンスに含まれるステータスコード
Raises:
ExportToS3Failure: ステータスコードがCANCELLEDかFAILEDの場合
Returns:
bool
"""
uppercase_status_code = status_code.upper()
if uppercase_status_code == 'COMPLETED':
return True
elif uppercase_status_code in ['CANCELLED', 'FAILED']:
raise ExportToS3Failure('S3へのエクスポート失敗')
return False
main
LogGroup.__subclasses__()でエクスポートしたいロググループの設定子クラスを取得します。
__subclasses__()はリストを返すので、それをfor文で回します。
CloudWatch Logsのエクスポートタスクはアカウントで1つしか同時に実行できないので、describe_export_tasks APIを叩いてタスクが完了しているか確認します。未完了であれば5s待つようにしています。create_export_taskが非同期APIなのでそのようにポーリングするしかありません。
def export_to_s3(exporter: Exporter, target: Type[LogGroup]) -> bool:
task_id = exporter.export(target)
logger.info(f'{target.log_group}をS3へエクスポート中 ({task_id=})')
while True:
status = exporter.get_export_progress(task_id)
if exporter.finishes(status):
return True
sleep(5)
def main(event: Any, context: Any) -> bool:
exporter = Exporter(region='ap-northeast-1')
targets = LogGroup.__subclasses__()
logger.info(f'エクスポート対象のロググループは{len(targets)}個')
for target in targets:
try:
export_to_s3(exporter, target)
except GetExportTaskError as err:
logger.warning(err)
logger.warning(f'{target.log_group}の進捗状況の取得失敗')
except Exception as err:
logger.error(err)
logger.error(f'{target.log_group}のS3へエクスポート失敗')
else:
logger.info(f'{target.log_group}のS3へエクスポート完了')
return True
serverless.yml
以下は一部を抜粋したものです。
Lambdaがエクスポートタスク作成とタスク情報の取得ができるようにIAM Roleを設定します。
それから、毎日JST14:00でエクスポートを実行したいので、eventsにcron(0 5 * * ? *)
を指定します。CloudWatch EventsはUTCで動いているので、-9hすれば期待通りのJST14:00に実行してくれます。
iamRoleStatements:
- Effect: 'Allow'
Action:
- 'logs:createExportTask'
- 'logs:DescribeExportTasks'
Resource:
- 'arn:aws:logs:${self:provider.region}:${self:custom.accountId}:log-group:*'
functions:
export:
handler: src/handler.main
memorySize: 512
timeout: 120
events:
- schedule: cron(0 5 * * ? *)
environment:
TZ: Asia/Tokyo
おわりに
homoluctus/lambda-cwlogs-s3にはGitHub Actionsやエクスポート先のS3を作成するためのCloudFormationテンプレートもあります。ぜひ参照してみてください。