Spring Batchを業務で使用する機会があり調べていたところ、最新のバージョンでの実装方法や躓いたポイントなどで日本語の記事が見つかりにくかったので、備忘録がてら記事にしました。
Spring Batchそのものについては特に解説しませんので、全く触れたことがないという方は以下の記事等を参考に、基本的な概念について理解することをおすすめします。
前提
以下の環境での実装例です。
また、実装例はあくまで検証用のため、設計上必ずしも最適化されていないところはご容赦ください。
- Spring Boot 3.2.2
- Spring Batch 5.1.0
1.メタテーブルの作成について
Spring Batchを使う上で、バッチの実行状況を管理するメタテーブルが必要です。
これらは初回実行時に作成する必要があります。
設定ファイルにspring.batch.jdbc.initialize-schema=alwaysを指定することで、バッチ起動時に自動的にメタテーブルが作成されます。
ただし、@EnableBatchProcessingアノテーションを使用している場合、この初期化が期待通りに機能しない場合があるので注意が必要です。
Spring Batchの5系から@EnableBatchProcessingの挙動が変わり、自動構成が機能しないようです。
spring.batch.jdbc.initialize-schema=always
@Component
@RequiredArgsConstructor
//@EnableBatchProcessingはつけない
public class BatchCommandLineRunner implements CommandLineRunner{
@Value("${spring.batch.jdbc.initialize-schema}")
public String initialize;
@Override
public void run(String... args) throws Exception {
if (initialize.equals(BatchConst.INITIALIZE_SCHEMA_ALWAYS)) {
// 初期化時は起動のみ
return;
}
// 以下、Jobの実行処理など
}
}
2.Job名を指定してバッチを起動する
複数のJobを定義している場合、特定のJobを指定して起動したいケースがあると思います。
Spring Batchでは、JobRegistryを使用してアプリケーション内のジョブを管理します。
これにより、起動引数で指定されたジョブ名に基づいて、ジョブを動的に取得し起動することが可能になります。
特にSpring Batch 5.1系以降では、ジョブがJobRegistryに自動的に登録されるため、このプロセスがさらに簡単になりました。
ただし、5.1系より前のバージョンを使用している場合は、JobRegistryBeanPostProcessorを利用してジョブを手動でJobRegistryに登録する必要があります。
これにより、起動時に引数で指定されたジョブ名に応じて、適切なジョブを実行できるようになります。
※JobRegistryBeanPostProcessorの使用方法については以下を参考にしてください。
以下の例では、「batch=demoJob」のようにジョブ名を渡してやることで、特定のJobを実行することができます。
@Component
@RequiredArgsConstructor
public class BatchCommandLineRunner implements CommandLineRunner{
@Autowired
private final JobLauncher jobLauncher;
@Autowired
private final JobRegistry jobRegistry;
@Value("${spring.batch.jdbc.initialize-schema}")
public String initialize;
@Override
public void run(String... args) throws Exception {
if (initialize.equals(BatchConst.INITIALIZE_SCHEMA_ALWAYS)) {
// 初期化時は起動のみ
return;
}
// 起動パラメータを取得
Properties properties = StringUtils.splitArrayElementsIntoProperties(args, "=");
String batchName = properties.getProperty("batch");
// 設定したジョブ名からジョブを取得
Job job = jobRegistry.getJob(batchName);
// 起動パラメータ設定
JobParameters jobParameters = new DefaultJobParametersConverter().getJobParameters(properties);
// ジョブ実行
jobLauncher.run(job, jobParameters);
}
}
}
3.ExitCodeの設定
処理の結果に応じて適切なExitCodeを返したい場合があると思います。
その場合は、アプリケーションの実行終了時にSystem.exit()を呼び出す必要があります。
また、正常終了のみならず、エラーや警告などの特定の状態を示すために、独自のExitCodeを設定したい場合、独自のシングルトンクラスを作成してExitCodeを管理する方法をとることができます。
後述するSkipItemListenerと組み合わせて、何か特定の条件で警告終了扱いにするなど、シェルスクリプト側で適切な対応をとれるようになります。
/**
* ExitCodeをシングルトンで使うためのクラス
*
*/
public class ExitCodeSingleton {
/*
* ExitCode(デフォルトは正常終了)
*/
public ExitCode exitCode = ExitCode.COMPLETE;
/*
* インスタンス
*/
private static ExitCodeSingleton instance = new ExitCodeSingleton();
private ExitCodeSingleton() {
}
public static ExitCodeSingleton getInstance() {
return instance;
}
public void setExitCode(ExitCode exitCode) {
this.exitCode = exitCode;
}
}
@Component
@RequiredArgsConstructor
public class BatchCommandLineRunner implements CommandLineRunner{
@Autowired
private final JobLauncher jobLauncher;
@Autowired
private final JobRegistry jobRegistry;
@Value("${spring.batch.jdbc.initialize-schema}")
public String initialize;
@Override
public void run(String... args) throws Exception {
if (initialize.equals(BatchConst.INITIALIZE_SCHEMA_ALWAYS)) {
// 初期化時は起動のみ
return;
}
// 起動パラメータを取得
Properties properties = StringUtils.splitArrayElementsIntoProperties(args, "=");
String batchName = properties.getProperty("batch");
// 終了コードを設定するためのシングルトンクラス
ExitCodeSingleton exitCodeSingleton = ExitCodeSingleton.getInstance();
try {
// 設定したジョブ名からジョブを取得
Job job = jobRegistry.getJob(batchName);
// 起動パラメータ設定
JobParameters jobParameters = new DefaultJobParametersConverter().getJobParameters(properties);
// ジョブ実行
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
// ジョブ起動時にエラーが発生した場合は、異常終了とする
exitCodeSingleton.setExitCode(ExitCode.ERROR);
}
}
}
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(DemoApplication.class, args);
SpringApplication.exit(context);
// System.exitで、バッチ内で設定したExitCodeを出力(デフォルトは「1:正常終了」)
System.exit(ExitCodeSingleton.getInstance().exitCode.getExitCode());
}
}
4.標準のReader・Processor・Writerコンポーネントについて
さまざまなコンポーネントが用意されていますが、実装例を見かけないものも多かったので、一例として紹介します。
CompositeItemProcessorやClassifierCompositeItemWriterを使うと、複数のコンポーネントを組み合わせたり、処理を振り分けたりすることも可能です。
以下は、CSVファイルからユーザー情報を読み込み、バリデーションを行った後、加工してデータベースに登録する処理を行っています。
(O/RマッパーとしてMyBatisを使用しています)
@Configuration
public class DemoBatchChunkConfig {
@Autowired
MstUserMapper mstUserMapper;
@Autowired
JobLoggingListener jobLoggingListener;
@Autowired
SkipLoggingListener skipLoggingListener;
/**
* CSVファイルを読み込むItemReader
*/
@Bean
public FlatFileItemReader<UserDto> userFlatFileItemReader() {
// CSVのカラム構成
String[] columnNameArray = new String[] { "userId", "userName", "age", "gender" };
// CSVファイルのパス
String filePath = "C:\\demo\\input\\user.csv";
return new FlatFileItemReaderBuilder<UserDto>().name("userCsvReader").resource(new FileSystemResource(filePath))
.linesToSkip(1).encoding("Shift_JIS").delimited().names(columnNameArray)
.fieldSetMapper(new BeanWrapperFieldSetMapper<UserDto>() {
{
setTargetType(UserDto.class);
}
}).build();
}
/**
* バリデーション
*/
@Bean
public BeanValidatingItemProcessor<UserDto> validatingProcessor() throws Exception {
BeanValidatingItemProcessor<UserDto> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(false);
return beanValidatingItemProcessor;
}
/**
* データ加工
*/
@Bean
@StepScope
public FunctionItemProcessor<UserDto, MstUser> userProcessor(
@Value("#{jobParameters[date]}") String date) {
return new FunctionItemProcessor<>(user -> {
MstUser mstUser = new MstUser();
mstUser.setUserId(user.getUserId());
mstUser.setUserName(user.getUserName());
mstUser.setAge(user.getAge());
if (user.getAge() < 20) {
mstUser.setUserKbn("1");
} else {
mstUser.setUserKbn("2");
}
mstUser.setRegisterDate(date);
return mstUser;
});
}
/*
* バリデーションプロセスと加工プロセスを連結
*/
@Bean
public CompositeItemProcessor<UserDto, MstUser> compositeProcessor()
throws Exception {
List<ItemProcessor<?, ?>> delegates = List.of(
validatingProcessor(),
userProcessor(null));
CompositeItemProcessor<UserDto, MstUser> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates(delegates);
return itemProcessor;
}
/*
* ユーザーテーブルに登録する
*/
@Bean
public ClassifierCompositeItemWriter<MstUser> classifierWriter(
SqlSessionFactory sqlSessionFactory) throws Exception {
ClassifierCompositeItemWriter<MstUser> compositeWriter = new ClassifierCompositeItemWriter<MstUser>();
compositeWriter.setClassifier(mstUser -> {
MstUser existData = mstUserMapper.selectByPrimaryKey(mstUser.getUserId());
if (existData == null) {
return insertWriter(sqlSessionFactory);
} else {
return updateWriter(sqlSessionFactory);
}
});
return compositeWriter;
}
/*
* DBに書き込むItemWriter(MyBatis利用・INSERT)
*/
@Bean
public MyBatisBatchItemWriter<MstUser> insertWriter(SqlSessionFactory sqlSessionFactory) {
MyBatisBatchItemWriter<MstUser> writer = new MyBatisBatchItemWriter<>();
writer.setSqlSessionFactory(sqlSessionFactory);
String statementId = MstUserMapper.class.getName() + ".insert";
writer.setStatementId(statementId);
return writer;
}
/*
* DBに書き込むItemWriter(MyBatis利用・UPDATE)
*/
@Bean
public MyBatisBatchItemWriter<MstUser> updateWriter(SqlSessionFactory sqlSessionFactory) {
MyBatisBatchItemWriter<MstUser> writer = new MyBatisBatchItemWriter<>();
writer.setSqlSessionFactory(sqlSessionFactory);
String statementId = MstUserMapper.class.getName() + ".updateByPrimaryKey";
writer.setStatementId(statementId);
return writer;
}
/*
* Step
*/
@Bean("demoBatchChunkStep1")
Step demoBatchChunkStep1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
FlatFileItemReader<UserDto> userFlatFileItemReader,
CompositeItemProcessor<UserDto, MstUser> compositeProcessor,
ClassifierCompositeItemWriter<MstUser> classifierWriter) {
return new StepBuilder("demoBatchChunkStep1", jobRepository).<UserDto, MstUser> chunk(20, transactionManager)
.reader(userFlatFileItemReader).processor(compositeProcessor).writer(classifierWriter)
.faultTolerant().skipPolicy(new BatchSkipPolicy())
.listener(skipLoggingListener)
.build();
}
/*
* Job
*/
@Bean("demoBatchChunkJob")
Job demoBatchChunkJob(JobRepository jobRepository, Step demoBatchChunkStep1, Step demoBatchChunkStep2) {
return new JobBuilder("demoBatchChunkJob", jobRepository).incrementer(new RunIdIncrementer())
.start(demoBatchChunkStep1).listener(jobLoggingListener).build();
}
}
5.バリデーションとスキップ
Spring Batchは、指定の例外が発生した際に、そのアイテムだけをスキップする機能を提供しています。
たとえば、BeanValidatingItemProcessorを使用してアイテムのバリデーションを行った場合、バリデーションエラーのアイテムはValidationExceptionを投げるため、これを捕捉して条件に合わないアイテムをスキップする設定が可能です。
@Data
@NoArgsConstructor
public class UserDto {
private Integer userId;
@NotEmpty
// ユーザー名がブランクの場合はBeanValidatingItemProcessorでValidationExceptionが発生
private String userName;
private Integer age;
private String gender;
}
例外発生時の挙動はいくつかの方法で指定できますが、スキップできる最大数を無制限にしたい場合は、独自のSkipPolicyを実装する必要があります。
以下は、ValidationExceptionであれば、何件でもスキップしてOKという設定です。
例外の種類によって処理を分けることも可能です。
public class BatchSkipPolicy implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable t, long skipCount) throws SkipLimitExceededException {
// バリデーションエラーの場合はすべてスキップ
if (t instanceof ValidationException) {
return true;
}
return false;
}
}
スキップされたアイテムに対して後処理を行いたい場合には、SkipItemListenerを利用します。
例えば、スキップされたアイテムのログを出力する、あるいは特定の例外が発生したアイテムに対して特別な処理を行うなど、柔軟な対応が可能です。
今回の例ではBeanValidatingItemProcessorが投げるValidationExceptionを対象にしていますが、独自に作成したProcessor内で条件に応じて特定の例外を投げ、SkipItemListenerでその例外を捕捉して処理するといった使い方もできます。
また、先ほどのExitCodeSingletonの実装とあわせて、スキップが発生した場合は警告終了扱いにするなどの対応も可能です。
@Slf4j
@Component
public class SkipLoggingListener implements SkipListener<Object, Object> {
@Override
public void onSkipInProcess(Object item, Throwable throwable) {
if (throwable instanceof ValidationException) {
log.error("スキップされました:" + item.toString());
ExitCodeSingleton exitCodeSingleton = ExitCodeSingleton.getInstance();
// 警告終了のExitCodeを設定する
exitCodeSingleton.setExitCode(ExitCode.COMPLETE_WITH_SKIP);
} else {
log.error("ステップの実行中にエラーが発生しました");
}
}
}
6.JobContextにアクセスする方法
様々なタイミングで、JobContextにアクセスしてデータをセットまたは取得したいことがあると思いますが、意外と書き方がややこしいです。
- JobExecutionListenerで使用する例
シンプルにJobExecutionから取得できます
@Slf4j
@Component
public class JobLoggingListener implements JobExecutionListener {
/**
* Job実行前処理
*/
@Override
public void beforeJob(JobExecution jobExecution) {
// ジョブ開始ログ
log.info("ジョブを開始します");
// Job内で使用したい値をContextにセットする
ExecutionContext exContext = jobExecution.getExecutionContext();
exContext.put("test", "ExecutionContext Test Value");
}
/**
* Job実行後処理
*/
@Override
public void afterJob(JobExecution jobExecution) {
ExitCodeSingleton exitCodeSingleton = ExitCodeSingleton.getInstance();
if (exitCodeSingleton.exitCode == ExitCode.COMPLETE_WITH_SKIP) {
jobExecution.setExitStatus(new ExitStatus(ExitCode.COMPLETE_WITH_SKIP.getValue()));
}
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
// 正常終了
log.info("ジョブが正常終了しました");
} else {
log.error("ジョブの実行中にエラーが発生しました");
exitCodeSingleton.setExitCode(ExitCode.ERROR);
}
}
}
- Tasklet内で使用する例
Tasklet内では、ChunkContext→StepContext→JobExecutionContextとたどることで取得できます。
@Component
public class DemoBatchTasklet implements Tasklet {
@Autowired
MstUserMapper mstUserMapper;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// ChunkContext→StepContext→JobExecutionContextから値を取り出す
Map<String, Object> jobContext = chunkContext.getStepContext().getJobExecutionContext();
String exContextValue = (String) jobContext.get("test");
System.out.println(exContextValue);
// ・・・以降の処理
}
}
- Chunkで使用する例
Chunkの各コンポーネントで使用する場合は、StepExecution→JobExecution→ExecutionContextとたどることになります。
@Component
@StepScope
public class DemoBatchProcessor implements ItemProcessor<MstUser, MstUser> {
@Value("#{stepExecution}")
StepExecution stepExecution;
@Override
public MstUser process(MstUser item) throws Exception {
// StepExecution→JobExecution→ExecutionContextから取り出す
String exContextValue = (String) stepExecution.getJobExecution().getExecutionContext().get("test");
System.out.println(exContextValue);
// ・・・以降の処理
}
}
ちなみに、標準コンポーネントではContextにアクセスする方法はないようです。
JobParameterは使用することができます。
少し不思議に見える書き方ですが、Bean生成時はNullで引数を渡していたとしてもStepの実行時にはJobParameterから値を取得することができます。
@Bean
@StepScope
public FunctionItemProcessor<UserDto, MstUser> userProcessor(
@Value("#{jobParameters[" + BatchConst.KEY_DATE + "]}") String date) {
// 実行時にはdateはJobParameterから取得できる
System.out.println(date);
return new FunctionItemProcessor<>(user -> {
// ・・・以降の処理
});
}
/*
* バリデーションプロセスと加工プロセスを連
*/
@Bean
public CompositeItemProcessor<UserDto, MstUser> compositeProcessor()
throws Exception {
List<ItemProcessor<?, ?>> delegates = List.of(
validatingProcessor(),
userProcessor(null)); // コンパイル上必要なため引数を設定
CompositeItemProcessor<UserDto, MstUser> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates(delegates);
return itemProcessor;
}
7. 参考資料
記事を書くにあたって、以下を参考にさせていただきました。