Edited at

Spring Batchで勉強したことまとめ

SpringBatchで勉強したことをまとめてみましたので記事を書いてみます。

SpringBatchの環境構築については以下の記事をベースにさせていただきました。

Spring Boot + Spring Batchで簡単なバッチサービスを作ってみる


SpringBatch用のテーブルがないと怒られる

様々なサイトを参考に環境構築させていただいた際に構築した環境で実行してみると、例えば"BATCH_JOB_EXECUTION"などのテーブルが無いといって怒られました。

SpringBatchを動かすためには専用のテーブルを用意する必要がありそうです。

必要なテーブルについては以下を参照します。

JobRepositoryのメタデータスキーマ

しかし、これらを自分で挿入するのは大変そうです。

そこで、springでは様々なプラットフォームに合わせたsqlを用意してくれています。

「spring batch schema-〇〇(プラットフォーム名).sql」で検索するとヒットすると思います。

私はpostgresqlを使用しましたので「shema-postgresql.sql」を使用しました。

以下を参考にさせていただきました。

schema-postgresql.sql

schema-drop-postgresql.sql


アプリケーション実行時にテーブルを初期化する

アプリの動作を確認するためにアプリを何回も起動することになると思いますが、その度にデータを作成するのは大変だと思います。

そこで毎回テーブルを初期化することで対応します。

SpringBootにはアプリケーション実行時にテーブルを初期化するために仕組みが用意されています。

以下のサイトを参考にさせていただきました。

SpringBootのDB初期化方法

Spring Boot + PostgreSQLの設定方法

SpringBootのプロジェクト生成時に作成される「src/main/resources」配下に「schema-〇〇.sql」を配置すれば自動的に起動されるようです。

私はpostgresqlを使用しましたので、以下のように対応しました。


application.propertiesに以下を記述


application.properties

spring.datasource.driver-class-name=org.postgresql.Driver

spring.datasource.url=jdbc:postgresql://localhost:5432/testdb
#spring.datasource.username=postgres
#spring.datasource.password=postgres
spring.datasource.initialization-mode=always

SpringBootではDataSourceについて自動的にBean定義してくれているみたいで、エンジニアはapplication.propertiesにDB設定を記述するだけでよいみたいです。


SQLを用意する

SpringBatch用のテーブルがないと怒られる」のところで確認したことも合わせて、以下のようなSQLを用意しました。


schema-all.sql

DROP TABLE IF EXISTS people;

CREATE TABLE people (
person_id SERIAL NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);

-- Autogenerated: do not edit this file
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_CONTEXT;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_CONTEXT;
DROP TABLE IF EXISTS BATCH_STEP_EXECUTION;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_PARAMS;
DROP TABLE IF EXISTS BATCH_JOB_EXECUTION;
DROP TABLE IF EXISTS BATCH_JOB_INSTANCE;

DROP SEQUENCE IF EXISTS BATCH_STEP_EXECUTION_SEQ ;
DROP SEQUENCE IF EXISTS BATCH_JOB_EXECUTION_SEQ ;
DROP SEQUENCE IF EXISTS BATCH_JOB_SEQ ;

CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL TIMESTAMP DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME TIMESTAMP NOT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;


これで実行に必要なテーブルについては実行毎に初期化してくれるようになりました。


ジョブが無限ループする

Spring Batchでは基本的にReader,Processor,Writerクラスを用いて処理を行いますが、これらを自分で定義したときに無限ループが発生しました。

かなり手こずってしまいましたが、以下のサイトが参考になりました

Spring Batch チャンク管理された一連処理

SpringBatchではItemReaderがnullを返すまで処理がループするようです。ですので、処理が終わるとnullを返却するようにItemReaderを工夫して実装しなければならないようです。

以下のサイトではそのように実装されていました。

Spring Batch 独自のItemReaderの実装

上記のサイトを参考に実装したものは以下です。


PersonItemReader.java

import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;

