LoginSignup
2
0

More than 3 years have passed since last update.

【Python】Lambdaで定期的にCloudWatch LogsからS3へエクスポートする

Posted at

【AWS】CloudFormationでS3バケット作成とライフサイクルルールを設定する で作成したS3にCloudWatch Logsに保存しているログを定期的にエクスポートするLambdaをPythonで開発したお話です。

簡単に試せるようにGitHubリポジトリを作成しておきました -> homoluctus/lambda-cwlogs-s3

要件

  • 毎日JST14:00に前日のログをエクスポート
  • 複数のロググループをエクスポート可能

開発

言語

  • Python3.8

開発ライブラリ

  • isort
  • mypy
  • flake8
  • autopep8

本番ライブラリ

  • boto3
  • boto3-stubs (type annotation用)

デプロイメントツール

  • Serverless Framework
  • GitHub Actions

AWSサービス

  • Lambda
  • CloudWatch Events
  • CloudWatch Logs
  • S3

コード

コード全部を載せるのは無理なので、メインとなるコードだけ解説用に載せます
完全なソースコードはリポジトリをみてね

エクスポートするロググループ設定用のクラス

TypeScriptのInterface風にエクスポートしたいロググループを複数設定できるようにしました。
エクスポートするロググループを追加する場合はLogGroupクラスを継承するだけでOKです。

