0
2

More than 3 years have passed since last update.

[Java]鋳型にはめるマルチスレッド

Last updated at Posted at 2020-01-29

はじめに

バッチ処理などで、時間のかかる似たような処理をマルチスレッドを使用して、
時間短縮させたいことありますよね。

例えば、単純化して考えるために、
1処理5分かかるものを10処理することを考えます。
1スレッドだと 5 * 10 = 50分かかりますが、
10スレッドだと5分で終わりそうな気がします。

そういった場合にスレッドセーフな処理で記述するというのは、
なかなか骨が折れます。
次回以降、似たような場面で心理的負荷を下げるために、
鋳型のようなものを作成しておきたいと思って書きました。

実装例

とりあえずこの形ならスレッドセーフに書けるという大雑把な枠組みです。
欲しい情報の肝以外は省略しているつもりなので、
自分の必要な処理を埋め込んでいくと、
完成するような「鋳型」をイメージしています。

MultiThreadExecute.java
/**
* マルチスレッド化するクラス
* ここでスレッドを作成し実行させる
*/
public class MultiThreadExecute {
    public static void main(String[] args) {
        MultiThreadExecute logic = new MultiThreadExecute();
        int status = logic.run();
    }

    /** スレッド作成数 */
    private static const THREAD_COUNT = 5;

    protected int run() {

        // 処理対象データ取得
        int dataSize = ...(省略)...;

        // 並行実行処理
        // 終端処理を含めるためスレッド数分領域を増やしておく
        final BlockingQueue<DataDto> taskQueue =
            new LinkedBlockingQueue<DataDto>(dataSize + THREAD_COUNT);

        // キューに追加
        for (DataDto data : (省略どこかから取得)) {
            taskQueue.add(data);
        }

        final OnExecListener listener = new OnExecListener();
        final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            // 終端ということが分かるようにしておく
            taskQueue.add(空のDataDtoなどを入れる);

            // 子スレッドの実行
            final LogicTask task = new LogicTask(taskQueue, listener);
            executor.execute(task);
        }

        // スレッドの終了を待機
        executor.shutdown();
        while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        }

        // 処理件数などは以下のように取得
        // listener.getTotalCount()
    }
}
DataDto.java
public class DataDto {
    // 処理をするために渡す引数を格納
}
Listener.java
/**
 * 処理終了検知
 */
private static interface Listener {
    /**
    * 処理後に欲しい情報を設計し、作成する
    * interfaceをきっちり定義することで、
    * 欲しい情報を柔軟に対応できるようにする
    */
    void execute();
}
OnExecListener.java
/**
 * 処理終了リスナー
 */
private static class OnExecListener implements Listener {

    private int totalCount = 0;

    @Override
    public void execute(){
        synchronized (this) {
            totalCount++;
        }
    }

    synchronized int getTotalCount() {
        return totalCount;
    }
}
LogicTask.java
private static class LogicTask implements Runnable {
    private final BlockingQueue<DataDto> taskQueue;
    private final Listener listener;

    /**
    * コンストラクタ
    */
    LogicTask(BlockingQueue<DataDto> taskQueue, Listener listener) {
        this.taskQueue = taskQueue;
        this.listener = listener;
    }

    /**
    * タスクの実行
    */
    @Override
    public void run() {
        try {
            while (true) {
                if (終了判定) {
                    break;
                }

                final DataDto data = taskQueue.take();

                // 時間のかかる処理を実行
                exec(data);
            }
        } catch (// 省略) {
            // 省略
        }
    }

    private void exec(DataDto data){
        // 省略
        return;
    }
}

思ったこと

  • スレッドセーフにマルチスレッド処理を記述するのは気を遣う
  • 自分なりの「鋳型」を増やしていくことで1回きりにしない!
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2