public class PersonItemReader implements ItemReader<Person>{

private List<Person> people = null;
private int nextIndex;
private final PersonService service;

public PersonItemReader(PersonService service) {
this.service = service;
}

@Override
public Person read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (people == null) {
people = service.selectAll();
nextIndex = 0;
}
Person person = null;
if (nextIndex < people.size()) {
person = people.get(nextIndex);
nextIndex++;
}
return person;
}

}


定義したItemReaderはBean定義します。


BatchConfiguration.java

@Configuration

@EnableBatchProcessing
public class BatchConfiguration {

...(省略)...

@Autowired
PersonService personService;

@Bean
public PersonItemReader reader() {
return new PersonItemReader(personService);
}

...(省略)...

}



Reader・Processor・Writerの一般的な実装方法

SpringBatchではReaderやProcessor、Writerを使うのが一般的ですが、それぞれの一般的な実装方法があります。

デザインパターンのようなものだと思います。

以下では私が行ってしまった実装方法を記載します。

まずはReaderクラスです。


PersonItemReader.java

import java.util.ArrayList;

import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class PersonItemReader implements ItemReader<List<Person>>{

private final PersonService service;
private final PersonCheckService checkService;

public PersonItemReaderForTest(PersonService service, PersonCheckService checkService) {
this.service = service;
this.checkService = checkService;
}

@Override
public List<Person> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
List<Person> people = service.selectAll();
List<Person> ret = null;
for(Person person : people) {
if(checkService.check(person)) {
if(ret == null)
ret = new ArrayList<Person>();
ret.add(person);
}
}
return ret;
}

}


DBから取得したものをListの形式でProcessorに渡しています。

続いてProcessorクラスです。


PersonItemProcessor.java

import java.util.ArrayList;

import java.util.List;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<List<Person>, List<Person>> {

@Override
public List<Person> process(final List<Person> people) throws Exception {
List<Person> transformedPeople = new ArrayList<Person>();
for(Person person : people) {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
transformedPeople.add(transformedPerson);
}
return transformedPeople;
}

}


Readerから渡されたListを加工して新たなListを返しています。

続いてWriterクラスです。


PersonItemWriter.java

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class PersonItemWriterForTest implements ItemWriter<Object>{

PersonService service;

public PersonItemWriterForTest(PersonService service) {
this.service = service;
}

@Override
public void write(List<? extends Object> items) throws Exception {
List<Person> people = (List<Person>) items.get(0);
for(Person person : people) {
service.updatePerson(person);
}

}

}


WriterクラスではProcessorから渡ってきたListをDBに登録しています。

しかし、特筆すべきはWriterクラスの次のコードです

List<Person> people = (List<Person>) items.get(0);

Listを得るために、パラメータのListからget(0)しています。

つまりパラメータにはList<List<Person>>が渡ってきていることになります。

これには少し違和感を感じました。

最初はパラメータとしてList<Person>が来るんだろうな、と思って実装していたら挙動がおかしなことになったので気づきました。

なんでだろうと思って検索してみると同じような疑問を持った人がいました。

Spring Batch - Using an ItemWriter with List of Lists

上記のサイトには次のようなことが書いてあります。


Typically, the design pattern is:


Reader -> reads something, returns ReadItem

Processor -> ingests ReadItem, returns ProcessedItem

Writer -> ingests List<ProcessedItem>


If your processor is returning List<Object>, then you need your Writer to expect List<List<Object>>.


典型的はデザインパターンはReader、Processorが単一のitemを返し、WriterはitemのListを処理すると言っています。

バッチ処理を行うわけですから、当然DBから複数データ取得したものをProcessorにそのままListの形式で渡すのが普通だろうと思っていました。

しかし、WriterではProcessorが単一で返却したオブジェクトをListに格納された状態で受け取ることができるようです。

ですから、Processorで単一のオブジェクトを返却することで、上記のようなitems.get(0)という処理をする必要が無くなります。

