LoginSignup
0
1

More than 1 year has passed since last update.

Javaで同時実行スレッド数を制御するコンポーネントを作ってみた

Posted at

訳あって同時実行数を制御する処理が必要そうなので・・・Javaで実現するとしたら「こんな感じかな〜」というのを作ってみました。完全に個人メモです。

もっと良いやり方や、OSSのこれ使えば同じことできるよ〜みたいなのあれば是非コメントを!!

やりたいこと

  • 同時に処理を行うスレッド数(=最大同時実行スレッド数)を指定できること
  • 「最大同時実行スレッド数」に達している際は空きがでるまでキューインングできること(+キュー数も指定できること)
  • キューイング状態で待つ最大時間(=無限に待たないようにタイムアウト)を指定できること
  • 「最大同時実行スレッド数」+「キュー数」を超える状態を検知したら拒否時の処理を実装できること

作ってみたコンポーネント

package com.example;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ConcurrencyControlTemplate {

  private final Semaphore semaphore;
  private final BlockingQueue<Thread> permittedThreads;

  private long queueingTimeout = 30000;
  private TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;

  public ConcurrencyControlTemplate(int concurrency, int queueSize, boolean fair) {
    this.semaphore = new Semaphore(concurrency, fair);
    this.permittedThreads = new LinkedBlockingQueue<>(concurrency + queueSize);
  }

  public void setQueueingTimeout(long queueingTimeout) {
    this.queueingTimeout = queueingTimeout;
  }

  public void setTimeoutUnit(TimeUnit timeoutUnit) {
    this.timeoutUnit = timeoutUnit;
  }

  public void execute(Runnable task, Runnable dinedHandler) {
    executeWithResult(() -> {
      task.run();
      return null;
    }, () -> {
      dinedHandler.run();
      return null;
    });
  }

  public <T> T executeWithResult(Supplier<T> task, Supplier<T> dinedHandler) {
    boolean shouldRelease = false;
    Thread current = Thread.currentThread();
    T returnValue;
    if (!permittedThreads.offer(current)) {
      return dinedHandler.get();
    }
    try {
      if (semaphore.tryAcquire(queueingTimeout, timeoutUnit)) {
        shouldRelease = true;
        returnValue = task.get();
      } else {
        returnValue = dinedHandler.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      returnValue = dinedHandler.get();
    } finally {
      permittedThreads.remove(current);
      if (shouldRelease) {
        semaphore.release();
      }
    }
    return returnValue;
  }

}

作ったコンポーネントの使い方

ConcurrencyControlTemplate concurrencyControlTemplate = new ConcurrencyControlTemplate(
    8, // 同時実行を許可するスレッド数
    8, // 同時実行を許可するスレッド数を超えている際にキューイングするスレッド数
    true // 実行権を与える順番を厳密に制御するか否かを指定(true: 厳密に制御する、false: 厳密に制御しない)
);
concurrencyControlTemplate.setQueueingTimeout(10); // キューイング状態で待機を続ける最大時間(例:10秒) ※デフォルトは30秒
concurrencyControlTemplate.setTimeoutUnit(TimeUnit.SECONDS);

concurrencyControlTemplate.execute(() -> {
  System.out.println("同実行制御下で行う処理を実装する");

  // ...

}, () -> {
  System.out.println("キューを含む同時実行上限を超えた時の処理(=リクエスト拒否時に処理)を実装する");

  // ...

});

参考にしたもの

補足

APサーバや各種サーバ機能の設定で最大スレッド数やキュー数を指定できるものの、リジェクト時の動作が期待通りでなかったり・(思ったように)カスタマイズできないものが多かったので・・・自分で作ってみました。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1