0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Lambda Durable Functions って何ができるのか?

Posted at

はじめに

ちょっと間が空いてしまいましたが、Lambda の新機能、Lambda Durable Functions が発表されました。
どんなものなのか理解を目的として、調べてみようと思います。

Lambda Durable Functions は日本語にしてみると「耐久性のある関数」「永続的な関数」みたいな感じですね。
Lambda の欠点として 15 分の制約があるので、永続的に処理が動いたら嬉しいものです。

今回は、この Durable の意味を解き明かしていきたいと思います。

概要

Lambda Durable Functions は、Lambda 関数の実行状態を AWS が自動的に永続化し、再開可能にする仕組みです。

これにより、以下のようなことが可能になります。

  • 最大 1 年間に及ぶ処理を Lambda で実装
  • wait による明示的な待機(スリープ):課金発生無し
  • ステップ単位での処理実行と再実行
  • Lambda の再起動・失敗を意識しないコード記述

実行履歴や中間状態が管理されており、チェックポイントとリプレイの仕組みを提供します。
Durable Functions が待機ポイントや再試行などの中断から再開すると、コードは最初から実行されますが、完了したチェックポイントはスキップされ、かつ保存された結果が使用されます。
その結果、冪等性を担保した長期的な処理実行を実現しています。

とはいってもイメージが湧きづらいので、以降でより詳細な使い方を見ていきます。

構成要素

Durable Functions を利用するには、
Durable Execution SDK を使用します。
※ Python の場合のコードで記述します。

from aws_durable_execution_sdk import DurableContext, durable_execution

@durable_execution
def handler(event: dict, context: DurableContext):
    # Your function receives DurableContext
    # Use context.step(), context.wait(), etc.
    return result
  • @durable_execution
    → Lambda を Durable Execution として実行するためのデコレータ
  • DurableContext
    → ステップ実行・待機を制御するためのコンテキスト

このSDK は、永続的な実行を可能にする 3 つの重要な役割を担います。

チェックポイント管理:SDK は自動的にチェックポイントを作成し、各チェックポイントには、操作の種類、入力、結果が記録されます。ステップを完了するとチェックポイントを保持し、これにより中断された場合でも完了した操作から再開できます。

リプレイ調整:関数が一時停止または中断後に再開されるとコードは最初から実行されますが、完了した操作はスキップし、保存されたチェックポイント結果を使用して再実行される。同じ入力とチェックポイントログが与えられた場合、関数は同じ結果を生成します。

状態の分離: 実行状態はビジネスロジックとは別に管理される。それぞれの実行には、他の実行からはアクセスできない独自のチェックポイントログが存在する。

DurableContext は、いくつかの操作を提供しています。
主となる操作は stepwaitです。

context.step()

result = context.step(
    lambda _: payment_service.charge(amount),
    name='process-payment'
)
  • 自動チェックポイントと再試行機能を備えたビジネスロジックを実行
  • 前後にチェックポイントを作成し、結果を保存することで再試行可能
  • 再実行時にはすでに完了したステップはスキップ

context.wait()

context.wait(3600)
  • コンピューティングリソースを消費することなく、指定された期間実行を一時停止
  • チェックポイントを作成し、関数呼び出しを終了
  • 待機が完了後、関数は再度呼び出しされ、待機ポイントまで再生してから処理を続行

他には以下のような操作がありますが、今回は触れないでおきます。

  • context.createCallback
  • context.waitForCallback
  • context.parallel
  • context.map
  • context.runInChildContext
  • context.waitForCondition
  • context.invoke

サンプルを理解してみる

公式ドキュメントに記載があるサンプルコードを見ていきたいと思います。
私はデータエンジニアリング領域の仕事をしていますので、データパイプラインのサンプルを見てみます。

大規模なデータセットを抽出(E)、変換(T)、ロード(L)の各フェーズで処理し、各フェーズ間にチェックポイントを設定しています。チェックポイントにより、中断された場合でもパイプラインをどのフェーズからでも再開できます。

サンプルコード全量は以下の通りです。


