#スレッドプールを利用する
###スレッドプールを使うときに覚えておく事
- スレッドプールはタスクが同質で独立しているときにいちばん有効に使う事ができる。
- 実行中の全てのタスクが、ワークキューの上にいるタスクの結果を待つ場合、スレッドの飢餓状態デッドロックがおきる。
下記のThreadDeadLockクラスでは必ずデッドロックが発生する。ExecutorServiceとFutureは非常に便利だが、getメソッド使用するとサブタスクの処理が完了するまでメインスレッドをブロックし、サブタスクが完了してから値が返される。
public class ThreadDeadLock{
ExecutorService exec = Executors.newSingleThreadExecutor();
public class RenderPageTask implements Callable<String>{
public String Call() throws Exception{
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
return header.get() + page + footer.get(); //デッドロックが発生、タスクはサブタスクの結果を待ち続ける。
}
}
}
- スレッドプールが大きすぎる事と、小さすぎることを避ける。
- 計算が主なタスクでは、仮にスレッド数Nとすると「N+1」スレッドが最も効率的に利用できる。(ページフォールトなどで休止する為に一つ余分にスレッドを用意する)
###ThreadPoolExecutorを構成する
- ThreadPoolExecutorはExecutorsクラスが提供しているファクトリメソッド(newCachedThreadPool,newFixedThreadPool...)が返すExecutorの実装クラスの共通のベースクラス
- Coreプールサイズ(常に維持しようとするスレッド数)、最大プールサイズ、キープアライブ時間の3つがスレッドの作成と破壊を支配する
- newFixedThreadPoolはコアプールサイズと最大プールサイズの両方をリクエストされたプールサイズにセット、タイムアウトは無限
- newCachedThreadPoolはコアプールサイズ0,最大プールサイズ無限、タイムアウト1分
- newSingleThreadPoolはタスクが並行に実行されない事を保証し、スレッド拘束によるスレッドセーフ性を満たす
- ThreadPoolExecutorでは実行を待つタスクを入れるためにBlockingQueueが使われる。キューが満杯のときにリクエストされたタスクをどうするかは飽和ポリシーがある。
- スレッドプールがスレッドを作るときにはThreadFactoryインタフェースを用いる
public interface ThreadFactory{
Thread newThread(Runnable r);
}
###ThreadPoolExecutorを拡張する
- ThreadPoolExecutorは拡張される前提で設計されているのでサブクラスがオーバライドできる"フック"を提供している。beforeExecute,afterExecute,terminatedなどのフックをオーバライドして、ThreadPoolExecutorの機能を拡張できる。
- beforeExecutor,afterExecutorはタスクを実行するスレッドの中で呼ばれるため、ログ記録、監視、統計収集の目的に使える。
public class TimingThreadPool extends ThreadPoolExecutor{
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r){
super.beforeExecute(t,r);
log.fine(String.format("Thread $s: start %s",t));
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t){
try{
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.addAndGet(taskTime);
log.fine(String.format("Thread %s:end %s, time = %dns",t,r,taskTime));
}finally{
super.afterExecute(r,t);
}
}
protected void terminated(){
try{
log.info(String.format("Terminated: avg time=%dns",totalTime.get()/numTasks.get()));
}finally{
super.terminated();
}
}