結論
以下のページの一番最後に記載がある通り、「ItemProcessorはべき等になるように実装する必要がある」という話です。
起きたこと
Spring Batchで次のような実装をしたところ、無限ループが発生しました。
※事象を再現するための簡略化した処理です
@Component
public class DemoBatchProcessor implements ItemProcessor<MstUser, MstUser> {
@Override
public MstUser process(MstUser item) throws Exception {
LocalDateTime now = LocalDateTime.now();
item.setExpireDate(now);
return item;
}
}
@Component
public class DemoBatchWriter implements ItemWriter<MstUser> {
@Override
public void write(Chunk<? extends MstUser> chunk) throws Exception {
chunk.forEach(item ->{
System.out.println(item);
});
throw new Exception(); // 例外を発生させる
}
}
@Bean("demoBatchChunkStep")
Step demoBatchChunkStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("demoBatchChunkStep", jobRepository).<MstUser, MstUser> chunk(1, transactionManager)
.reader(demoBatchReader).processor(demoBatchProcessor).writer(demoBatchWriter)
.faultTolerant() //フォールトトレランスを設定
.build();
}
なぜ起きるのか
チャンクがロールバックされると、読み取り中にキャッシュされたアイテムが再処理される場合があります。ステップが耐障害性を持つように構成されている場合 (通常、スキップまたは再試行処理を使用することによって)、使用される ItemProcessor はべき等になるように実装する必要があります。通常、これは、ItemProcessor の入力項目に対して変更を実行せず、結果であるインスタンスのみを更新することで構成されます。
上記の処理ではWriterの処理で例外を投げているため、チャンクがロールバックされます。
ドキュメントによると、その場合にキャッシュされたアイテムが再処理されることがあるようです。
実際、デバッグをおいてみると、ItemProcessorが再度呼び出されることがわかります。
今回の処理では、ItemProcessorの中で、Itemに対してLocalDateTime.now()をセットしています。
つまり、再処理されたときは、初回の処理とはItemの中身が変わっています。
MstUser(userId=1, userName=田中太郎, age=21, expireDate=2024-05-25T01:25:01.426943700)
MstUser(userId=1, userName=田中太郎, age=21, expireDate=2024-05-25T01:25:01.437740600)
MstUser(userId=1, userName=田中太郎, age=21, expireDate=2024-05-25T01:25:01.441745100)
MstUser(userId=1, userName=田中太郎, age=21, expireDate=2024-05-25T01:25:01.444741200)
・・・
すると、同じItemのリトライは何回まで、といった設定をしていたとしても、そもそも同じItemとしてみなされず、無限ループが発生してしまうようです。
※同じItemとしてみなされずに無限ループが発生する、の根拠となる部分は、ドキュメント内に明確な記載を見つけられませんでした。
どうすればいいか
ドキュメントにある通り、ItemProcessorの中では、Itemに対して直接変更をかけるのではなく、新しいインスタンスを作成してWriterに渡す必要があります。
@Component
public class DemoBatchProcessor implements ItemProcessor<MstUser, MstUser> {
@Override
public MstUser process(MstUser item) throws Exception {
// 新しいインスタンスを作成
MstUser mstUser = new MstUser();
mstUser.setUserId(item.getUserId());
︙
LocalDateTime now = LocalDateTime.now();
mstUser.setExpireDate(now);
return mstUser;
}
}
これで、無限ループが発生せずに、リトライやエラーの処理を行うことができます。
事象発生からドキュメントの記載にたどり着くまでかなり時間がかかりました…。