Help us understand the problem. What is going on with this article?

スレッド数がタスク状態によって増減し、スレッド数に上限を設定したExecutorServiceを作りたい

Javaの世界ではThreadクラスを使うことで手軽にスレッドを作れます。しかし、ライフサイクル等を適切に管理するのは難しいため、Executor等を使うことを推奨されています。

Effective Javaの項目68に「スレッドよりエグゼキューターとタスクを選ぶ」とあります。

さて、Executor(実際にはExecutorServiceScheduledExecutorServiceのインターフェースを使うことになると思います)のインスタンスが必要な場合は、Executorsにファクトリーメソッドが用意されているのでこれを経由して使うのがお手軽で、ほとんどの用途ではこれを利用するだけで解決してしまいます。

ここではExecutorsでは提供されていない、「スレッド数がタスク状態によって増減し、スレッド数に上限を設定したExecutorService」の作り方を説明します。
私の試行錯誤をなぞらえる形で説明していくので、結局どうすればいいのよ?だけ必要な人は一番最後の章までスクロールしてください。

ExecutorServiceのファクトリーメソッド

ScheduledExecutorServiceは置いておいて、ExecutorServiceのインスタンスを作るファクトリーメソッドとしては以下のようなものがあります。

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()
SingleThreadExecutor
単一のスレッドでタスクを実行する。タスクをシーケンシャルに実行したい場合などに利用
FixedThreadPool
指定された固定数のスレッドでタスクを実行する。演算処理の並列化などに利用。
CachedThreadPool
必要に応じてスレッドが作られ、作成されたスレッドは一定時間キャッシュされ使い回される、しばらく使われていないスレッドは終了する。並列度の要求が時間とともに変化したり、待ち時間の多いIO処理などに利用

ファクトリーメソッドの実装

これらのファクトリーメソッドの実装をみてみましょう。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

まず、SingleThreadExecutor では FinalizableDelegatedExecutorService でラップされているのが気になりますが、実装をみてみると

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    protected void finalize() {
        super.shutdown();
    }
}

finalize()shutdown()をコールするようにしただけのラップクラスのようですね。
しかし、なぜSingleThreadExecutorだけ?本題ではないのでおいておきます。

いずれのExecutorもThreadPoolExecutorクラスを使っていることがわかります。パラメータの違いで動作が変わるようです。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

スレッド数がタスク状態によって増減し、スレッド数に上限を設定したExecutorServiceを作りたい

やっと本題、基本的にIO待ちなどが発生するようなタスクを多数実行しようと思うと、CachedThreadPoolを使うことになるかなと思います。
しかし、前に書いた定義を見ると、スレッド数の上限がInteger.MAX_VALUEになっています。
IO待ちのスレッドの場合、CPUは空いているためその分別のスレッドを立ち上げる意味はあります。
しかし、当然スレッド数に応じてメモリなどのコストもそれなりにかかりますので、あまりたくさんのスレッドが立ち上がるのは避けたい場合もあるでしょう。

であればFixedThreadPoolか?となりますが、ピーク時の並列度最大値はある程度欲しいのだが、いらないときはスレッド数を減らしてほしい……そういう要求は普通にあるかと思います。
というわけで、CachedThreadPoolのスレッド数上限設定版がほしくなります。

ThreadPoolExecutorの引数を変えてみる

ちょっと引数を変えればいけるやろ?

CachedThreadPoolがこうだから……

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Integer.MAX_VALUEを変更しちゃえばええやろ

new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

第3第4引数がスレッドのキャッシュ時間だからここ変更してテストしよう。

テスト

@Test
public void test() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize; i++) {
        final CountDownLatch startLatch = new CountDownLatch(1);
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            endLatch.countDown();
        });
        startLatch.await();
        assertThat(executor.getActiveCount(), is(i + 1));
        assertThat(executor.getPoolSize(), is(i + 1));
    }
    endLatch.await();
    Thread.sleep(100);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(poolSize));
    Thread.sleep(1000);
    assertThat(executor.getActiveCount(), is(0));
    assertThat(executor.getPoolSize(), is(0));
}

積んだ分だけのスレッドが増えていき、タスクが完了するとactiveではないスレッドが残る、キャッシュ時間を超えるとプールされていたスレッドも終了する。
うんうん、いいぞ

