訳あって同時実行数を制御する処理が必要そうなので・・・Javaで実現するとしたら「こんな感じかな〜」というのを作ってみました。完全に個人メモです。
もっと良いやり方や、OSSのこれ使えば同じことできるよ〜みたいなのあれば是非コメントを!!
やりたいこと
- 同時に処理を行うスレッド数(=最大同時実行スレッド数)を指定できること
- 「最大同時実行スレッド数」に達している際は空きがでるまでキューインングできること(+キュー数も指定できること)
- キューイング状態で待つ最大時間(=無限に待たないようにタイムアウト)を指定できること
- 「最大同時実行スレッド数」+「キュー数」を超える状態を検知したら拒否時の処理を実装できること
作ってみたコンポーネント
package com.example;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class ConcurrencyControlTemplate {
private final Semaphore semaphore;
private final BlockingQueue<Thread> permittedThreads;
private long queueingTimeout = 30000;
private TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;
public ConcurrencyControlTemplate(int concurrency, int queueSize, boolean fair) {
this.semaphore = new Semaphore(concurrency, fair);
this.permittedThreads = new LinkedBlockingQueue<>(concurrency + queueSize);
}
public void setQueueingTimeout(long queueingTimeout) {
this.queueingTimeout = queueingTimeout;
}
public void setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = timeoutUnit;
}
public void execute(Runnable task, Runnable dinedHandler) {
executeWithResult(() -> {
task.run();
return null;
}, () -> {
dinedHandler.run();
return null;
});
}
public <T> T executeWithResult(Supplier<T> task, Supplier<T> dinedHandler) {
boolean shouldRelease = false;
Thread current = Thread.currentThread();
T returnValue;
if (!permittedThreads.offer(current)) {
return dinedHandler.get();
}
try {
if (semaphore.tryAcquire(queueingTimeout, timeoutUnit)) {
shouldRelease = true;
returnValue = task.get();
} else {
returnValue = dinedHandler.get();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
returnValue = dinedHandler.get();
} finally {
permittedThreads.remove(current);
if (shouldRelease) {
semaphore.release();
}
}
return returnValue;
}
}
作ったコンポーネントの使い方
ConcurrencyControlTemplate concurrencyControlTemplate = new ConcurrencyControlTemplate(
8, // 同時実行を許可するスレッド数
8, // 同時実行を許可するスレッド数を超えている際にキューイングするスレッド数
true // 実行権を与える順番を厳密に制御するか否かを指定(true: 厳密に制御する、false: 厳密に制御しない)
);
concurrencyControlTemplate.setQueueingTimeout(10); // キューイング状態で待機を続ける最大時間(例:10秒) ※デフォルトは30秒
concurrencyControlTemplate.setTimeoutUnit(TimeUnit.SECONDS);
concurrencyControlTemplate.execute(() -> {
System.out.println("同実行制御下で行う処理を実装する");
// ...
}, () -> {
System.out.println("キューを含む同時実行上限を超えた時の処理(=リクエスト拒否時に処理)を実装する");
// ...
});
参考にしたもの
- Tomcatの「SemaphoreValve」
補足
APサーバや各種サーバ機能の設定で最大スレッド数やキュー数を指定できるものの、リジェクト時の動作が期待通りでなかったり・(思ったように)カスタマイズできないものが多かったので・・・自分で作ってみました。