from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event['batchSize']
    
    # Stage 1: Extract data from source
    def extract_data(_):
        records = source_database.extract_records(dataset_id)
        return {'recordCount': len(records), 'records': records}
    
    extracted = context.step(extract_data, name='extract-data')
    
    # Wait 5 minutes to respect source system rate limits
    context.wait(300)
    
    # Stage 2: Transform data in batches
    def transform_data(_):
        batches = chunk_array(extracted['records'], batch_size)
        results = []
        
        for batch in batches:
            transformed = transform_service.process_batch(batch)
            results.append(transformed)
        
        return {'batchCount': len(batches), 'results': results}
    
    transformed = context.step(transform_data, name='transform-data')
    
    # Wait until off-peak hours (e.g., 2 AM)
    now = datetime.now()
    target_hour = 2
    ms_until_target = calculate_ms_until_hour(now, target_hour)
    context.wait(ms_until_target // 1000)
    
    # Stage 3: Load data to destination
    def load_data(_):
        loaded_count = 0
        
        for result in transformed['results']:
            destination_database.load_batch(result)
            loaded_count += len(result)
        
        return {'loadedCount': loaded_count}
    
    loaded = context.step(load_data, name='load-data')
    
    # Stage 4: Verify and finalize
    def verify_pipeline(_):
        verification = destination_database.verify_records(dataset_id)
        pipeline_service.mark_complete(dataset_id, verification)
        return verification
    
    verified = context.step(verify_pipeline, name='verify-pipeline')
    
    return {
        'datasetId': dataset_id,
        'recordsProcessed': extracted['recordCount'],
        'batchesProcessed': transformed['batchCount'],
        'recordsLoaded': loaded['loadedCount'],
        'verified': verified['success']
    }

処理の流れは以下の通りです。

  1. データ抽出(Extract)
  2. 待機(レート制限対策)
  3. バッチ変換(Transform)
  4. 指定時刻まで待機
  5. データロード(Load)
  6. 検証・完了処理

まず、データ抽出の処理を見てみます。
extract_data という関数を定義し、それを context.step() に引数として渡すことで、データ取得処理を 1 step として実行しています。
後続処理で使いたい値、今回の場合は抽出データ recordsreturn する必要があるようです。

    # Stage 1: Extract data from source
    def extract_data(_):
        records = source_database.extract_records(dataset_id)
        return {'recordCount': len(records), 'records': records}
    
    extracted = context.step(extract_data, name='extract-data')

次は、外部システムのレート制限を考慮し、5 分待機を行なっています。
この待機時間は課金対象外です。

    # Wait 5 minutes to respect source system rate limits
    context.wait(300)

次は変換処理です。
抽出処理と同じで、transform_data という関数を context.step() に渡して実行しています。
ビジネスロジックとしてはデータをバッチに分割し、変換処理を繰り返し実行しています。

    # Stage 2: Transform data in batches
    def transform_data(_):
        batches = chunk_array(extracted['records'], batch_size)
        results = []
        
        for batch in batches:
            transformed = transform_service.process_batch(batch)
            results.append(transformed)
        
        return {'batchCount': len(batches), 'results': results}
    
    transformed = context.step(transform_data, name='transform-data')

次は、処理のオフピークである午前2時まで処理を待機させるという内容です。
これまでのシンプルな Lambda ではできなかった制御ですね。

    # Wait until off-peak hours (e.g., 2 AM)
    now = datetime.now()
    target_hour = 2
    ms_until_target = calculate_ms_until_hour(now, target_hour)
    context.wait(ms_until_target // 1000)

次は ETL の最後、ロード処理です。
context.step() の使い方は同じです。
変換済みデータを順次ロードしています。

    # Stage 3: Load data to destination
    def load_data(_):
        loaded_count = 0
        
        for result in transformed['results']:
            destination_database.load_batch(result)
            loaded_count += len(result)
        
        return {'loadedCount': loaded_count}
    
    loaded = context.step(load_data, name='load-data')

最後に、レコード検証を行なって処理を完了しています。

    # Stage 4: Verify and finalize
    def verify_pipeline(_):
        verification = destination_database.verify_records(dataset_id)
        pipeline_service.mark_complete(dataset_id, verification)
        return verification
    
    verified = context.step(verify_pipeline, name='verify-pipeline')

ETL 処理の場合、気になるのは処理時間です。
各 step では Lambda のタイムアウト時間 15 分を超える可能性があります。

step 内の途中で Lambda が終了した場合、Durable Functions の仕様上、step の途中状態は保存されないため、再実行時は step の最初から処理されると考えられます。(※公式ドキュメントおよび挙動からの推測であり、詳細な検証は行っていません)
そうなると、大規模なデータパイプラインを構築する場合は冪等性を担保して、かつ処理を細分化して 15 分以内に終わるような step に分割する必要があります。

おわりに

Durable Functions を使うことで、

  • 長時間処理
  • 待機
  • ステップ単位の再実行

を Lambda だけで実装することができるようになりました。
まだまだどう使っていけるのかは見極める必要がありますが、Lambda でできることが増えることは、実装の幅が広がり、結果としてシステムの複雑化を防ぐ効果があると感じました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?