Kafka StreamsにはWindow Aggregationという仕組みがある。
特定の時間間隔毎にイベントの数を数えたり等を行うための機能だ。
で、それを利用してユーザー毎のイベント実行回数をカウントする処理を書いていたのだが、何故か結果が合わない。
テストコードでは再現しないし確率的に結果がズレるという状況が発生した。
物凄くハマったが、原因は未来のtimestampが来た時の処理にあった。
検証バージョン
- kafka-streams-2.4.0
問題の詳細
一度書き込んでバックエンドのKafkaトピックまでデータが届いているにも関わらず、データを再度取得しようとした時に既存の集計カウントが取得できない。
そのためカウントがリセットされるし、他の箇所でも結果が取得できない状態になっていた。
- 確実に
put
処理は行われている - 紐付いているKafkaのトピックには確実にデータが届いている
原因: RocksDBのSegmentが未来のtimestampの影響でexpireしていた
Kafka StreamsのWindowStoreにはretention periodが存在する。retention periodを過ぎたデータは削除対象になる。
以下の例で言うと30日が保持期間になる。
private StoreBuilder<WindowStore<String, Counter>> counterStore =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(
"counter",
Duration.ofDays(30),
Duration.ofHours(24),
false),
Serdes.String(),
Serdes.Integer())
.withCachingEnabled();
また、WindowStoreは内部でSegmentという単位でデータディレクトリを分けている。
Segmentの間隔はretention periodの半分の時間間隔を利用する。(最短1分)
大まかに表現すると以下の様な動きを示す。
- timestampと共にputを実行する
- timestampとsegment intervalからsegment idを算出する
@Override
public String segmentName(final long segmentId) {
// (1) previous format used - as a separator so if this changes in the future
// then we should use something different.
// (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
// so if this changes in the future then we should use something different.
return name + "." + segmentId * segmentInterval;
}
@Override
public long segmentId(final long timestamp) {
return timestamp / segmentInterval;
}
- 対象のSegmentを特定しそこにデータをputする
@Override
public void put(final Bytes key,
final byte[] value) {
final long timestamp = keySchema.segmentTimestamp(key);
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment == null) {
expiredRecordSensor.record();
LOG.debug("Skipping record for expired segment.");
} else {
segment.put(key, value);
}
}
- その際、受け取ったことのある最も未来のtimestampを
observedStreamTime
として利用し、そこを基準にgetOrCreateSegmentIfLive
を実行する。
そしてその処理の中でobservedStreamTime
からretentioPeriod
を引いた時間よりも前のSegmentをcleanupする。
cleanup対象のsegmentは削除フラグを付与されてcloseされる。
@Override
public S getOrCreateSegmentIfLive(final long segmentId,
final InternalProcessorContext context,
final long streamTime) {
final long minLiveTimestamp = streamTime - retentionPeriod;
final long minLiveSegment = segmentId(minLiveTimestamp);
final S toReturn;
if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
toReturn = getOrCreateSegment(segmentId, context);
} else {
toReturn = null;
}
cleanupEarlierThan(minLiveSegment);
return toReturn;
}
private void cleanupEarlierThan(final long minLiveSegment) {
final Iterator<Map.Entry<Long, S>> toRemove =
segments.headMap(minLiveSegment, false).entrySet().iterator();
while (toRemove.hasNext()) {
final Map.Entry<Long, S> next = toRemove.next();
toRemove.remove();
final S segment = next.getValue();
segment.close();
try {
segment.destroy();
} catch (final IOException e) {
log.error("Error destroying {}", segment, e);
}
}
}
もし、ここにかなり未来のtimestampを持ったレコードが届いた場合、observedStreamTime
が未来の時刻に固定されてしまい、そこからretention periodを引いた過去のStateStoreは全てcloseされて利用できなくなる!
結論
つまり、retention periodを越える程の未来のtimestampを一度でも書き込んでしまうと、それより過去の結果を取得することも書き込むことも不可能になる。
その後に正しい今のtimestampのレコードが来たとしてもそれは利用できない。
もし、上記の例で1ヶ月以上未来のtimestampを持ったレコードが来たら、同じパーティションを利用しているレコードのカウントは壊れることになる。
しかも、この状況が起きた際にエラーログや警告の類は一切無い。
現時点では、対象になっている集計対象のtimestampのズレが信用できない場合、一定以上未来のtimestampを持っているレコードを除外する処理が必ず必要になる。
(というか信用できてもガードしておいた方が良い)
正直、滅茶苦茶ハマったし機能全般の信頼性に対して物凄く不安になったが、何とか原因が確定できて本当に良かった。
Kafka Streamsはtimestampの扱いに依存した仕組みが他にもいくつかあるので、時計が大きくズレたデータが届く可能性のあるシステムは扱いに注意した方が良いだろう。