XMLは勿論、JavaConfigでSpring Batchの処理フローを制御することができます。taskletベースのステップをサンプルとして、処理パターン毎に整理します。
##順番に処理するパターン
一番単純な処理パターンであり、jobBuilderにステップを順番に登録すればOKです。
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Task1 task1;
@Autowired
private Task2 task2;
@Autowired
private Task3 task3;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(task1).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(task2).build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3").tasklet(task3).build();
}
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> step2 -> step3の順に実行される。
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.next(step3)
.build();
}
}
又は、各stepを1つの処理フローにまとめてjobBuilderに登録する。
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> step2 -> step3のflowを作成
Flow flow = new FlowBuilder<Flow>("flow")
.from(step1)
.next(step2)
.next(step3)
.build();
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow)
.end()
.build();
}
タスク処理の中にStepContributionにExitStatusを設定することでタスク処理結果を登録できます。
@Component
public class Task1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if (isCheckOK()) {
// 成功
contribution.setExitStatus(ExitStatus.COMPLETED);
} else {
// 失敗
contribution.setExitStatus(ExitStatus.FAILED);
}
return RepeatStatus.FINISHED;
}
......
}
タスクの結果を取得し、onで条件分岐します。
.....
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1 -> OK -> step2
// NG -> step3
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
.end()
.build();
}
条件によって後続タスクがなければ、failを使えば処理終了できます。
.....
@Bean
public Job job(Step step1, Step step2) throws Exception {
// step1 -> OK -> step2
// NG -> end
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
.from(step2).on(ExitStatus.FAILED.getExitCode()).fail()
.end()
.build();
}
非同期の処理パターンです。以下のようにflowのsplitを利用します。
......
@Bean
public Job job(Step step1, Step step2, Step step3) throws Exception {
// step1をflow1に登録
Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
// 並行処理のstep2、step3をflow2に登録
Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
// flow1 -> flow2の順にjobBuilderに登録
return jobBuilderFactory
.get("job")
.incrementer(new RunIdIncrementer())
.start(flow1)
.next(flow2)
.end()
.build();
}
partitionはsplitと異なり、スレッド毎に違う処理を記述できません。並行処理が1つのステップになり、処理量に応じてそのステップが複製され、マルチスレッドで処理します。
必要なもの:
1.masterタスク:slaveタスクの箱
2.slaveタスク:複製される対象タスク
3.handler:slaveスレッドを何個まで生成するかをコントロール
4.partitioner:slave処理の入力情報を設定(例:処理範囲)
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DemoPartitioner demoPartitioner;
@Autowired
private SlaveTask slaveTask;
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
}
@Bean
public Step masterStep() {
// masterにslave、handler、partitionerを設定
return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
.partitionHandler(handler()).build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(masterStep())
.build();
}
@Bean
public PartitionHandler handler() {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setGridSize(10);
handler.setTaskExecutor(taskExecutor());
handler.setStep(slaveStep());
try {
handler.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return handler;
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
partitionerクラスにて、ExecutionContextに各スレッド処理のインプット情報を設定します。
下記の例:Thread1は1~10、Threadは11~20...
@Component
public class DemoPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();
int range = 10;
int from = 1;
int to = range;
for (int i = 1; i <= gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putString("name", "thread" + i);
context.putInt("from", from);
context.putInt("to", to);
map.put("partition" + i, context);
from = to + 1;
to += range;
}
return map;
}
}
slaveタスクにおいて、処理のインプット情報をExecutionContextから取得します。
@Component
public class SlaveTask implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
System.out.println(name + ":" + fromId + "~" + toId);
return RepeatStatus.FINISHED;
}
}
上記の実行結果
thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50
参考資料:
https://sites.google.com/site/soracane/home/springnitsuite/spring-batch