LoginSignup
5
7

More than 3 years have passed since last update.

ParallelizationFactorによる並列化でKinesisのレコードはLambdaにどう渡されるのか

Posted at

Lambdaはこのアップデートで、ParallelizationFactor がサポートされ、ストリームソースの1つのシャードを複数のLambda呼び出しで並列処理できるようになりました。

並列化した場合に関数にはどのようにレコードが渡ってくるのか、気になる点を確認しました。

ParallelizationFactor の意義

Kinesis Data Streamsではシャード毎にレコードに振ったシーケンス番号でレコードの入力と出力の順序を保証しているため、コンシューマで、あるレコードの処理が完了せず停滞した場合、次以降のレコードはその分遅延します。

この遅延量は、Kinesisでは GetRecords.IteratorAgeMilliseconds メトリクスで1、Lambda側では IteratorAge メトリクス2で監視でき、遅延測定のポイントが違いますが、Kinesisを使う時は両方のメトリクスの増加に気をつける必要があります。

遅延はストリームのデータの供給速度より消費速度が遅ければ、徐々に蓄積していくことになります。
これまで、Kinesis Client LibraryでもLambdaでも、基本的に1つのシャードを1つのコンシューマが担当してシリアルに処理するため、シャード数を増やして遅延対策をする他ありませんでした。

しかし、Lambdaに ParallelizationFactor オプションが追加され、1つのシャードに対して、Lambda呼び出しを1〜10並列実行でき、消費側のスループットだけをスケールさせられるようになりました。
これは IteratorAge が高くなりがちなワークロードに対して非常に有効な対策となります。

実験環境

下記のように、コンシューマ側で遅延を生じる環境を作成して、DynamoDBに記録された処理結果と、CloudWatchのメトリクスを見ていきます。

  • Kinesis Data Streams
    • シャード数: 1
  • Producer
    • producer/put_records.rb をPCで実行する
    • put_records.rb
    • 15 をパーティションキーとして、各パーティションキーごとに1始まりの整数をデータとするレコードを1秒毎に30回送信する
  • Consumer
    • consumer/lambda_function.rb をトリガをKinesisとして、Lambdaで実行する
    • lambda_function.rb
    • 1呼び出しごとに、DynamoDBに受信したレコードと処理時刻を記録する
    • 1レコードの処理ごとに3秒スリープする

環境構築はこちらのterraformで行いました。

並列なし(ParallelizationFactor = 1)の場合

まず、並列なしで実行した結果です。
以下は関数から処理開始時刻、終了時刻、イベントで渡されたレコードの {パーティションキー}-{データ番号} をDynamoDBに記録したものと、CloudWatchのメトリクスです。
呼び出しは計4回シリアルに行われ、最後の呼び出し時点ではLambdaのIteratorAgeは300秒の遅延となり、トータル450秒かかりました。

$ aws dynamodb scan --table-name $TABLENAME | jq '.Items | map({start: .start.S, end: .end.S, records: [.records.L[].S] | join(",")}) | sort_by(.start)'
[
  {
    "start": "2020-01-14 06:01:10 +0000",
    "end": "2020-01-14 06:01:25 +0000",
    "records": "1-1,2-1,3-1,4-1,5-1"
  },
  {
    "start": "2020-01-14 06:01:25 +0000",
    "end": "2020-01-14 06:03:55 +0000",
    "records": "1-2,2-2,3-2,4-2,5-2,1-3,2-3,3-3,4-3,5-3,1-4,2-4,3-4,4-4,5-4,1-5,2-5,3-5,4-5,5-5,1-6,2-6,3-6,4-6,5-6,1-7,2-7,3-7,4-7,5-7,1-8,2-8,3-8,4-8,5-8,1-9,2-9,3-9,4-9,5-9,1-10,2-10,3-10,4-10,5-10,1-11,2-11,3-11,4-11,5-11"
  },
  {
    "start": "2020-01-14 06:03:55 +0000",
    "end": "2020-01-14 06:06:25 +0000",
    "records": "1-12,2-12,3-12,4-12,5-12,1-13,2-13,3-13,4-13,5-13,1-14,2-14,3-14,4-14,5-14,1-15,2-15,3-15,4-15,5-15,1-16,2-16,3-16,4-16,5-16,1-17,2-17,3-17,4-17,5-17,1-18,2-18,3-18,4-18,5-18,1-19,2-19,3-19,4-19,5-19,1-20,2-20,3-20,4-20,5-20,1-21,2-21,3-21,4-21,5-21"
  },
  {
    "start": "2020-01-14 06:06:25 +0000",
    "end": "2020-01-14 06:08:40 +0000",
    "records": "1-22,2-22,3-22,4-22,5-22,1-23,2-23,3-23,4-23,5-23,1-24,2-24,3-24,4-24,5-24,1-25,2-25,3-25,4-25,5-25,1-26,2-26,3-26,4-26,5-26,1-27,2-27,3-27,4-27,5-27,1-28,2-28,3-28,4-28,5-28,1-29,2-29,3-29,4-29,5-29,1-30,2-30,3-30,4-30,5-30"
  }
]

