AWS Kinesis Stream APIで記載されているShard iterator typeのうち、KCL(Kinesis Client Liblary)に標準で用意されているのは LATEST
と TRIM_HORIZON
ですが、これの挙動について気になる点があったので調査してみました。途中の訳文は適当です。
LATEST
原義では
(原文)LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
(訳)LATEST - シャード内の最新レコードの直後から読み取りを開始します。なのでシャード内の最新のデータを常に読みます。
となっています。
実際の挙動としてはKCLはまずDynamoDBにアプリケーション名のテーブルが存在することを確認します。Streamに対してあるアプリケーション名で初めて取得を開始するとき、KCLはDynamoDBにアプリケーション名のテーブルが作成し、シャードIDがhashPkとなるレコードが作成されます。その時、 checkpoint
に LATEST
が書き込まれます。
その後、KCLは checkpoint
を見に行き、LATESTと書いて書いてあればApp起動後に初めて入ってきたデータから取得を開始します。この設定は一度アプリケーションを終了したあとも変わらず、アプリケーションが動いてない間にデータがシャードに入ってきてから起動してもそのデータは取得されません。
TRIM_HORIZON
原義では
(原文) TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard.
(訳) TRIM_HORIZON - シャード内に存在する最初のデータから読み込みを開始します。
となっています。
実際の挙動では、初めてのアプリケーションであればLATESTの時と変わらなく、 checkpoint
が TRIM_HORIZON
になるぐらいの違いです。
KCLはアプリケーションを起動したあと、DynamoDBの checkpoint
を見に行き、 TRIM_HORIZON
であれば、シャード内のデータを全件取得してきます。 この設定は一度アプリケーションを終了したあとも変わらず、アプリケーションを再起動したあともシャード内のデータを必ず全件取得するようになっています。
AT_SEQUENCE_NUMBER
と AFTER_SEQUENCE_NUMBER
は?
結論からいうとKCLでは実装されていないようです。もしかしたらKCLでやる方法があるかもしれませんが、AWS SDKに実装されている com.amazonaws.services.kinesis.model
の GetShardIteratorRequest
Class の setStartingSequenceNumber(String startingSequenceNumber)
に当たるメソッドは見つかりませんでした。
検証中、DynamoDBの checkpoint
に シャード内に存在するレコードのSequenceNumber(ex: 00000221946949406733343604294600004294483820243067076610
) で上書きしたところ、 AFTER_SEQUENCE_NUMBER
とおなじ挙動になりました。ただし、再起動してもそのSequenceNumberが書き換わることは無いので取り扱いには注意が必要です。
結論
ドキュメントに書いてあるとおりの挙動だったのはわかりましたが、アプリケーションを停止している間にきたレコードをどう扱うかという点に関しては実装者側で何かしら案を練らないとダメっぽいのでそこら辺の知見が必要になります。さてさてどうしようか。