#はじめに
バッチ処理などで、時間のかかる似たような処理をマルチスレッドを使用して、
時間短縮させたいことありますよね。
例えば、単純化して考えるために、
1処理5分かかるものを10処理することを考えます。
1スレッドだと 5 * 10 = 50分かかりますが、
10スレッドだと5分で終わりそうな気がします。
そういった場合にスレッドセーフな処理で記述するというのは、
なかなか骨が折れます。
次回以降、似たような場面で心理的負荷を下げるために、
鋳型のようなものを作成しておきたいと思って書きました。
##実装例
とりあえずこの形ならスレッドセーフに書けるという大雑把な枠組みです。
欲しい情報の肝以外は省略しているつもりなので、
自分の必要な処理を埋め込んでいくと、
完成するような「鋳型」をイメージしています。
MultiThreadExecute.java
/**
* マルチスレッド化するクラス
* ここでスレッドを作成し実行させる
*/
public class MultiThreadExecute {
public static void main(String[] args) {
MultiThreadExecute logic = new MultiThreadExecute();
int status = logic.run();
}
/** スレッド作成数 */
private static const THREAD_COUNT = 5;
protected int run() {
// 処理対象データ取得
int dataSize = ...(省略)...;
// 並行実行処理
// 終端処理を含めるためスレッド数分領域を増やしておく
final BlockingQueue<DataDto> taskQueue =
new LinkedBlockingQueue<DataDto>(dataSize + THREAD_COUNT);
// キューに追加
for (DataDto data : (省略。どこかから取得)) {
taskQueue.add(data);
}
final OnExecListener listener = new OnExecListener();
final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
// 終端ということが分かるようにしておく
taskQueue.add(空のDataDtoなどを入れる);
// 子スレッドの実行
final LogicTask task = new LogicTask(taskQueue, listener);
executor.execute(task);
}
// スレッドの終了を待機
executor.shutdown();
while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
}
// 処理件数などは以下のように取得
// listener.getTotalCount()
}
}
DataDto.java
public class DataDto {
// 処理をするために渡す引数を格納
}
Listener.java
/**
* 処理終了検知
*/
private static interface Listener {
/**
* 処理後に欲しい情報を設計し、作成する
* interfaceをきっちり定義することで、
* 欲しい情報を柔軟に対応できるようにする
*/
void execute();
}
OnExecListener.java
/**
* 処理終了リスナー
*/
private static class OnExecListener implements Listener {
private int totalCount = 0;
@Override
public void execute(){
synchronized (this) {
totalCount++;
}
}
synchronized int getTotalCount() {
return totalCount;
}
}
LogicTask.java
private static class LogicTask implements Runnable {
private final BlockingQueue<DataDto> taskQueue;
private final Listener listener;
/**
* コンストラクタ
*/
LogicTask(BlockingQueue<DataDto> taskQueue, Listener listener) {
this.taskQueue = taskQueue;
this.listener = listener;
}
/**
* タスクの実行
*/
@Override
public void run() {
try {
while (true) {
if (終了判定) {
break;
}
final DataDto data = taskQueue.take();
// 時間のかかる処理を実行
exec(data);
}
} catch (// 省略) {
// 省略
}
}
private void exec(DataDto data){
// 省略
return;
}
}
#思ったこと
- スレッドセーフにマルチスレッド処理を記述するのは気を遣う
- 自分なりの「鋳型」を増やしていくことで1回きりにしない!