01-metrics.png

re:invent2019の A serverless journey: AWS Lambda under the hood (SVS405-R1)で説明されていますが、Lambdaのストリームソースの処理の内部では、Stream Tracker が、Leasing Service を通じて、起動しているシャード数に従って Poller をアレンジし、Poller がシャードをサブスクライブをしてGetRecordsを行っています。そしてLambdaの関数はPollerから呼び出されます。

ここで、Kinesisの GetRecords.IteratorAgeMilliSeconds に遅延がないのは、PollerのGetRecordsの実行には遅延がないためです。
GetRecords.IteratorAgeMilliSeconds は、タイムアウトなどLambdaの呼び出しがエラーになったことで、GetRecordsが遡って行われる場合に増加します。

ParallelizationFactor = 3の場合

次にParallelizationFactorを3にして実行します。変更はコンソールか、CLIで下記のように設定できます。

$ aws lambda update-event-source-mapping --uuid $TRIGGER_UUID --parallelization-factor 3

結果はこのようになりました。
Lambdaの呼び出しは計10回で、トータルは180秒で完了しました。IteratorAgeの最大も67秒に短縮できています。

各呼び出しで渡されたレコードを見ると、パーティションキー 1, 4 のレコード、3のレコード、2, 5のレコードのグループに分けられています。
そして呼び出しのタイムスタンプを見ると、各グループの処理は並列に呼び出されているけれども、グループごとにシリアルに実行されています。
つまり、並列化してもパーティションキー毎のレコード順序は保証されます。

こちらでも説明されていますが、PollerはLambdaのParallelizationFactorの設定をみて、フロントエンドの起動数をスケールします。
このときスケールするのはBatcherで、パーティションキー毎に割り振られるBatcherは一意に決まることで順序を維持しているようです。

[
  {
    "start": "2020-01-14 06:23:05 +0000",
    "end": "2020-01-14 06:23:17 +0000",
    "records": "1-1,4-1,1-2,4-2"
  },
  {
    "start": "2020-01-14 06:23:05 +0000",
    "end": "2020-01-14 06:23:11 +0000",
    "records": "3-1,3-2"
  },
  {
    "start": "2020-01-14 06:23:05 +0000",
    "end": "2020-01-14 06:23:17 +0000",
    "records": "2-1,5-1,2-2,5-2"
  },
  {
    "start": "2020-01-14 06:23:11 +0000",
    "end": "2020-01-14 06:23:29 +0000",
    "records": "3-3,3-4,3-5,3-6,3-7,3-8"
  },
  {
    "start": "2020-01-14 06:23:17 +0000",
    "end": "2020-01-14 06:24:23 +0000",
    "records": "2-3,5-3,2-4,5-4,2-5,5-5,2-6,5-6,2-7,5-7,2-8,5-8,2-9,5-9,2-10,5-10,2-11,5-11,2-12,5-12,2-13,5-13"
  },
  {
    "start": "2020-01-14 06:23:17 +0000",
    "end": "2020-01-14 06:24:23 +0000",
    "records": "1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7,1-8,4-8,1-9,4-9,1-10,4-10,1-11,4-11,1-12,4-12,1-13,4-13"
  },
  {
    "start": "2020-01-14 06:23:29 +0000",
    "end": "2020-01-14 06:24:17 +0000",
    "records": "3-9,3-10,3-11,3-12,3-13,3-14,3-15,3-16,3-17,3-18,3-19,3-20,3-21,3-22,3-23,3-24"
  },
  {
    "start": "2020-01-14 06:24:17 +0000",
    "end": "2020-01-14 06:24:35 +0000",
    "records": "3-25,3-26,3-27,3-28,3-29,3-30"
  },
  {
    "start": "2020-01-14 06:24:23 +0000",
    "end": "2020-01-14 06:26:05 +0000",
    "records": "2-14,5-14,2-15,5-15,2-16,5-16,2-17,5-17,2-18,5-18,2-19,5-19,2-20,5-20,2-21,5-21,2-22,5-22,2-23,5-23,2-24,5-24,2-25,5-25,2-26,5-26,2-27,5-27,2-28,5-28,2-29,5-29,2-30,5-30"
  },
  {
    "start": "2020-01-14 06:24:23 +0000",
    "end": "2020-01-14 06:26:05 +0000",
    "records": "1-14,4-14,1-15,4-15,1-16,4-16,1-17,4-17,1-18,4-18,1-19,4-19,1-20,4-20,1-21,4-21,1-22,4-22,1-23,4-23,1-24,4-24,1-25,4-25,1-26,4-26,1-27,4-27,1-28,4-28,1-29,4-29,1-30,4-30"
  }
]

02-metrics.png

呼び出しのタイミングのイメージです。

para3.png

エラーハンドリング

