0.はじめに
Javaで並列操作における、wait()とnotify()による同期制御を行う下記のような待ち合わせの際に 「どちらのメソッドが遅くなるか」 自明でないようなユースケースの、律速段階の自動制御は通常できない仕様になっています。
メソッドAとメソッドB、場合によってはどちらが先に処理が終わるかは確信できない。そんなケースは往々にしてあるはずです。
1.SpringBatch
そんな同期作業の自動制御を可能にするのが、SpringBatchです。
Spring Batchで、同じサービスクラスのメソッドAとメソッドBを並列に実行し、両方が完了した後にメソッドCを実行するには、以下のような構成を取ることができます。
2.概要
メソッドAとメソッドBをTaskletまたはStepとして定義
Splitフローで並列実行
Wait(同期)してからメソッドCを実行するStepを定義
実装イメージ(Java)
@Configuration
public class BatchConfig {
@Autowired
private MyService myService;
@Bean
public Step stepA(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("stepA")
.tasklet((contribution, chunkContext) -> {
myService.methodA();
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step stepB(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("stepB")
.tasklet((contribution, chunkContext) -> {
myService.methodB();
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step stepC(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("stepC")
.tasklet((contribution, chunkContext) -> {
myService.methodC();
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Job parallelJob(JobBuilderFactory jobBuilderFactory,
Step stepA, Step stepB, Step stepC) {
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1").start(stepA).build();
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2").start(stepB).build();
Flow parallelFlow = new FlowBuilder<SimpleFlow>("parallelFlow")
.split(new SimpleAsyncTaskExecutor())
.add(flow1, flow2)
.build();
return jobBuilderFactory.get("parallelJob")
.start(parallelFlow)
.next(stepC) // AとBが終わってからC
.end()
.build();
}
}
ポイント
split()により flow1 と flow2 が並列実行されます。
stepC は split() の後に配置されており、AとBの両方が完了してから実行されます。
SimpleAsyncTaskExecutor は並列化のためのスレッドを提供します。
注意点
myService.methodA()とmethodB()がスレッドセーフである必要があります。
同じインスタンスから呼び出すため、状態を持たない(またはスレッドローカルを使う)設計が望ましいです。
3.コントローラーから起動する
Spring Batchのジョブをコントローラから手動で起動するには、JobLauncher を使います。以下に、上で定義した parallelJob を呼び出す @RestController のサンプルを示します。
コントローラ実装例
@RestController
@RequestMapping("/batch")
public class BatchController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("parallelJob")
private Job parallelJob;
@GetMapping("/run")
public ResponseEntity<String> runJob() {
try {
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis()) // ユニークなパラメータを追加
.toJobParameters();
JobExecution execution = jobLauncher.run(parallelJob, params);
return ResponseEntity.ok("ジョブを起動しました。ステータス: " + execution.getStatus());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("ジョブ起動中にエラーが発生しました: " + e.getMessage());
}
}
}
ポイント解説
JobLauncher:Spring Batchでジョブを起動するためのインターフェース。
JobParameters:同じジョブを複数回実行するために必要(タイムスタンプなどを使って毎回ユニークにする)。
/batch/run エンドポイントにアクセスすれば、parallelJob が実行されます。
4.非同期実行(メソッドAとBを走らせながら処理結果を待たずにレスポンスを返す)
非同期実行にしたい場合、Springの @Async を使うのがシンプルでおすすめです。これにより、ジョブ起動を別スレッドで実行してすぐにHTTPレスポンスを返すことができます。
1. @EnableAsync を設定(設定クラスなどで)
@Configuration
@EnableAsync
public class AsyncConfig {
// 必要に応じて Executor をカスタムできます
}
2. ジョブ起動処理を非同期化するサービスを作成
@Service
public class BatchJobService {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("parallelJob")
private Job parallelJob;
@Async
public void runParallelJobAsync() {
try {
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(parallelJob, params);
} catch (Exception e) {
// ログなどでエラーを記録
e.printStackTrace();
}
}
}
3. コントローラで非同期サービスを呼び出す
@RestController
@RequestMapping("/batch")
public class BatchController {
@Autowired
private BatchJobService batchJobService;
@GetMapping("/run")
public ResponseEntity<String> runJob() {
batchJobService.runParallelJobAsync(); // 非同期で実行
return ResponseEntity.ok("ジョブを非同期で起動しました");
}
}
ポイント
非同期で起動するので、コントローラは即レスポンスを返します。
実行結果やステータス確認が必要なら、ジョブIDを返して別エンドポイントで状態確認できるようにするのが一般的です。
5.状態確認とログ出力
状態確認とログ出力もやっておけば、バッチの実行状況をしっかり把握できます。
以下の2つを紹介します:
ジョブの状態確認エンドポイントの作成
ログ出力(コンソール or ファイル)で実行内容を記録
1. ジョブ状態確認エンドポイント
ジョブID(executionId)で確認できるようにする
まず、非同期実行時に JobExecution を返すようにして、executionId をクライアントに返すと便利です。
BatchJobService の修正
@Async
public Future<Long> runParallelJobAsync() {
try {
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncher.run(parallelJob, params);
return new AsyncResult<>(execution.getId());
} catch (Exception e) {
e.printStackTrace();
return new AsyncResult<>(-1L);
}
}
BatchController の修正
@GetMapping("/run")
public ResponseEntity<String> runJob() throws ExecutionException, InterruptedException {
Future<Long> future = batchJobService.runParallelJobAsync();
Long jobExecutionId = future.get(); // 実際には非同期でも一旦取得
return ResponseEntity.ok("ジョブを非同期で起動しました。executionId: " + jobExecutionId);
}
状態確認用のエンドポイントを追加
@Autowired
private JobExplorer jobExplorer;
@GetMapping("/status/{executionId}")
public ResponseEntity<String> getJobStatus(@PathVariable Long executionId) {
JobExecution execution = jobExplorer.getJobExecution(executionId);
if (execution == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body("ジョブが見つかりません");
}
return ResponseEntity.ok("ジョブステータス: " + execution.getStatus());
}
JobExplorer はジョブの実行状況を取得できるインターフェースです。
状態は STARTING, STARTED, COMPLETED, FAILED などが返ります。
2. ログ出力(標準 or ファイル)
標準ログで出力(推奨)
バッチステップやサービス内で普通にログを出すだけでOK:
private static final Logger logger = LoggerFactory.getLogger(MyService.class);
public void methodA() {
logger.info("methodA 開始");
// 処理
logger.info("methodA 終了");
}
Spring Bootであれば、application.properties でログレベルやファイル出力も設定可能です:
logging.level.com.example.batch=INFO
logging.file.name=batch.log