Spring Batchとは?
- Springのフレームワークの1つ
- バッチ処理を実装するためのフレームワーク
※ バッチ処理とは、あらかじめ一連の処理手順を登録しておき、自動的に処理を行う方式のことで、一定期間ごとに大量のデータ処理したい場合に有効な手法で、定期的なデータ集計やデータのバックアップ処理に向いている。
Spring Batchの仕組み
JobRepository |
---|
Spring Batchの実行履歴を管理する機能で 実行履歴はDB上のテーブルで管理される |
↕️ ↕️ ↕️
JobLaucher | Job | Step | Tasklet |
---|---|---|---|
JobLaucherを 起動するためのモジュール |
Stepを順に呼び出す | Taskletを呼び出す | 実際に処理が実装される場所で2種類の実装パターンがある ①Chunkモデル②Taskletモデル |
Jobの実装
- Jobの実装では、『どのStepをどのような順番、分岐で実行していくか』という情報を定義する
@Configuration
public class SampleBatchConfig {
//job
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
//jobを定義する
//第一引数:JobRepositoryへのアクセスするために使用
//第二引数:Jobに登録したいStepを定義する
return new JobBuilder("sampleJob", jobRepository)
//ジョブ情報の生成を行っている
//第一引数:JobRepositoryへのアクセスするために使用
//第二引数:Jobに登録したいStepを定義する
.start(sampleStep)
.build();
}
//step
@Bean
public Step sampleStep(~略~) {
~略~(この中で呼び出す Taskletを定義する)
}
}
(1)Step実行順序パターン1
@Bean
public Job sampleJob(JobRepository jobRepository, Step stepA,
Step stepB, Step stepC) {
return new JobBuilder("sampleJob", jobRepository)
.start(stepA)
.next(stepB)
.next(stepC)
.build();
}
- 単純パターンで、上から順にStepが実行される
- はじめに実行するStepはstart()に定義し、それ以降のStepはnext()に定義する
(2)Step実行順序パターン2
@Bean
public Job sampleJob(JobRepository jobRepository, Step stepA,
Step stepB, Step stepC) {
return new JobBuilder("sampleJob", jobRepository)
.start(stepA)
.on("*").to(stepB)
.from(stepA).on("FAILED").to(stepC)
.end()
.build();
}
- on()は条件を定義し、その条件を満たす場合に実行するStepをto()で定義する
①処理が成功した場合 ⇨ 『StepB』が実行される
②処理が失敗した場合 ⇨ 『StepC』が実行される
(3)Step実行順序パターン3
@Bean
public Job sampleJob(JobRepository jobRepository, Step stepA,
Step stepB, Step stepC) {
return new JobBuilder("sampleJob", jobRepository)
.start(stepA)
.next(stepB).on("FAILED").end()
.from(stepB).on("*").to(stepC).end()
.build();
}
- end()を使用すると、その時点でJobを終了する
①StepBの処理が成功した場合 ⇨ 続いて『StepC』が実行される
②StepBの処理が失敗した場合 ⇨ Jobは”処理完了”のステータスを返す
Taskletの実装
@Component
public class SampleTasklet implements Tasklet {
//TaskletモデルではTaskletインターフェイスを実装したクラスを作成する
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
//Taskletインターフェイスに定義されているexecuteメソッドを実装する
contribution.setExitStatus(ExitStatus.FAILED);
//Stepの『処理結果ステータス』を設定する
return RepeatStatus.FINISHED;
//Taskletの『繰り返し要否ステータス』を返却する
}
}
※FINISHED:繰り返し不要(そのままTaskletを終える)
※continuable:もう一度Taskletを呼ぶ
(1)Stepの実装
@Bean
public Job sampleStep(JobRepository jobRepository,
PlatformTrnsactionManager trnsactionManager, Tasklet sampleTasklet) {
//メソッド内で実装するステップ定義に必要な変数を引数に用意する
return new JobBuilder("sampleStep", jobRepository)
//ステップ定義を行う
//第一引数:任意のステップ名称
JobRepositoryに実行履歴を登録する時に使用される
//第二引数:JobRepositoryへアクセスするために使用される
.tasklet(sampleTasklet, trnsactionManager)
//Stepから呼ぶTaskletを設定する
//第一引数:Stepから呼びたいTaskletクラス
//第二引数:バッチ処理としてのトランザクションを担保するために使用する
.build();
}
【引数について】
①JobRepository:JobRepositoryへのアクセス用のクラス
②PlatformTrnsactionManager:トランザクション管理用のクラス
⇨ Spring Batchが実装まで用意しているクラス(自分で実装する必要はない)
③Tasklet:業務処理用のクラス
⇨ Spring Batchがインターフェースは用意しているが、実装は自分でする必要がある
Spring Batchで使用するステータスについて
①RepeatStatus: Taskletから返される繰り返し要否ステータス
②ExitStatus: Tasklet(Step)から返される結果ステータス
③BatchStatus: Jobから返される結果ステータス(バッチ処理の最終ステータスとなる)
ExitStatusの種類
種別 | 意味 | 使用例 |
---|---|---|
EXECUTING | 処理が実行中 | 設定不要(Spring Batchにより設定される) |
COMPLETED | 処理が正常終了した | 設定不要(Spring Batchにより設定される) |
NOOP | 処理を行わなかった | 処理を実行する必要がなかった場合に設定する等 |
FAILED | 処理が失敗した | 業務エラーが発生した際に設定する等 異常終了時はSpring Batchにより自動設定される |
STOPPED | 処理が停止した | 業務エラーや異常終了等が発生した際に設定する等 ※Spring Batchにより自動設定される例がある |
UNKNOWN | 状態不明 | どのステータスにも該当しない場合に設定する等 |
BatchStatusの種類
種別 | 意味 | reStart可否 | 適用例 |
---|---|---|---|
STARTING | 開始前 | ✖️ | 自動設定 |
STARTED | 実行中 | ✖️ | STARTINGの後の自動設定 |
COMPLETED | 実行成功 | ✖️ | 自動設定(エラーがない場合) |
STOPPING | 停止待ち | ✖️ | Jobで".stop()"された際、または 手動でジョブを中断した際に設定 |
STOPPED | 停止 | ○ | STOPPINGの後に自動設定 |
FAILED | 実行失敗 | ○ | Jobで".fail()"された際に設定 |
ABANDONED | 予期しないエラー により処理中断 |
✖️ | ・DBの接続エラー ・ハードウェアの故障等の予期しないエラーが発生した際の自動設定 |
UNKNOWN | 状態不明 | ✖️ | 自動設定 |
Chunkモデル
Chunkモデルとは①〜③によって構成されるステップモデルで①〜③の順でStepが構成され、処理が実行される。
①読み込み(ItemReader) ⇨ ファイルやDBのデータ取得
②加工(ItemProcessor) ⇨ 取得したデータのチェック、更新情報の作成
③書き込み(ItemWriter) ⇨ 更新情報からファイルやDBのデータ更新
トランザクションの単位をChunkと呼び、その間隔は実装者が指定することができる。
実装者は①〜③のクラスをそれぞれ作成し、処理を実装していくが、その際には、Spring Batch側から提供されているインターフェースを実装する必要がある。
★ 同一Chunk内で読み込みと加工は1件ずつ、書き込みは全件まとめて行う
(1)Stepの実装
@Bean
Step sampleStep(JobRepository jobRepository,
PlatformTrnsactionManager trnsactionManager, Tasklet sampleTasklet) {
return new StepBuilder("sampleStep", jobRepository)
.<ReceiveFileInfo, ProcessedFileInfo> chunk(1000, transactionManeger)
//Chunkモデルを実装するために宣言する
【引数について】
//第一引数:コミット単位を示す数値
//第二引数:トランザクション管理用のクラス
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
①読み込み(ItemReader)
//インターフェイス
public interface ItemReader<T>{
@Nullable
T read()throws Exception, UnexcepectedInputException, ParseException,
NonTransientResouceException;
}
・Tの部分は、読み込んだ情報を保管するBeanを指定する。
・Chunk単位でreadメソッドが連続して呼び出され、returnしたBeanは次の加工処理へ渡される
・readメソッドからNullが返却されるとデータの読み込みが完了したとみなされる
ItemReaderの実装クラス
DBの読み込み
アクセス方法 | 実装クラス |
---|---|
JDBC | JdbcCursourItemReader JdbcPagingItemReader |
MyBatis | MyBatisCursourItemReader MyBatisPagingItemReader |
JPA | JpaCursourItemReader JpaPagingItemReader |
ファイルの読み込み
ファイル数 | 実装クラス |
---|---|
単一 | FlatFileItemReader |
複数 | MultiResouceItemReader |
⚫︎ chunk/sampleReader.javaを作成する
package com.udemy.hello.chunkModel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.List;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.stereotype.Component;
@Component
public class SampleReader implements ItemReader<ReceiveFileInfo> {
private List<String> lines = null;
private int currentIndex = 0;
@Override
public ReceiverFileInfo read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (lines == null) {
try {
lines = Files.readAllLines(Paths.get("C:/work/sample.csv"), Charset.forName("UTF-8"));
} catch (Exception e) {
throw new UnexpectedInputException("Error reading file", e);
}
}
if (currentIndex < lines.size()) {
var arrColumn = lines.get(currentIndex++).split(",");
var receiveFileInfo = new ReceiveFileInfo();
receiveFileInfo.setId(arrColumn[0]);
receiveFileInfo.setName(arrColumn[1]);
receiveFileInfo.setAge(Integer.parseInt(arrColumn[2]));
return receiveFileInfo;
}
return null;
}
}
②加工(ItemProcessor)
//インターフェイス
public interface Processor<I,O> {
@Nullable
O proccess(@NonNull I item)throws Exception;
}
・I の部分は入力情報のBean、Iの部分は出力情報を保管するBeanを指定する。
・Chunk単位でproccessメソッドが連続して呼び出され、returnしたBeanは次の書き込み処理へ渡される
・ItemProcessorは実装必須ではない
⚫︎ chunk/SampleProcessor.javaを作成する
package com.udemy.hello.chunkModel;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
@Component
public class SampleProcessor implements ItemProcessor<ReceiveFileInfo, FileInfo> {
@Override
public FileInfo process(ReceiveFileInfo item) throws Exception {
var processedFileInfo = new FileInfo();
processedFileInfo.setId(item.getId());
processedFileInfo.setName(item.getName());
processedFileInfo.setAge(item.getAge());
processedFileInfo.setAdult(item.getAge() > 19);
return processedFileInfo;
}
}
③書き込み(ItemWriter)
//インターフェイス
public interface ItemWriter<T> {
void write(@NonNull Chunk<? extend T>chunk)throws Exception;
}
・T の部分は、書き込み入力情報となるBeanを指定する。
・Chunk単位でwriteメソッドが呼び出される。※この単位で書き込み(=コミット)が行われる。
・引数のchunkはchunk.getItems()することで、前工程の出力情報をリスト形式で取得できる。
・書き込み処理では戻り値の設定は不要。
⚫︎ chunk/SampleWriter1.javaを作成する
package com.udemy.hello.chunkModel;
import org.springframework.batch.item.Chunk;
import org.apache.tomcat.jni.FileInfo;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
public class SampleWriter1 implements ItemWriter<FileInfo> {
private final FileRepository repository;
@Override
public void write(Chunk<? extends FileInfo> chunk) throws Exception {
for (FileInfo processedFileInfo : chunk.getItems()) {
try {
repository.save(processedFileInfo);
} catch (Exception e) {
throw e;
}
}
}
}
ItemProcessorの実装クラス
実装クラス | 実現できること |
---|---|
CompositeItemProcessor | 複数のItemProcessorを設定 |
BeanValidatingItemProcessor | ItemProcessorの入力情報に対して バリーデーションチェックを実施 |
ItemWriterの実装クラス
DBの読み込み
アクセス方法 | 実装クラス |
---|---|
JDBC | JdbcBatchItemWriter |
MyBatis | MyBatisBatchItemWriter |
JPA | JpaItemWriter |
ファイルの書き込み
アクセス方法 | 実装クラス |
---|---|
単一 | FlatFileItemWriter |
複数 | MultiResouceItemWriter |
その他
用途 | 実装クラス |
---|---|
メール送信 | SimpleMailMessageItemWriter MimelMessageItemWriter |
DB更新 | RepositoryItemWriter |
JobRepositoryについて
- Spring Batchの実行履歴を管理する場所で、具体的にはDBのテーブルで管理される。
JobRepositoryの構成テーブル
- BATCH_JOB_EXECUTION : 実行結果(ステータス)が確認できる
- BATCH_JOB_EXECUTION_PARAMS : 入力パラメータが確認できる
- BATCH_STEP_EXECUTION : 実行結果(ステータス)が確認できる
参考サイト
【Spring Batchで作るBatchアプリ | part1】Spring Batchって何?(超概要編)【初学者向け】
【Spring Batchで作るBatchアプリ | part2】Jobの実装方法を学ぼう!【初学者向け】
【Spring Batchで作るBatchアプリ | part3】Step、Taskletの実装方法を学ぼう!【Taskletモデル編】【初学者向け】
【Spring Batchで作るBatchアプリ | part4】処理結果ステータスについて学ぼう!【初学者向け】
【Spring Batchで作るBatchアプリ | part5】Step、Taskletの実装方法を学ぼう!【Chunkモデル編①】【初学者向け】
【Spring Batchで作るBatchアプリ | part6】Step、Taskletの実装方法を学ぼう!【Chunkモデル編②】【初学者向け】
【Spring Batchで作るBatchアプリ | part7】Step、Taskletの実装方法を学ぼう!【Chunkモデル編③】【初学者向け】
【Spring Batchで作るBatchアプリ | part8】Step、Taskletの実装方法を学ぼう!【Chunkモデル編④】【初学者向け】
【Spring Batchで作るBatchアプリ | part9】Step、Taskletの実装方法を学ぼう!【Chunkモデル編⑤】【初学者向け】
【Spring Batchで作るBatchアプリ | part10】Step、Taskletの実装方法を学ぼう!【Chunkモデル編⑥】【初学者向け】
【Spring Batchで作るBatchアプリ | part11】JobRepositoryの超基礎知識、構築方法を学ぼう!【初学者向け】