次はちゃんと並列度が上限内に収まっていることを確認しましょうかね。

@Test
public void test2() throws Exception {
    final int poolSize = 4;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, poolSize,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    final CountDownLatch endLatch = new CountDownLatch(poolSize);
    final CountDownLatch startLatch = new CountDownLatch(poolSize);
    for (int i = 0; i < poolSize + 1; i++) {
        executor.execute(() -> {
            startLatch.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    startLatch.await();
    assertThat(executor.getActiveCount(), is(poolSize));
    assertThat(executor.getPoolSize(), is(poolSize));
}
java.util.concurrent.RejectedExecutionException: Task example.ThreadTest$$Lambda$1/739498517@311d617d rejected from java.util.concurrent.ThreadPoolExecutor@7c53a9eb[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]

……なん……だと……

ちょっと考え直そう

FixedThreadPoolの方をもう一度みると、最後の引数が違う。

CachedThreadPoolはSynchronousQueueを指定していた。
よくわからず使っていたけどSynchronousQueueってどう言うものか知らないで使ってた。
説明をみると、容量ゼロのブロッキングキューで読み出そうとしているスレッドがいる場合にのみofferが成功するというキューだった。

そうか、スレッド数の上限なしだからそもそもキューに容量はいらないんだな。

つまり、FixedThreadPoolで使われているLinkedBlockingQueue使えば……

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
java.lang.AssertionError: 
Expected: is <2>
     but: was <1>
Expected :is <2>

Actual   :<1>

orz
今度はスレッドが増えていかないよ

勘だけで使おうとするなよ

ThreadPoolExecutorのJavadocにちゃんと説明がかかれてるやん

コアおよび最大プール・サイズ
ThreadPoolExecutorは、corePoolSize (getCorePoolSize()を参照)とmaximumPoolSize (getMaximumPoolSize()を参照)によって設定された境界に従って、プール・サイズ(getPoolSize()を参照)を自動的に調整します。新しいタスクがexecute(Runnable)メソッドで送信され、corePoolSizeより少ないスレッドが実行されている場合は、他のワーカー・スレッドがアイドル状態であっても、要求を処理するために新しいスレッドが作成されます。corePoolSizeよりも多く、maximumPoolSizeよりも少ない数のスレッドが実行中である場合、新しいスレッドが作成されるのはキューがいっぱいである場合だけです。corePoolSizeとmaximumPoolSizeを同じ値に設定すると、固定サイズのスレッド・プールが作成されます。maximumPoolSizeをInteger.MAX_VALUEなどの実質的にアンバウンド形式である値に設定すると、プールに任意の数の並行タスクを格納することができます。コア・プール・サイズと最大プール・サイズは構築時にのみ設定されるのがもっとも一般的ですが、setCorePoolSize(int)およびsetMaximumPoolSize(int)を使用して動的に変更することもできます。

実装もみてみよう

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

ThreadPoolExecutorのキューとスレッドプールの関係

簡単に整理すると

  • プールサイズがcorePoolSizeより小さければ状態によらず新しいスレッドが作られる
  • プールサイズがcorePoolSize以上ならタスクはキューに送られる
    • キューに入れられた場合、
      • プールサイズがゼロなら新しいスレッドが作られる
      • プールサイズが1以上ならスレッドは作られない
    • キューに入れられなかった場合、新しいスレッドが作られる。プールサイズがmaximumPoolSizeに達している場合など、新しいスレッドを作れなければrejectされる。

つまり、corePoolSizeを0、maximumPoolSizeを有効な上限値として指定したい場合、

  • SynchronousQueueを使うと、maximumPoolSizeに至るまではスレッド数が増えていくが、アクティブなスレッドがmaximumPoolSizeを超えた状態でタスクを積むとrejectされてしまう。
  • LinkedBlockingQueueを使うと、最初の一つのスレッドが作られたあとはスレッドが作られることがない

ということになる。

期待値の整理と対応するキューとは

作りたいExecutorは、

  • アイドル状態のスレッドがいればそのスレッドでタスクを実行する
  • アイドル状態のスレッドが存在せず、プールサイズが最大スレッド数未満の場合は新しいスレッドが作られる
  • アイドル状態のスレッドが存在せず、プールサイズが最大スレッド数の場合は、タスクはキューに積まれる

ThreadPoolExecutorを利用するなら、必要となるキューの特性は

  • 読み出そうとしているスレッドがあればofferが成功する
  • 読み出そうとしているスレッドはないが、プールサイズが最大スレッド数未満の場合はofferが失敗する
  • 読み出そうとしているスレッドはなく、プールサイズが最大スレッド数の場合はofferが成功する

という、ThreadPoolExecutorの状態に応じて振る舞いが変わるキューと言うことになる。
作るとなると、ExecutorとQueue両方に手を入れた上、相互参照させて、と、密結合でやな感じですね。

ただ、ThreadPoolExecutorのコンストラクタには、もう一つの引数として、RejectedExecutionHandlerを指定できるものが存在します。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

RejectedExecutionHandlerはその名の通り、reject時にvoid rejectedExecution(Runnable r, ThreadPoolExecutor executor);が呼び出されるハンドラです。
デフォルトでは単にExceptionを投げるだけのハンドラが設定されています。
これが利用できそうですね。
つまり、BlockingQueue<Runnable>RejectedExecutionHandlerの両方のインターフェースを実装し、

  • 読み出そうとしているスレッドがあればofferが成功する
  • 読み出そうとしているスレッドがなければofferが失敗する
  • rejectedExecutionが呼ばれたときはキューに積み上げる

という動作をさせればよいです。

スレッド数がタスク状態によって増減し、スレッド数に上限を設定したExecutorService

これまでの考察を踏まえ作ったクラスが以下になります。Javaで書いたのですが、デリゲートにボイラープレートコードがめちゃくちゃ必要だったのでここで貼り付けられるようKotlinで書き直してます。

class ThreadWorkQueue(
        private val delegate: BlockingQueue<Runnable> = LinkedBlockingQueue()
) : BlockingQueue<Runnable> by delegate,
        RejectedExecutionHandler {
    private val idleThreads = AtomicInteger(0)

    override fun offer(runnable: Runnable): Boolean {
        return if (idleThreads.get() == 0) {
            false
        } else delegate.offer(runnable)
    }

    override fun take(): Runnable {
        idleThreads.incrementAndGet()
        try {
            return delegate.take()
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun poll(timeout: Long, unit: TimeUnit): Runnable? {
        idleThreads.incrementAndGet()
        try {
            return delegate.poll(timeout, unit)
        } finally {
            idleThreads.decrementAndGet()
        }
    }

    override fun rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
        if (executor.isShutdown) {
            throw RejectedExecutionException("Task $r rejected from $executor")
        }
        delegate.offer(r)
    }
}

このクラスのインスタンスをThreadPoolExecutorにworkQueueとhandlerとして渡すことで所望の動作のExecutorServiceを作ることができました。
最終的に作ったファクトリーメソッドはこんな感じです。スレッド数の上限は利用可能なプロセッサ数、ただし最低は2スレッドとするという形で指定しています。
ここは実行させたいタスクの特性に応じていろいろ設定の仕方があると思います。

private static ExecutorService createParallelExecutor() {
    final ThreadWorkQueue queue = new ThreadWorkQueue();
    return new ThreadPoolExecutor(0, calculateMaximumPoolSize(),
            1L, TimeUnit.MINUTES, queue, queue);
}

private static int calculateMaximumPoolSize() {
    return Math.max(2, Runtime.getRuntime().availableProcessors());
}

以上、「スレッド数がタスク状態によって増減し、スレッド数に上限を設定したExecutorService」の作り方でした。

正直、所望の動作をするExecutorServiceは作れましたが、既存のExecutorsのファクトリメソッドで作るものと比較して、どれだけ効率的なのかとかの評価はできていません。
Executorの利用が推奨されるのは、難しいスレッドの管理を十分にテストされ、実績も十分な既存ライブラリに任せられる、というところが大きいので、下手に弄ってしまうとそのメリットが消し飛んでしまいます。
どうしても必要というのでなければExecutorsのファクトリーメソッドを使った方がよいとは思います。

ryo_mm2d
Yahoo! JAPANでAndroidアプリを作っています。趣味はソフト開発!結果にコミットするよりGithubにコミットする生活を送っています。 https://github.com/ohmae/curriculum-vitae
https://www.mm2d.net/
yahoo-japan-corp
Yahoo! JAPAN を運営しています。
https://www.yahoo.co.jp
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした