はじめに
Spring BatchのChankモデルではDBからデータを一括で取得するReader, 取得したデータに対しビジネスロジックを追加するProcessor, DBへデータの更新を行うWriterの3層構造になっています。WriterにSQLを渡してデータの永続化を行う方法では、同時に2テーブルの更新ができないので、今回Writerで2つのテーブルの更新を同時に行う方法を記述したいと思います。
シナリオ
実際の例題があった方が書きやすいので...
例えば、とあるサービスの会員申請フォームから新規会員登録の申請情報を格納するテーブルregistration
と
会員テーブルuser
を用意して、registration
からビジネスロジックを加えてuser
へ一括INSERTをすることを考えます。
めちゃめちゃシンプルにテーブルは
registration
テーブル(会員登録申請テーブル)
user_name, // ユーザの名前
status // 処理ステータス(未処理0, 完了1, エラー9)
user
テーブル(会員テーブル)
id, // ID
name // 名前
各層で行いたいことは、
reader
: registrationテーブルからstatus = 0(未処理)のレコードを全て取得する
processor
: それらのレコードからuserテーブルのカラムを作成
writer
: userテーブルのカラムをDBに永続化する&永続化できたらregistration.status = 1(完了)に、できなければ9(エラー)で更新する
とします。
実装
コードはKotlinです。
まずはBatchJob用のConfiguration
@Configuration
class JobConfiguration(
private val jobBuilderFactory: JobBuilderFactory,
private val stepBuilderFactory: StepBuilderFactory,
private val userRepository: UserRepository,
private val registrationRepsitory: RegistrationRepository
) {
@Bean
fun Job(
reader: RepositoryItemReader<Registration>,
writer: ItemWriter<User>,
): Job {
return jobBuilderFactory
.get("job")
.start(step())
.build()
}
@Bean
fun step(
reader: RepositoryItemReader<Registration>,
writer: ItemWriter<User>
): Step {
return stepBuilderFactory
.get("step")
.chunk<Registration, User>(CHUNK_SIZE)
.reader(reader)
.processor(processor())
.writer(writer)
.build()
}
@Bean
fun reader(): RepositoryItemReader<Registration> {
// readerは以下のように適切に設定すれば自前で用意しなくても大丈夫です
return RepositoryItemReaderBuilder<Registration>()
.name("reader")
// JPARepositoryを使用してstatusが未処理のデータを取得
.repository(registrationRepository)
.methodName("findAllByStatus")
.arguments(listOf(NOT_COMPLETED))
.pageSize(CHUNK_SIZE)
.build()
}
@Bean
fun processor(): ItemProcessor<Registration, Pair<User, Registration>> {
// processorは自作します
// registration から userと、それの元になったregistrationをPairにしてwriterに送ります
return MyProcessor()
}
@Bean
fun writer(): ItemWriter<Pair<User, Registration>> {
// writerも自作します
// userとregistrationを更新します
return MyWriter(userRepository, registrationRepository)
}
}
MyProcessor
以下のようにProcessorを継承、処理本体の関数をoverrideすれば、柔軟にロジックを設計することができます。
class MyProcessor() : ItemProcessor<Registration, Pair<User, Registration>> {
override fun process(registration: Registration) {
val returnUser = User(0, registration.user_name)
registration.status = COMPLETED
return Pair(returnUser, registration)
}
MyWriter
Writerも同様。データの永続化はJpaRepositoryの機能(.saveメソッド)を使用します。
Userテーブルに正しくデータの更新を行えた場合のみ、対応するRegistrationテーブルの更新を行います。
class MyWriter(
private val userRepository: UserRepository
private val registrationRepository: RegistrationRepository
): ItemWriter<Pair<User, Registration>> {
override fun writer(items: MutableList<Pair<User, Registration>>) {
items.forEach {
try {
userRepository.save(it.first)?.apply {
// userの更新が完了した場合のみregistrationも更新
registrationRepository.save(it.second)
}
} catch(e: Exception) {
log.error("CANNOT SAVE!")
}
}
}
実行
このまま実行しても以下のエラーが出る場合があります。
2021-03-28 14:30:25.553 ERROR 25908 --- [ main] MyWriter : Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: Already value [org.springframework.jdbc.datasource.ConnectionHolder@63814bbe] for key [HikariDataSource (HikariPool-1)] bound to thread [main]
2021-03-28 14:30:25.553 ERROR 25908 --- [ main] MyWriter : Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.
2021-03-28 14:30:25.553 ERROR 25908 --- [ main] MyWriter : Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.
2021-03-28 14:30:25.553 ERROR 25908 --- [ main] MyWriter : Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.
Upgrading JobExecution status: StepExecution: id=210, version=2, name=step, status=FAILED, exitStatus=FAILED, readCount=4, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=1, exitDescription=org.springframework.batch.core.step.FatalStepExecutionException: JobRepository failure forcing rollback
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:464)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
...
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.lang.IllegalStateException: Session/EntityManager is closed
...
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed
既にJpaRepositoryとDBとの接続が存在し、そちらで使用しているtransactionManagerとデフォルトのSpringBatchで使用されるtransactionManagerとで競合する場合があります。
その場合には、Stepに対し、そのStep中で使用するtransactionManagerを指定してあげることで解消します。
@Configuration
class JobConfiguration(
private val jobBuilderFactory: JobBuilderFactory,
private val stepBuilderFactory: StepBuilderFactory,
private val userRepository: UserRepository,
private val registrationRepsitory: RegistrationRepository
private val jpaTransactionManager: JpaTransactionManager
) {
...
@Bean
fun step(
reader: RepositoryItemReader<Registration>,
writer: ItemWriter<User>
): Step {
return stepBuilderFactory
.get("step")
.chunk<Registration, User>(CHUNK_SIZE)
.reader(reader)
.processor(processor())
.writer(writer)
.transactionManager(jpaTransactionManager)
.build()
}