1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambda durable functionsでCloudWatch Logsのログをエクスポートしてみる

1
Last updated at Posted at 2026-02-28

こんにちは。
AWS CDKに囚われている@sori883です。

最近Lambda durable functionsを使ってCloudWatch Logsのログエクスポートを実装したので、その備忘録を残します。

AWS Lambda durable functionsの概要

durable functionsは通常のLambdaにチェックポイント、リプレイを追加し、
通常のLambda関数単体では難しかった待機、中断、再開をネイティブサポートしてくれる機能です。

  • @durable_executionを設定することで、durable実行が出来る
  • context.step@durable_stepを設定することでstepが定義できる
    • step完了時は自動でチェックポイントが保存される
  • context.waitで処理を待機出来る
    • wait完了後は自動でチェックポイントが保存される
from aws_durable_execution_sdk_python import durable_execution, durable_step, DurableContext
from aws_durable_execution_sdk_python.config import Duration

@durable_step
def step_1(step_context):
    return "Step 1 done"

@durable_step
def step_2(step_context):
    return "Step 2 done"

@durable_execution
def lambda_handler(event: dict, context: DurableContext):
    result1 = context.step(step_1(), name="step-1") # step1
    context.wait(Duration.from_seconds(10)) # 10秒待機
    result2 = context.step(step_2(), name="step-2") # step2
    return {"step1": result1, "step2": result2}

今までは15分以上実行するためにStep Functionsと組み合わせる一工夫が必要でしたが、durable functionsを使えばLambda関数単体で実装できるようになります。

durable functionsでログエクスポートする

今までLambda関数とStep Functionsで実装していましたが、durable functionsで実装してみます。

レポジトリ

durable functions、S3、EventBridgeをCDKで実装したレポジトリです。
cdk deployだけで使えるはず。

durable functionsの実装

durable functionsで実装するログエクスポートフローは下記です。

  1. エクスポート対象のロググループを取得
  2. CreateExportTaskを実行しログをS3にエクスポート
  3. CreateExportTaskのステータスを確認しながら完了するまで待機

これをそのままdurable functionsのstepとして定義します。

Step1. エクスポート対象のロググループを取得

特定のタグが付与されたロググループを取得するStepを定義します。

@durable_step
def get_target_log_groups(step_context):
    # 特定タグが付与されたロググループを取得する
    log_groups = []
    paginator = tagging_client.get_paginator("get_resources")

    for page in paginator.paginate(
        TagFilters=[{"Key": TARGET_TAG_KEY, "Values": [TARGET_TAG_VALUE]}],
        ResourceTypeFilters=["logs:log-group"],
    ):
        for resource in page.get("ResourceTagMappingList", []):
            arn = resource["ResourceARN"]
            log_group_name = arn.split(":log-group:")[-1]
            if log_group_name.endswith(":*"):
                log_group_name = log_group_name[:-2]
            log_groups.append(log_group_name)

    return log_groups

Step2. CreateExportTaskを実行しログをS3にエクスポート

次にCreateExportTaskを実行しロググループをS3niエクスポートするStepを定義します。

@durable_step
def create_export_task(step_context, log_group_name, from_time, to_time, date_str):
    # CreateExportTaskを呼び出してログをエクスポート
    prefix = f"logs/{log_group_name.replace('/', '-').lstrip('-')}/{date_str}"
    response = logs_client.create_export_task(
        logGroupName=log_group_name,
        fromTime=from_time,
        to=to_time,
        destination=EXPORT_BUCKET,
        destinationPrefix=prefix,
    )
    return response["taskId"]

Step3. CreateExportTaskの完了ステータスを確認

最後にCreateExportTaskのステータスを確認するStepを定義します。

@durable_step
def check_export_status(step_context, task_id):
    # エクスポートタスクのステータスを確認する
    response = logs_client.describe_export_tasks(taskId=task_id)
    tasks = response.get("exportTasks", [])

    if not tasks:
        raise Exception(f"Export task not found: {task_id}")

    current_status = tasks[0]["status"]["code"]

    if current_status in ["FAILED", "CANCELLED"]:
        message = tasks[0]["status"].get("message", "Unknown error")
        raise Exception(f"Export task {current_status}: {message}")

    return current_status

ハンドラー関数

@durable_executionを付与したハンドラー関数で処理フローを組んでいきます。

  • context.stepで定義したstepを使用する
  • context.waitで待機する
@durable_execution
def handler(event: dict, context: DurableContext):
    # Step1. エクスポート対象のロググループを取得
    log_groups = context.step(get_target_log_groups(), name="get-target-log-groups")

    results = []

    for log_group_name in log_groups:
      # Step2. CreateExportTaskを実行しログをS3にエクスポート
        task_id = context.step(
            create_export_task(log_group_name, from_time, to_time, date_str),
            name=f"create-export-{log_group_name}",
        )
        
        status = "PENDING"
        # ステータスがCOMPLETEDになるまで繰り返す
        while status != "COMPLETED":
            # 120秒待機
            context.wait(Duration.from_seconds(120))
            # Step3. CreateExportTaskの完了ステータスを確認
            status = context.step(
                check_export_status(task_id),
                name=f"check-{task_id}",
            )

        results.append({
            "logGroupName": log_group_name,
            "taskId": task_id,
            "status": "COMPLETED",
        })

    return {
        "status": "COMPLETED",
        "exportedCount": len(results),
        "results": results,
    }

実行結果

ログが少なすぎて4分で終わっていますが、お仕事環境でやった時は3時間くらい動いていることを確認してます。
また、永続オペレーションの実行ログではdurable functionsで定義した各stepがWaitを挟んで意図したフローで実行されていることが伺えます。

durable functionsの実行結果.jpg

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?