0
0

More than 3 years have passed since last update.

[Spring Batch] ItemWriter内でJPARepositoryを使用してデータの更新を行う

Posted at

はじめに

Spring BatchのChankモデルではDBからデータを一括で取得するReader, 取得したデータに対しビジネスロジックを追加するProcessor, DBへデータの更新を行うWriterの3層構造になっています。WriterにSQLを渡してデータの永続化を行う方法では、同時に2テーブルの更新ができないので、今回Writerで2つのテーブルの更新を同時に行う方法を記述したいと思います。

シナリオ

実際の例題があった方が書きやすいので...
例えば、とあるサービスの会員申請フォームから新規会員登録の申請情報を格納するテーブルregistration
会員テーブルuser を用意して、registrationからビジネスロジックを加えてuserへ一括INSERTをすることを考えます。
めちゃめちゃシンプルにテーブルは

registrationテーブル(会員登録申請テーブル)

user_name, // ユーザの名前
status     // 処理ステータス(未処理0, 完了1, エラー9)

userテーブル(会員テーブル)

id,   // ID
name // 名前

各層で行いたいことは、

reader: registrationテーブルからstatus = 0(未処理)のレコードを全て取得する
processor: それらのレコードからuserテーブルのカラムを作成
writer: userテーブルのカラムをDBに永続化する&永続化できたらregistration.status = 1(完了)に、できなければ9(エラー)で更新する

とします。

実装

コードはKotlinです。
まずはBatchJob用のConfiguration

@Configuration
class JobConfiguration(
        private val jobBuilderFactory: JobBuilderFactory,
        private val stepBuilderFactory: StepBuilderFactory,
        private val userRepository: UserRepository,
        private val registrationRepsitory: RegistrationRepository
) {
    @Bean
    fun Job(
            reader: RepositoryItemReader<Registration>,
            writer: ItemWriter<User>,
    ): Job {
        return jobBuilderFactory
                .get("job")
                .start(step())
                .build()
    }

    @Bean
    fun step(
            reader: RepositoryItemReader<Registration>,
            writer: ItemWriter<User>
    ): Step {
        return stepBuilderFactory
                .get("step")
                .chunk<Registration, User>(CHUNK_SIZE)
                .reader(reader)
                .processor(processor())
                .writer(writer)
                .build()
    }

    @Bean 
    fun reader(): RepositoryItemReader<Registration> {
        // readerは以下のように適切に設定すれば自前で用意しなくても大丈夫です
        return RepositoryItemReaderBuilder<Registration>()
                .name("reader")
                // JPARepositoryを使用してstatusが未処理のデータを取得
                .repository(registrationRepository)
                .methodName("findAllByStatus")
                .arguments(listOf(NOT_COMPLETED))
                .pageSize(CHUNK_SIZE)
                .build()
    }

    @Bean
    fun processor(): ItemProcessor<Registration, Pair<User, Registration>> {
        // processorは自作します
        // registration から userと、それの元になったregistrationをPairにしてwriterに送ります
        return MyProcessor()
    }

    @Bean 
    fun writer(): ItemWriter<Pair<User, Registration>> {
        // writerも自作します
        // userとregistrationを更新します
        return MyWriter(userRepository, registrationRepository)
    }
}

MyProcessor
以下のようにProcessorを継承、処理本体の関数をoverrideすれば、柔軟にロジックを設計することができます。

class MyProcessor() : ItemProcessor<Registration, Pair<User, Registration>> {
    override fun process(registration: Registration) {
        val returnUser = User(0, registration.user_name)
        registration.status = COMPLETED
        return Pair(returnUser, registration)
}

MyWriter
Writerも同様。データの永続化はJpaRepositoryの機能(.saveメソッド)を使用します。
Userテーブルに正しくデータの更新を行えた場合のみ、対応するRegistrationテーブルの更新を行います。

class MyWriter(
    private val userRepository: UserRepository
    private val registrationRepository: RegistrationRepository
): ItemWriter<Pair<User, Registration>> {
    override fun writer(items: MutableList<Pair<User, Registration>>) {
        items.forEach {
            try {
                userRepository.save(it.first)?.apply {
                    // userの更新が完了した場合のみregistrationも更新
                    registrationRepository.save(it.second)
                }
            } catch(e: Exception) {
                log.error("CANNOT SAVE!")
        }
    }   
}

実行

このまま実行しても以下のエラーが出る場合があります。

2021-03-28 14:30:25.553 ERROR 25908 --- [           main] MyWriter : Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: Already value [org.springframework.jdbc.datasource.ConnectionHolder@63814bbe] for key [HikariDataSource (HikariPool-1)] bound to thread [main]
2021-03-28 14:30:25.553 ERROR 25908 --- [           main] MyWriter : Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.
2021-03-28 14:30:25.553 ERROR 25908 --- [           main] MyWriter : Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.
2021-03-28 14:30:25.553 ERROR 25908 --- [           main] MyWriter : Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.

 Upgrading JobExecution status: StepExecution: id=210, version=2, name=step, status=FAILED, exitStatus=FAILED, readCount=4, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=1, exitDescription=org.springframework.batch.core.step.FatalStepExecutionException: JobRepository failure forcing rollback
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:464)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
        at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
        at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
...
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.lang.IllegalStateException: Session/EntityManager is closed
...
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed

既にJpaRepositoryとDBとの接続が存在し、そちらで使用しているtransactionManagerとデフォルトのSpringBatchで使用されるtransactionManagerとで競合する場合があります。
その場合には、Stepに対し、そのStep中で使用するtransactionManagerを指定してあげることで解消します。

@Configuration
class JobConfiguration(
        private val jobBuilderFactory: JobBuilderFactory,
        private val stepBuilderFactory: StepBuilderFactory,
        private val userRepository: UserRepository,
        private val registrationRepsitory: RegistrationRepository
        private val jpaTransactionManager: JpaTransactionManager
) {
...
   @Bean
    fun step(
            reader: RepositoryItemReader<Registration>,
            writer: ItemWriter<User>
    ): Step {
        return stepBuilderFactory
                .get("step")
                .chunk<Registration, User>(CHUNK_SIZE)
                .reader(reader)
                .processor(processor())
                .writer(writer)
                .transactionManager(jpaTransactionManager)
                .build()
    }
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0