この記事は
この記事は、筆者がKinesis Client Library(以下、KCL)を利用した経験から、公式ドキュメントなどを読んでも分かりにくかった点や、ソースコードの読み込み、実際に動作する環境を用意して調査する必要があった知見を共有するために記述する。
上記の目的上、この文書だけを読んでもKCLについて理解できるような内容ではない。
KCLの責務について
KCLは「Kinesisストリームに格納されたレコードをアプリケーションが受信すること」を責務として設計されている。言い換えると、「受信したレコードの処理について責任を負わない」と言ってもよい。
もう少し掘り下げる。KCLを利用するアプリケーションはKinesisストリームに格納されたレコードを受信して、なんらかの計算1やデータパイプラインの後段への連携(通知)、またはデータストアへのデータの格納などを行う。この時、例えば受信したレコードに想定しないようなデータが含まれていて計算に失敗したり、データパイプラインの後段との通信に問題があり連携に失敗する、データストアへの格納に失敗するなど、レコードの処理に失敗する状況はいくらでも考えられる。
しかし、KCLはこのような失敗をリカバリする仕組みを持っていない。具体的には、ShardRecordProcessor#processRecords(ProcessRecordsInput)
メソッドにレコードが渡されたあと、仮にそのメソッドを例外終了したとしても、KCLは、すでにprocessRecordsに渡したレコードを、再びprocessRecordsに渡すなどの再処理を積極的に2行うことはない。
KCLではチェックポイント処理をユーザーコードで行う必要があるが、このチェックポイント処理は、あくまでも処理対象のシャードが異なるワーカープロセス間で引き継がれる際に、「どこまでのレコードが処理済みか?」を連絡するためのものであって、失敗したレコードを識別して再度取得・処理するための仕組みではない。3
従って、失敗したレコードの再処理はユーザーコードの責務である。単純な対策としてはprocessRecords()
メソッドの処理中で外部通信をリトライする。短期的な通信障害やシステムエラーであればこのような実装で多くが対処できると見込める。
しかし、外部通信先の長期的な(数分から数時間にわたるような)障害の場合には、短期的なリトライでは問題が解決しないことも多い。このような状況にも対処できるようにするには、外部のデータストレージに、処理に失敗したレコードを格納して、長い間隔でリトライするのが有効である。ストレージと呼んだが、例えばSQS4のようなメッセージキューでも構わないし、S35のようなストレージでも構わない。
いずれにしても重要なことは、正常系の処理で利用する通信先とは異なるサービスを使う6ことである。例えば正常系でLambda関数を直接実行しているなら異常系ではSQS経由でリトライ処理のLambdaを実行する、あるいは、正常系でSQSを経由したLambdaを実行しているなら、SQSを利用せずに直接リトライ処理のLambdaを実行する、またはS3へレコードを格納してS3トリガーでリトライ処理のLambdaを実行する、など。
マルチストリーム処理について
KCLアプリケーションはデフォルトでは単一のKinesisストリームのレコードを処理する。
一方で、マルチストリーム処理も設定でき、MultiStreamTracker
インスタンスの実装方法によっては、稼働中のKCLアプリケーションを再起動することなく、処理対象のストリームの追加や削除が可能。
マルチストリーム処理構成のアプリケーションと単一ストリーム処理構成のアプリケーションでは、Leaseテーブルのレコードが非互換なので、単純にアプリケーションコードを入れ替えることで移行することはできない。
マルチストリーム処理構成で、実質的に単一ストリーム処理構成にするには、処理対象のストリームの情報を固定で返すようにMultiStreamTracker
を実装すればよいだけなので、確実にマルチストリーム処理構成をとる可能性がない場合や、移行時にはストリーム処理を一時停止できる場合でなければ、マルチストリーム処理構成の実質的単一ストリーム処理構成をとっておくほうが安全に思える。
マルチストリーム処理構成での動的なストリーム追加や削除の運用は筆者に経験がないので詳しくは分からない。
KCL v2.xからv3.xへのマイグレーション
KCL v2とv3では、KCLアプリケーションが処理するシャードのワーカー(Schedulerインスタンス; 通常であればJavaアプリケーションインスタンス)割り当て(調整)アルゴリズムが変更されています。
簡単に言うと、v2ではワーカーごとのシャードの数が均一になるように調整されますが、v3ではワーカーごとのメトリクス7に応じたシャード数が割り当たるように調整されます。
アルゴリズムを変更する目的は、一部のワーカーが高負荷になるような状態を回避すること8です。
一部のワーカーが高負荷になってしまう場合、高負荷なときに必要なコンピューティングリソースをすべてのワーカーに割り当てる必要が生じ、負荷が低いワーカーに割り当てられたリソースは(不要にもかかわらず課金されるので)無駄になってしまいます。
しかし、レコードの処理は様々な可能性が考えられるので、KCL v3が報告するメトリクスやそれに基づくシャード割り当てアルゴリズムが、かえって負荷の偏りを誘発する可能性も否定できません。
そのため、KCL v3では、シャード割り当てアルゴリズムを安全に移行するための仕組みと、動的にロールバック・ロールフォワードするためのマイグレーションツール(Pythonスクリプト)が提供されています。
マイグレーションの流れは以下の図の通り
上記図の各状態は、KCLの依存バージョン、ConfigsBuilderインスタンスのCoordinatorConfigに設定するClientVersionConfigの値、CoordinatorStateテーブルのcv
属性(ClientVersionの略)の値で特徴づけられ、以下の表の通りになる。右端の列はそれぞれの状態で採用される負荷分散アルゴリズムを示している。
状態 | KCL Version | ClientVersionConfig | ClientVersion | 負荷分散アルゴリズム |
---|---|---|---|---|
開始前 | 2.x | - | - | シャード数 |
開始 | 3.x | COMPATIBLE_WITH_2X | INIT | シャード数 |
互換モード | 3.x | COMPATIBLE_WITH_2X | UPGRADE_FROM_2X | シャード数 |
ロールバック済 | 3.x | COMPATIBLE_WITH_2X | 2X | シャード数 |
ロールバック可能 | 3.x | COMPATIBLE_WITH_2X | 3X_WITH_ROLLBACK | 負荷メトリクス |
完了 | 3.x | 3X | 3X | 負荷メトリクス |
なお、ClientVersionConfigのデフォルト値は3X
なので、COMPATIBLE_WITH_2X
を明示的に設定しないと、以下の図のような遷移が発生する:
完了状態になるとマイグレーションツールはrollbackもrollforwardもできないので、アプリケーションをロールバックするほかなくなります。
ただし、アプリケーションが負荷分散アルゴリズムの影響をほとんど受けないと考えられる場合は、デプロイの回数を減らすことができるので、あえてこの流れを採用する9こともできます。
参考文献
- Amazon Kinesis Data Streams デベロッパーガイド内のKCLに関するページ: https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kcl.html
- GitHubの Amazon Kinesis Client Library for Javaのページ: https://github.com/awslabs/amazon-kinesis-client
-
数値計算や集計に限らず、レコードの一部データの抽出や変形を含む。 ↩
-
結果的に同じレコードが2回以上処理されることはある。例えばチェックポイント処理を行わないままシャードの引き継ぎやアプリケーションインスタンスの再作成・引き継ぎが発生すると、新しく当該シャードを担当することになったアプリケーションはチェックポイント済みのレコードから後を全て処理するので、結果的に再処理されることはある。この点で、「同一レコードが2度処理されることはない」という前提をおくのもまた間違い。 ↩
-
KCLのドキュメントにも再処理の記述は一切ないので、ある意味で「レコードの再処理はしない」というのは当たり前に思えるかもしれない。しかし、「チェックポイント処理をしなければそのレコードは適切なタイミングでKCLが再度取得して
processRecords
メソッドに渡され、ユーザーコードで再処理される」という誤解に基づいてコーディング・構築されたサービスを改修した筆者の経験上、この点はとくに強調しておきたい。 ↩ -
Amazon Simple Queue Service ↩
-
Amazon Simple Storage Service ↩
-
同じサービスを使うと、サービス単位での障害のときは正常系と異常系で同じ障害の影響を受けてしまうため。 ↩
-
前提条件を満たした場合はCPU使用率、満たしていない場合ワーカーの処理するスループットを使用します; 詳しくはドキュメントを参照 ↩
-
ドキュメントでは「目標は、コンシューマーアプリケーションフリート全体でワークロードをより均等に分散し、単一のワーカーが過負荷にならないようにすることです。(The goal is to balance the workload more evenly across the consumer application fleet, preventing any single worker from becoming overloaded.)」と書かれています。 ↩
-
リスクを低減するには、実際の環境に近い負荷をかけた事前のリハーサルなども検討する必要があるでしょう。 ↩