エクスポートするS3のオブジェクトキーがどのロググループのログでいつのかわかりやすくなるようにしています。
docstringにも書いていますがdest_bucket/dest_obj_first_prefix or log_group/dest_obj_final_prefix/*というような階層構造になります。dest_obj_first_prefixが指定してなければ、log_groupの名前が入ります。*以降はエクスポートタスクのID/ログストリーム/ファイルみたいな感じになります。これは自動で付加されるのでコントロールできません.

class LogGroup(object, metaclass=ABCMeta):
    """CloudWatch LogsをS3へエクスポートするための設定用基底クラス

    エクスポートするロググループの追加方法

    class Example(LogGroup):
        log_group = 'test'
    """

    # log_groupは必須でそれ以外はオプション
    log_group: ClassVar[str]

    log_stream: ClassVar[str] = ''
    start_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=0, minute=0, second=0)
    end_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=23, minute=59, second=59)
    dest_bucket: ClassVar[str] = 'lambda-cwlogs-s3'
    dest_obj_first_prefix: ClassVar[str] = ''
    dest_obj_final_prefix: ClassVar[str] = get_yesterday('%Y-%m-%d')

    @classmethod
    def get_dest_obj_prefix(cls) -> str:
        """完全なS3のobject prefixを取得

        S3の階層構造
        dest_bucket/dest_obj_first_prefix/dest_obj_final_prefix/*

        Returns:
            str
        """

        first_prefix = cls.dest_obj_first_prefix or cls.log_group
        return f'{first_prefix}/{cls.dest_obj_final_prefix}'

    @classmethod
    def to_args(cls) -> Dict[str, Union[str, int]]:
        args: Dict[str, Union[str, int]] = {
            'logGroupName': cls.log_group,
            'fromTime': cls.start_time,
            'to': cls.end_time,
            'destination': cls.dest_bucket,
            'destinationPrefix': cls.get_dest_obj_prefix()
        }

        if cls.log_stream:
            args['logStreamNamePrefix'] = cls.log_stream

        return args

CloudWatch Logsのクライアント

CloudWatch LogsのAPIで使用するのは以下の2つ

  • create_export_task
    • S3へエクスポートするタスクを作成
    • タスクを作成するとCloudWatch LogsがS3へエクスポートしてくれる
    • レスポンスには作成したタスクのIDが含まれている
  • describe_export_tasks
    • 作成したタスクのtaskIdを引数としてタスクの情報を取得
    • タスクの進捗を確認するために使用する
    • ステータスコード
      • COMPLETEDならエクスポート完了
      • CANCELLEDFAILEDはエラー扱いにする
      • それ以外ならまだ未完了とする
@dataclass
class Exporter:
    region: InitVar[str]
    client: CloudWatchLogsClient = field(init=False)

    def __post_init__(self, region: str):
        self.client = boto3.client('logs', region_name=region)

    def export(self, target: Type[LogGroup]) -> str:
        """CloudWatch Logsの任意のロググループをS3へエクスポート

        Args:
            target (Type[LogGroup])

        Raises:
            ExportToS3Error

        Returns:
            str: CloudWatch Logs APIからのレスポンスに含まれるtaskId
        """

        try:
            response = self.client.create_export_task(
                **target.to_args())  # type: ignore
            return response['taskId']
        except Exception as err:
            raise ExportToS3Error(err)

    def get_export_progress(self, task_id: str) -> str:
        try:
            response = self.client.describe_export_tasks(taskId=task_id)
            status = response['exportTasks'][0]['status']['code']
            return status
        except Exception as err:
            raise GetExportTaskError(err)

    @classmethod
    def finishes(cls, status_code: str) -> bool:
        """エクスポートタスクが終了したかをステータスコードから判別する

        Args:
            status_code (str):
                describe_export_tasksのレスポンスに含まれるステータスコード

        Raises:
            ExportToS3Failure: ステータスコードがCANCELLEDかFAILEDの場合

        Returns:
            bool
        """

        uppercase_status_code = status_code.upper()

        if uppercase_status_code == 'COMPLETED':
            return True
        elif uppercase_status_code in ['CANCELLED', 'FAILED']:
            raise ExportToS3Failure('S3へのエクスポート失敗')
        return False

main

LogGroup.__subclasses__()でエクスポートしたいロググループの設定子クラスを取得します。
__subclasses__()はリストを返すので、それをfor文で回します。
CloudWatch Logsのエクスポートタスクはアカウントで1つしか同時に実行できないので、describe_export_tasks APIを叩いてタスクが完了しているか確認します。未完了であれば5s待つようにしています。create_export_taskが非同期APIなのでそのようにポーリングするしかありません。

def export_to_s3(exporter: Exporter, target: Type[LogGroup]) -> bool:
    task_id = exporter.export(target)
    logger.info(f'{target.log_group}をS3へエクスポート中 ({task_id=})')

    while True:
        status = exporter.get_export_progress(task_id)
        if exporter.finishes(status):
            return True
        sleep(5)


def main(event: Any, context: Any) -> bool:
    exporter = Exporter(region='ap-northeast-1')
    targets = LogGroup.__subclasses__()
    logger.info(f'エクスポート対象のロググループは{len(targets)}個')

    for target in targets:
        try:
            export_to_s3(exporter, target)
        except GetExportTaskError as err:
            logger.warning(err)
            logger.warning(f'{target.log_group}の進捗状況の取得失敗')
        except Exception as err:
            logger.error(err)
            logger.error(f'{target.log_group}のS3へエクスポート失敗')
        else:
            logger.info(f'{target.log_group}のS3へエクスポート完了')

    return True

serverless.yml

以下は一部を抜粋したものです。
Lambdaがエクスポートタスク作成とタスク情報の取得ができるようにIAM Roleを設定します。
それから、毎日JST14:00でエクスポートを実行したいので、eventsにcron(0 5 * * ? *)を指定します。CloudWatch EventsはUTCで動いているので、-9hすれば期待通りのJST14:00に実行してくれます。

  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - 'logs:createExportTask'
        - 'logs:DescribeExportTasks'
      Resource:
        - 'arn:aws:logs:${self:provider.region}:${self:custom.accountId}:log-group:*'

functions:
  export:
    handler: src/handler.main
    memorySize: 512
    timeout: 120
    events:
      - schedule: cron(0 5 * * ? *)
    environment:
      TZ: Asia/Tokyo

おわりに

homoluctus/lambda-cwlogs-s3にはGitHub Actionsやエクスポート先のS3を作成するためのCloudFormationテンプレートもあります。ぜひ参照してみてください。

Reference

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0