52
61

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

JavaConfigでSpring Batchの処理フローを制御する。

Posted at

XMLは勿論、JavaConfigでSpring Batchの処理フローを制御することができます。taskletベースのステップをサンプルとして、処理パターン毎に整理します。

##順番に処理するパターン
2017-11-18_144958.jpg
一番単純な処理パターンであり、jobBuilderにステップを順番に登録すればOKです。

@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private Task1 task1;
	
	@Autowired
	private Task2 task2;
	
	@Autowired
	private Task3 task3;
	
	@Bean
	public Step step1() {
		return stepBuilderFactory.get("step1").tasklet(task1).build();
	}

	@Bean
	public Step step2() {
		return stepBuilderFactory.get("step2").tasklet(task2).build();
	}
	
	@Bean
	public Step step3() {
		return stepBuilderFactory.get("step3").tasklet(task3).build();
	}

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> step2 -> step3の順に実行される。
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1)
				.next(step2)
				.next(step3)
				.build();
	}
}

又は、各stepを1つの処理フローにまとめてjobBuilderに登録する。

    ......

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> step2 -> step3のflowを作成
		Flow flow = new FlowBuilder<Flow>("flow")
				.from(step1)
				.next(step2)
				.next(step3)
				.build();
		
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(flow)
				.end()
				.build();
	}

##条件分岐するパターン
2017-11-18_151605.jpg

タスク処理の中にStepContributionにExitStatusを設定することでタスク処理結果を登録できます。

@Component
public class Task1 implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		
		if (isCheckOK()) {
			// 成功
			contribution.setExitStatus(ExitStatus.COMPLETED);
		} else {
			// 失敗
			contribution.setExitStatus(ExitStatus.FAILED);
		}
		
		return RepeatStatus.FINISHED;
	}

    ......
}

タスクの結果を取得し、onで条件分岐します。

	.....

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1 -> OK -> step2
		//          NG -> step3
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
				.from(step1).on(ExitStatus.FAILED.getExitCode()).to(step3)
				.end()
				.build();
	}

条件によって後続タスクがなければ、failを使えば処理終了できます。

	.....

	@Bean
	public Job job(Step step1, Step step2) throws Exception {
		// step1 -> OK -> step2
		//          NG -> end
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(step1).on(ExitStatus.COMPLETED.getExitCode()).to(step2)
				.from(step2).on(ExitStatus.FAILED.getExitCode()).fail()
				.end()
				.build();
	}

##並行処理するパターン(split)
2017-11-18_145234.jpg

非同期の処理パターンです。以下のようにflowのsplitを利用します。

	......

	@Bean
	public Job job(Step step1, Step step2, Step step3) throws Exception {
		// step1をflow1に登録
		Flow flow1 = new FlowBuilder<Flow>("flow1").start(new FlowBuilder<Flow>("step1").from(step1).end()).build();
		// 並行処理のstep2、step3をflow2に登録
		Flow flow2 = new FlowBuilder<Flow>("flow2").start(new FlowBuilder<Flow>("step2").from(step2).end())
				.split(new SimpleAsyncTaskExecutor()).add(new FlowBuilder<Flow>("step3").from(step3).end()).build();
		
		// flow1 -> flow2の順にjobBuilderに登録
		return jobBuilderFactory
				.get("job")
				.incrementer(new RunIdIncrementer())
				.start(flow1)
				.next(flow2)
				.end()
				.build();
	}

##並行処理するパターン(partition)
2017-11-18_182539.jpg

partitionはsplitと異なり、スレッド毎に違う処理を記述できません。並行処理が1つのステップになり、処理量に応じてそのステップが複製され、マルチスレッドで処理します。
必要なもの:
 1.masterタスク:slaveタスクの箱
 2.slaveタスク:複製される対象タスク
 3.handler:slaveスレッドを何個まで生成するかをコントロール
 4.partitioner:slave処理の入力情報を設定(例:処理範囲)

@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;
	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private DemoPartitioner demoPartitioner;
	
	@Autowired
	private SlaveTask slaveTask;

	@Bean
	public Step slaveStep() {
		return stepBuilderFactory.get("slaveStep").tasklet(slaveTask).build();
	}

	
	@Bean
	public Step masterStep() {
		// masterにslave、handler、partitionerを設定
		return stepBuilderFactory.get("masterStep").partitioner(slaveStep().getName(), demoPartitioner)
				.partitionHandler(handler()).build();
	}
	
	@Bean
	public Job job() {
		return jobBuilderFactory.get("job")
			.incrementer(new RunIdIncrementer())
			.start(masterStep())
			.build();
	}

	@Bean
	public PartitionHandler handler() {
		TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
		handler.setGridSize(10);
		handler.setTaskExecutor(taskExecutor());
		handler.setStep(slaveStep());
		try {
			handler.afterPropertiesSet();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return handler;
	}

	@Bean
	public SimpleAsyncTaskExecutor taskExecutor() {
		return new SimpleAsyncTaskExecutor();
	}
}

partitionerクラスにて、ExecutionContextに各スレッド処理のインプット情報を設定します。
下記の例:Thread1は1~10、Threadは11~20...

@Component
public class DemoPartitioner implements Partitioner {

	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {

		Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

		int range = 10;
		int from = 1;
		int to = range;

		for (int i = 1; i <= gridSize; i++) {
			ExecutionContext context = new ExecutionContext();

			context.putString("name", "thread" + i);
			context.putInt("from", from);
			context.putInt("to", to);

			map.put("partition" + i, context);

			from = to + 1;
			to += range;

		}
		return map;
	}
}

slaveタスクにおいて、処理のインプット情報をExecutionContextから取得します。

@Component
public class SlaveTask implements Tasklet {

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		String name = (String)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("name");
		int fromId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("from");
		int toId = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("to");
		
		System.out.println(name + ":" + fromId + "~" + toId);
		return RepeatStatus.FINISHED;
	}
}

上記の実行結果

thread1:1~10
thread4:31~40
thread7:61~70
thread6:51~60
thread3:21~30
thread10:91~100
thread9:81~90
thread2:11~20
thread8:71~80
thread5:41~50

参考資料:
https://sites.google.com/site/soracane/home/springnitsuite/spring-batch

52
61
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
52
61

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?