概要
Collections FrameworkのメンバーのインターフェースQueue
と、そのサブインターフェースBlockingQueue
、TransferQueue
のおさらいメモです。
特にBlockingQueueの実装クラスLinkedBlockingQueue
とTransferQueueの実装クラスLinkedTransferQueue
についてサンプルコードを書いて確認しました。
インタフェースBlockingQueueやTransferQueueの実装クラスを利用すると、Producer-Consumerパターンの実装が簡単になります。
[BlockingQueue] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html)のJavaDocより引用
BlockingQueueの実装は、主にプロデューサとコンシューマの間のキューで使用するように設計されていますが、加えてCollectionインタフェースもサポートします。
BlockingQueueは、複数のプロデューサおよび複数のコンシューマで安全に使用できることに注意してください。
環境
- Windows 10 Professional
- OpenJDK 11.0.2
参考
- [Collections Frameworkの概要] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/doc-files/coll-reference.html)
- [java.util.concurrent 第 1 回 並行コレクションによるマルチスレッド・プログラミング] (https://www.ibm.com/developerworks/jp/java/library/j-5things4.html)
- [java.util.concurrent 第 2 回 並行プログラミングとはスマートな作業であり、困難な作業ではありません] (https://www.ibm.com/developerworks/jp/java/library/j-5things5.html)
おさらい
[インタフェースQueue<E>] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/Queue.html)
導入バージョン : 1.5
基本的なCollection操作に加えて、キューは追加の挿入、抽出および検査操作を提供します。これらのメソッドにはそれぞれ、2つの形式があります。1つは操作が失敗したときに例外をスローし、もう1つは特殊な値(操作に応じてnullまたはfalseのいずれか)を返します。
キュー操作メソッド(失敗時に例外をスロー)
操作 | メソッドのシグネチャ | 失敗時にスローする例外 |
---|---|---|
挿入 | boolean add(E e) | IllegalStateException |
取得・削除 | E remove() | NoSuchElementException |
取得 | E element() | NoSuchElementException |
add
キューに要素を追加した場合はtrue、使用可能な空き領域がない場合はIllegalStateExceptionをスロー
remove
キューの先頭を取得および削除、キューが空の場合はNoSuchElementExceptionをスロー
element
キューの先頭を取得、キューが空の場合はNoSuchElementExceptionをスロー
キュー操作メソッド(特殊な値を返す)
操作 | メソッドのシグネチャ | 返される値 |
---|---|---|
挿入 | boolean offer(E e) | キューに追加できなかった場合はfalse |
取得・削除 | E poll() | キューが空の場合はnull |
取得 | E peek() | キューが空の場合はnull |
offer
キューに要素を追加した場合はtrue、それ以外はfalse
poll
キューの先頭を取得および削除、キューが空の場合はnullを返す
peek
キューの先頭を取得、キューが空の場合はnullを返す
[インタフェースBlockingQueue<E>] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html)
導入バージョン : 1.5
要素の取得時にキューが空でなくなるまで待機したり、要素の格納時にキュー内に空きが生じるまで待機する操作を追加でサポートしたりするQueueです。
ブロッキングする操作
操作 | メソッドのシグネチャ |
---|---|
挿入 | void put(E e) throws InterruptedException |
取得・削除 | E take() throws InterruptedException |
取得 | - |
put
キューに要素を追加、キューに空きが生じるまで待機、待機中に割込みが発生した場合はInterruptedExceptionをスロー
take
キューの先頭を取得および削除、要素が取得できるまで待機、待機中に割込みが発生した場合はInterruptedExceptionをスロー
タイムアウトする操作
操作 | メソッドのシグネチャ |
---|---|
挿入 | boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException |
取得・削除 | E poll(long timeout, TimeUnit unit) throws InterruptedException |
取得 | - |
offer
キューを要素に挿入、指定された待機時間までキューに空きが生じるのを待機
poll
キューの先頭を取得および削除、指定された待機時間まで要素が利用可能になるのを待機
キューの空き容量を確認
操作 | メソッドのシグネチャ |
---|---|
空き容量の確認 | int remainingCapacity() |
remainingCapacity
キューがブロックせずに受け入れることができる追加要素の数を返します。組込み制限が存在しない場合はInteger.MAX_VALUEを返します。
[クラスLinkedBlockingQueue<E>] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/LinkedBlockingQueue.html)
導入バージョン : 1.5
リンク・ノードに基づく、オプションで制限付きになるブロッキング・キューです。 このキューはFIFO (先入れ先出し)で要素を順序付けします。 このキューの先頭は、キューに入っていた時間がもっとも長い要素です。
[クラスPriorityBlockingQueue<E>] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/PriorityBlockingQueue.html)
導入バージョン : 1.5
クラスPriorityQueueと同じ順序付けルールを使用するとともにブロッキング取得オペレーションを提供する、制限なしのブロッキング・キューです。
コンストラクタ
コンストラクタにキューの順序付けを行うコンパレータを渡します。
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)
[インタフェースTransferQueue<E>] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/TransferQueue.html)
導入バージョン : 1.7
コンシューマが要素を受け取るまでプロデューサが待機するBlockingQueue。
ブロッキングする操作
操作 | メソッドのシグネチャ |
---|---|
挿入 | void transfer(E e) throws InterruptedException |
transfer
要素の受信を待機しているコンシューマがあれば直ちに転送、それ以外の場合はコンシューマに受信されるまで待機
タイムアウトしない操作
操作 | メソッドのシグネチャ |
---|---|
挿入 | boolean tryTransfer(E e) |
tryTransfer
要素の受信を待機しているコンシューマがあれば直ちに転送、それ以外の場合はfalseを返す
タイムアウトする操作
操作 | メソッドのシグネチャ |
---|---|
挿入 | boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException |
tryTransfer
要素の受信を待機しているコンシューマがあれば直ちに転送、それ以外の場合は指定された待機時間までコンシューマに受信されるのを待機、指定された待機時間が経過した場合はfalseを返す
待機するコンシューマを確認
操作 | メソッドのシグネチャ |
---|---|
待機コンシューマの確認 | boolean hasWaitingConsumer() |
待機コンシューマの確認 | int getWaitingConsumerCount() |
hasWaitingConsumer
BlockingQueue.takeまたはpollで要素の受信を待機しているコンシューマが1つ以上あればtrueを返す
getWaitingConsumerCount
BlockingQueue.takeまたはpollで要素の受信を待機しているコンシューマの推定数を返す
[クラスLinkedTransferQueue<E>] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/LinkedTransferQueue.html)
導入バージョン : 1.7
リンク・ノードに基づく、制限なしのTransferQueueです。 このキューは、指定された任意のプロデューサに関して、FIFO (先入れ先出し)で要素を順序付けします。 キューの先頭は、特定のプロデューサに関して、もっとも長い時間キューに入っていた要素です。
サンプルコード
LinkedBlockingQueueのサンプルコード
このサンプルは1プロデューサーがBlockingQueueに定期的に要素(このサンプルではInteger型の連番)を追加し、3コンシューマが定期的にBlockingQueueから要素を取り出すというものです。
無期限に実行するのではなくプロデューサーがキューに100回要素を追加したら終了するようになっています。
また、この記事の主題とは直接関係ありませんが、プロデューサー・コンシューマの実装にはScheduledExecutorServiceを利用しています。
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingDemo {
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
BlockingDemo demo = new BlockingDemo();
demo.execute();
}
void execute() throws Exception {
println("main start");
CountDownLatch doneSignal = new CountDownLatch(100);
// 2秒後に起動、以後1秒間隔で実行
println("create producer task");
producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
println("create consumer task");
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 20, 2, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 20, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 20, 3, TimeUnit.SECONDS);
doneSignal.await();
shutdown(producer);
shutdown(consumer);
println("main end");
}
class ProducerTask implements Runnable {
private final BlockingQueue<Integer> queue;
private final CountDownLatch doneSignal;
private final AtomicInteger counter = new AtomicInteger(0);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ProducerTask(BlockingQueue<Integer> queue, CountDownLatch doneSignal) {
this.queue = queue;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.put(e);
System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConsumerTask implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ConsumerTask(BlockingQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
//Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void shutdown(ScheduledExecutorService service) {
println("shutdown start");
// Disable new tasks from being submitted
service.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
service.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
System.err.println(e);
// (Re-)Cancel if current thread also interrupted
service.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
println("shutdown end");
}
void println(String message) {
System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
}
}
- shutdownメソッドの処理は[インタフェースExecutorService] (https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/ExecutorService.html)の使用例を参照しました。
コンストラクタ
コンストラクタでキューの容量を決めることができます。
このサンプルではキューに追加できる要素の最大数を10に設定しています。
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
容量を明示しないときはInteger.MAX_VALUEが指定されたのと同じになります。
private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
キューに要素を挿入する
キューに空きができるまで無期限にブロッキングするputメソッドを利用しています。
putメソッドのブロッキング中に割り込みが入るとInterruptedExceptionが発生します。
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.put(e);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
キューから要素を取り出す
キューから要素を取り出せるまで1秒間待機するpollメソッドを利用しています。コメントアウトしていますが、takeメソッドを使うと取り出せるまで無期限にブロッキングします。
try {
//Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
// do something
}
} catch (InterruptedException e) {
e.printStackTrace();
}
LinkedTransferQueueのサンプルコード
このサンプルは1プロデューサーがTransferQueueに定期的に要素(このサンプルではInteger型の連番)を追加し、3コンシューマが定期的にTransferQueueから要素を取り出すというものです。
このサンプルもプロデューサーがキューに100回要素を追加したら終了するようになっています。
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TransferDemo {
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
private final ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService consumer = Executors.newScheduledThreadPool(3);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
TransferDemo demo = new TransferDemo();
demo.execute();
}
void execute() throws Exception {
println("main start");
CountDownLatch doneSignal = new CountDownLatch(100);
println("create producer task");
producer.scheduleAtFixedRate(new ProducerTask(queue, doneSignal), 2, 1, TimeUnit.SECONDS);
println("create consumer task");
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "1"), 10, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "2"), 10, 3, TimeUnit.SECONDS);
consumer.scheduleAtFixedRate(new ConsumerTask(queue, "3"), 10, 3, TimeUnit.SECONDS);
doneSignal.await();
shutdown(producer);
shutdown(consumer);
println("main stop");
}
class ProducerTask implements Runnable {
private final TransferQueue<Integer> queue;
private final CountDownLatch doneSignal;
private final AtomicInteger counter = new AtomicInteger(0);
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ProducerTask(TransferQueue<Integer> queue, CountDownLatch doneSignal) {
this.queue = queue;
this.doneSignal = doneSignal;
}
@Override
public void run() {
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.transfer(e);
System.out.println(String.format("[%s] producer -> [%3d]", formatter.format(LocalDateTime.now()), e));
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConsumerTask implements Runnable {
private final TransferQueue<Integer> queue;
private final String name;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
ConsumerTask(TransferQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
// Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
System.out.println(String.format("[%s] [%3d] <- consumer(%s)", formatter.format(LocalDateTime.now()), e, name));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
void shutdown(ScheduledExecutorService service) {
// Disable new tasks from being submitted
service.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
service.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException e) {
System.err.println(e);
// (Re-)Cancel if current thread also interrupted
service.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
System.out.println("shutdown");
}
void println(String message) {
System.out.println(String.format("[%s] %s", formatter.format(LocalDateTime.now()), message));
}
}
コンストラクタ
容量に制限はありません。
private final TransferQueue<Integer> queue = new LinkedTransferQueue<>();
キューに要素を挿入する
要素がコンシューマに受信されるまで無期限にブロッキングするtransferメソッドを利用しています。
transferメソッドのブロッキング中に割り込みが入るとInterruptedExceptionが発生します。
try {
Integer e = Integer.valueOf(counter.incrementAndGet());
queue.transfer(e);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
キューから要素を取り出す
キューから要素を取り出せるまで1秒間待機するpollメソッドを利用しています。コメントアウトしていますが、takeメソッドを使うと取り出せるまで無期限にブロッキングします。
try {
// Integer e = queue.take();
Integer e = queue.poll(1, TimeUnit.SECONDS);
if (e != null) {
// do something
}
} catch (InterruptedException e) {
e.printStackTrace();
}