はじめに
Spring BOOTで排他制御を行う方法はいくつかありますが、比較的同じ時間帯にアクセスが想定される排他制御では、レスポンスの良さが求められることもあると思います。
今回は、そんなレスポンスの良さが魅力としてあるConcurrentHashMap・Semaphoreを採用した排他制御方法と、より堅牢な方法の2パターンを紹介します。
パターン1 Semaphoreを採用した排他制御方法
Semaphore を使用して Spring Batch のコントローラからの同一IDへのアクセスを制御し、ロック状態に応じて異なるレスポンスを返す例を紹介します。
まず、処理中の ID を管理するための ConcurrentHashMap と、各 ID に対応する Semaphore を用意します。
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RestController
public class BatchController {
private final ConcurrentHashMap<String, Semaphore> processingSemaphores = new ConcurrentHashMap<>();
private static final int MAX_PERMITS = 1; // 同時実行数を1に制限
@GetMapping("/process/{id}")
public ResponseEntity<String> process(@PathVariable String id) {
Semaphore semaphore = processingSemaphores.computeIfAbsent(id, k -> new Semaphore(MAX_PERMITS));
try {
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
// ここに実際の処理を記述します (例: Spring Batch のジョブ起動など)
Thread.sleep(5000); // 処理をシミュレート
return ResponseEntity.ok("処理が完了しました (ID: " + id + ")");
} finally {
semaphore.release();
// 処理完了後、Semaphoreが不要になったら削除することも検討できます
// if (semaphore.availablePermits() == MAX_PERMITS) {
// processingSemaphores.remove(id);
// }
}
} else {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body("現在処理中です。しばらくお待ちください (ID: " + id + ")");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("処理中にエラーが発生しました (ID: " + id + "): " + e.getMessage());
}
}
}
コードの説明:
processingSemaphores: ConcurrentHashMap は、処理中の ID をキーとし、対応する Semaphore を値として保持します。ConcurrentHashMap はスレッドセーフな Map 実装です。
MAX_PERMITS: Semaphore の初期パーミット数を 1 に設定することで、特定の ID に対する処理の同時実行数を 1 に制限します。
process(@PathVariable String id):
リクエストパスから ID を取得します。
processingSemaphores.computeIfAbsent(id, k -> new Semaphore(MAX_PERMITS)): 指定された ID に対応する Semaphore が存在しない場合は、新しい Semaphore(パーミット数 1)を作成して Map に格納し、その Semaphore を返します。既に存在する場合は、既存の Semaphore を返します。
semaphore.tryAcquire(1, TimeUnit.SECONDS): パーミットを 1 つ取得しようと試みます。指定されたタイムアウト(ここでは 1 秒)以内にパーミットを取得できれば true を、できなければ false を返します。
パーミット取得成功 (true):処理を実行します。finally ブロックで必ず semaphore.release() を呼び出し、パーミットを解放します。これにより、他のスレッドが処理を実行できるようになります。
パーミット取得失敗 (false):指定されたタイムアウト時間内にパーミットを取得できなかった場合、つまり、別のリクエストが同じ ID の処理をすでに実行中の場合は、HttpStatus.TOO_MANY_REQUESTS (429 Too Many Requests) のステータスコードと、「現在処理中です」というメッセージを含む ResponseEntity を返します。
InterruptedException の捕捉: tryAcquire() が割り込みされた場合に備えて例外処理を行います。
この例のポイント:
Semaphore を使用することで、特定の ID に対する処理の同時実行を厳密に 1 に制限できます。
tryAcquire() にタイムアウトを設定することで、ロックの取得を無期限に待つことを避け、一定時間で処理中であるというレスポンスを返すことができます。
ConcurrentHashMap を使用することで、複数の異なる ID へのアクセスが同時に発生しても、それぞれの ID ごとに排他制御を行うことができます。
注意点:
この例では、処理が完了した後も Semaphore のインスタンスを processingSemaphores に保持しています。もし、特定の ID に対する処理が頻繁に行われず、Semaphore の数が際限なく増える可能性がある場合は、処理完了後に Semaphore を削除するロジックを追加することを検討してください(コメントアウトしている部分がその例です)。ただし、削除する際には、他のスレッドが同時にアクセスしていないことを保証する必要があります。
実際の Spring Batch のジョブ起動処理は、// ここに実際の処理を記述します の部分に実装する必要があります。
エラーハンドリングは、より詳細な情報を含めるなど、アプリケーションの要件に合わせて実装してください。
この例は、Semaphore を使用してコントローラレベルでの排他制御を実現する基本的な方法を示しています。実際のアプリケーションでは、より複雑な要件に合わせて、このアプローチを拡張したり、他の排他制御のメカニズムと組み合わせたりする必要があるかもしれません。
ソース
- https://cloud.google.com/eventarc/docs/samples/eventarc-audit-storage-handler
- https://spring.io/blog/2023/03/16/kotlin-dsls-in-the-world-of-springdom
- https://stackoverflow.com/questions/53804678/create-tests-for-class-using-semaphores
- https://github.com/jhwon7508/health-check
パターン2 反応は遅いが、より堅牢な方法
SpringBatchでの同一処理IDの排他制御について、以下の実装方法をご紹介します:
- JobRepositoryの設定による方法
@Configuration
public class BatchConfiguration extends DefaultBatchConfigurer {
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
// その他必要な設定
factory.afterPropertiesSet();
return factory.getObject();
}
}
- JobParametersIncrementerの利用
@Bean
public Job sampleJob(JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("sampleJob")
.incrementer(new RunIdIncrementer()) // 実行毎に異なるパラメータを生成
.start(step1())
.build();
}
- カスタムJobParametersValidatorの実装
public class CustomJobParametersValidator implements JobParametersValidator {
@Autowired
private JobRepository jobRepository;
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
String jobId = parameters.getString("jobId");
// 実行中のジョブを検索
List<JobExecution> runningJobs = jobRepository.findRunningJobExecutions("sampleJob");
for (JobExecution execution : runningJobs) {
if (jobId.equals(execution.getJobParameters().getString("jobId"))) {
throw new JobParametersInvalidException(
"Job with ID " + jobId + " is already running");
}
}
}
}
- データベースレベルでの排他制御
CREATE TABLE BATCH_JOB_LOCK (
JOB_ID VARCHAR(100) PRIMARY KEY,
LOCK_TIME TIMESTAMP
);
@Component
public class JobLockManager {
@Autowired
private JdbcTemplate jdbcTemplate;
public boolean acquireLock(String jobId) {
try {
jdbcTemplate.update(
"INSERT INTO BATCH_JOB_LOCK (JOB_ID, LOCK_TIME) VALUES (?, ?)",
jobId, new Timestamp(System.currentTimeMillis())
);
return true;
} catch (Exception e) {
return false;
}
}
public void releaseLock(String jobId) {
jdbcTemplate.update("DELETE FROM BATCH_JOB_LOCK WHERE JOB_ID = ?", jobId);
}
}
これらの方法を組み合わせることで、より堅牢な排他制御を実現できます。具体的な実装方法は以下のポイントに注意してください:
- 処理の開始時に排他ロックを取得
- 処理の完了時にロックを解放
- エラー発生時のロック解放処理
- タイムアウト処理の実装
また、分散環境での運用を考慮する場合は、以下の点にも注意が必要です:
- クラスタ環境での一貫性の確保
- デッドロック対策
- ロックのタイムアウト設定
これらの実装により、同一処理IDの重複実行を防ぎ、安全な処理を実現することができます。
コントローラーでの排他制御の実装例を示します:
@RestController
@RequestMapping("/api/batch")
public class BatchController {
@Autowired
private JobLockManager jobLockManager;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job sampleJob;
@PostMapping("/execute/{jobId}")
public ResponseEntity<?> executeBatch(@PathVariable String jobId) {
try {
// ロック状態のチェックと取得
if (!jobLockManager.acquireLock(jobId)) {
// すでにロックが取得されている場合
return ResponseEntity
.status(HttpStatus.CONFLICT)
.body(new ApiResponse(false, "Job is already running"));
}
try {
// ジョブパラメータの設定
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", jobId)
.addDate("date", new Date())
.toJobParameters();
// ジョブの実行
JobExecution jobExecution = jobLauncher.run(sampleJob, jobParameters);
// 実行状態に応じたレスポンス
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
return ResponseEntity.ok(new ApiResponse(true, "Job completed successfully"));
} else {
return ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ApiResponse(false, "Job execution failed"));
}
} finally {
// 処理完了後のロック解放
jobLockManager.releaseLock(jobId);
}
} catch (Exception e) {
return ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new ApiResponse(false, "Error: " + e.getMessage()));
}
}
// ロック状態確認用エンドポイント
@GetMapping("/status/{jobId}")
public ResponseEntity<?> checkJobStatus(@PathVariable String jobId) {
boolean isLocked = jobLockManager.isLocked(jobId);
return ResponseEntity.ok(new ApiResponse(true,
isLocked ? "Job is running" : "Job is not running"));
}
}
// レスポンス用クラス
@Data
@AllArgsConstructor
class ApiResponse {
private boolean success;
private String message;
}
また、JobLockManager
クラスに以下のメソッドを追加します:
@Component
public class JobLockManager {
@Autowired
private JdbcTemplate jdbcTemplate;
// ロック状態確認メソッド
public boolean isLocked(String jobId) {
int count = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM BATCH_JOB_LOCK WHERE JOB_ID = ?",
Integer.class,
jobId
);
return count > 0;
}
// ロックの有効期限チェックを追加
public boolean isLockExpired(String jobId, long timeoutMinutes) {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM BATCH_JOB_LOCK WHERE JOB_ID = ? " +
"AND LOCK_TIME < ?",
Integer.class,
jobId,
new Timestamp(System.currentTimeMillis() - timeoutMinutes * 60 * 1000)
) > 0;
}
}
この実装では:
- ジョブ実行時に排他ロックを確認・取得
- ロック取得済みの場合は409 Conflictを返す
- ロック取得成功時はジョブを実行
- 実行結果に応じて適切なレスポンスを返す
- 処理完了時にロックを解放
- 別エンドポイントでロック状態を確認可能
また、以下のような拡張も検討できます:
- ロックのタイムアウト処理
- 強制的なロック解放機能
- ロック取得待機機能
- 詳細なジョブ状態の取得機能
これにより、クライアント側で適切なハンドリングが可能になります。