並列化した場合も、パーティションキー毎に担当されるBatcherは固定されていることがわかりました。では、あるレコードでエラーが発生した場合に、同じBatcherに割り振られた他のパーティションキーのレコードの処理はどうなるのでしょうか。
意図的にパーティションキー 1 のデータ 5 の処理でエラー終了するように関数を変更して実行しました。

[
  {
    "id": "dd0433d1-d3ea-45ad-b90f-38571fae3682@2020-01-15 06:30:21 +0000",
    "start": "2020-01-15 06:30:21 +0000",
    "end": "2020-01-15 06:30:27 +0000",
    "records": "2-1,5-1"
  },
  {
    "id": "c50dd83a-6f7e-4bb0-99dd-373f3514d41a@2020-01-15 06:30:21 +0000",
    "start": "2020-01-15 06:30:21 +0000",
    "end": "2020-01-15 06:30:24 +0000",
    "records": "3-1"
  },
  {
    "id": "22bbbc6c-393a-4cb4-9270-a32fab209970@2020-01-15 06:30:21 +0000",
    "start": "2020-01-15 06:30:21 +0000",
    "end": "2020-01-15 06:30:27 +0000",
    "records": "1-1,4-1"
  },

  ()

  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:30:27 +0000",
    "end": "2020-01-15 06:31:03 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },

  ()

  {
    "aws_request_id": "a966ff27-8569-4fbf-8594-adcf7947683f",
    "start": "2020-01-15 06:31:00 +0000",
    "end": "2020-01-15 06:31:51 +0000",
    "records": "3-14,3-15,3-16,3-17,3-18,3-19,3-20,3-21,3-22,3-23,3-24,3-25,3-26,3-27,3-28,3-29,3-30"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:31:03 +0000",
    "end": "2020-01-15 06:31:39 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "c43dbc93-16b1-4666-b565-2df99f7f68eb",
    "start": "2020-01-15 06:31:03 +0000",
    "end": "2020-01-15 06:33:21 +0000",
    "records": "2-8,5-8,2-9,5-9,2-10,5-10,2-11,5-11,2-12,5-12,2-13,5-13,2-14,5-14,2-15,5-15,2-16,5-16,2-17,5-17,2-18,5-18,2-19,5-19,2-20,5-20,2-21,5-21,2-22,5-22,2-23,5-23,2-24,5-24,2-25,5-25,2-26,5-26,2-27,5-27,2-28,5-28,2-29,5-29,2-30,5-30"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:31:40 +0000",
    "end": "2020-01-15 06:32:16 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:32:17 +0000",
    "end": "2020-01-15 06:32:53 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:32:55 +0000",
    "end": "2020-01-15 06:33:31 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:33:34 +0000",
    "end": "2020-01-15 06:34:10 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  },
  {
    "aws_request_id": "1ec49225-cb26-444f-b2a4-a7c7bc2eb71f",
    "start": "2020-01-15 06:34:17 +0000",
    "end": "2020-01-15 06:34:53 +0000",
    "records": "1-2,4-2,1-3,4-3,1-4,4-4,1-5,4-5,1-6,4-6,1-7,4-7"
  }
]

03-metrics.png

結果、1-5 レコードを含むLambdaの呼び出し 1ec49225-cb26-444f-b2a4-a7c7bc2eb71f はエラーとなり、他のBatcherの呼び出しが完了した後もリトライを繰り返しています。
そして、パーティションキー1, 4のレコードはこのバッチ移行処理が進まなくなり、IteratorAgeはどんどん増加しています。これは一番避けるべき事態です。

paraNG.png

対策として、Lambdaのイベントソースマッピングのストリームソースのオプションには、エラーのリカバリのためのサポートがあるので、これを利用すべきです。

BisectBatchOnFunctionError

今回のエラーは意図的に発生させているため回復の見込みがないですが、タイムアウトの場合、処理対象のデータを減らすことは有効です。
BisectBatchOnFunctionError を有効にすると、エラーになったバッチを2分割してLambdaの呼び出しが行われるので、制限時間内に処理を完了できる可能性が高められます。

MaximumRecordAgeInSeconds, MaximumRetryAttempts と DestinationConfig

エラーによるリトライは MaximumRecordAgeInSeconds または MaximumRetryAttempts を満了するまで続きます。
1ec49225-cb26-444f-b2a4-a7c7bc2eb71f の呼び出し時刻を見ると、徐々に前回の終了時刻からのラグが増えていることがわかります。
これはエクスポネンシャルバックオフが採用されているためで、連続するエラーのリトライ回数を緩和するためです。
これらの係数を小さくすることは、リトライアウトを早めますが、ストリームのスループット維持の妨げにならない値に調整する必要があります。
そして仮にリトライアウトが発生しても、 DestinationConfig でSQSのデッドレターキューを指定することができるので、リトライでリカバリ出来ないレコードは、ストリーム処理の枠外でエラーを確認し、場合によってはリトライすることができます。

以上です。
これまでKinesisClientLibraryを使っていましたが、自前で頑張る必要があった部分が、Lambdaでほぼ機能としてサポートされているので、Kinesisの処理はLambda一択になりそうです。

5
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
7