Readerも「ジョブが無限ループする」のところで紹介したような単一のオブジェクトを返却するような実装方法が一般的のようですね。


ジョブやステップをテストする

SpringBatchではジョブやステップをテストするための仕組みが用意されています。

テストを行うためにはJobLauncherTestUtilsを使います。

以下のサイトを参考にさせていただきました。

Unit Testing

まず、JobLauncherTestUtilsを使うためにBean定義を行います。


BatchConfigurationForTest.java

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfigurationForTest {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
JobLauncher jobLauncher;

@Autowired
JobRepository jobRepository;

@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.postgresql.Driver");
dataSource.setUrl("jdbc:postgresql://localhost:5432/ec");
// dataSource.setUsername(username);
// dataSource.setPassword(password);
return dataSource;
}

@Bean
PersonService personService() {
return new PersonService();
};

@Bean
public PersonItemReader reader() {
return new PersonItemReader(personService());
}

@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.build();
}

@Bean
public NoWorkFoundStepExecutionListener noWorkFoundStepExecutionListener() {
return new NoWorkFoundStepExecutionListener();
}

@Bean
public Job importUserJob(Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}

@Bean
public Step step1(NoWorkFoundStepExecutionListener listener, JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(1)
.reader(reader())
.processor(processor())
.writer(writer)
.listener(listener)
.build();
}

@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
JobLauncherTestUtils utils = new JobLauncherTestUtils();
utils.setJob(importUserJob(step1(noWorkFoundStepExecutionListener(), writer(dataSource()))));
utils.setJobLauncher(jobLauncher);
utils.setJobRepository(jobRepository);
return utils;
}
}


一番下にJobLauncherTestUtilsをBean定義しています。

上記はテスト用に新しくConfigurationクラスを定義したものになります。

内容自体はConfigurationクラスをさほど変わりません。

SpringBatchではステップの後に「リスナー」というものを設定し、何か処理を行うことができますが、リスナーのテストも行えるので、一緒にBean定義しております(リスナークラスについては上記参考サイトに記載されています)。

続いてテストクラスです。

まずはジョブをテストするクラスです。


JobTest.java

import static org.hamcrest.CoreMatchers.*;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes=BatchConfigurationForTest.class)
public class JobTest {

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

@Test
public void testJob() throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
Assert.assertThat("COMPLETED", is(jobExecution.getExitStatus().getExitCode()));
}

}


テストクラスではBean定義したJobLauncherTestUtilsをAutowiredしてきます。

ジョブの実行はlaunchJobで行います。

どのジョブを実行するかはBean定義時に指定しています。

続いてステップをテストするクラスです。


StepTest.java

import static org.hamcrest.CoreMatchers.*;

import static org.junit.Assert.*;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.MetaDataInstanceFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes=BatchConfigurationForTest.class)
public class StepTest {

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

@Autowired
NoWorkFoundStepExecutionListener tested;

@Test
public void testStep() {
JobExecution jobExecution = jobLauncherTestUtils.launchStep("step1");
Assert.assertThat("COMPLETED", is(jobExecution.getExitStatus().getExitCode()));
}

@Test
public void testAfterStep() {
StepExecution stepExecution = MetaDataInstanceFactory.createStepExecution();

stepExecution.setExitStatus(ExitStatus.COMPLETED);
stepExecution.setReadCount(0);

ExitStatus exitStatus = tested.afterStep(stepExecution);
assertThat(ExitStatus.FAILED.getExitCode(), is(exitStatus.getExitCode()));
}
}


ステップのテストを行う際にはlaunchStep()の引数に実行するステップの名前を渡します。

testAfterStep()ではBean定義したリスナーをテストしています。

setReadCount()はReaderクラスが読み取ったアイテムの数を表しています。

参考サイトに記載のNoWorkFoundStepExecutionListenerではgetReadCount() == 0 のとき、ExisStatus.FAILEDを返却するように実装されています。

以上でSpringBatchのまとめは終了です。