2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

LinkedBlockingQueueのdrainToは安全なのか、ソースを追った

2
Last updated at Posted at 2017-11-29

概要

業務で、LinkedBlockingQueueから、一時点の状態を取得する必要に迫られたので、drainToを使うことにしました。

drainToは、

このキューから利用可能なすべての要素を削除し、それらを指定されたコレクションに追加します。

というメソッドです。

ただ、ヘルプを見ると、

オペレーションの進行中に指定されたコレクションが変更された場合の、このオペレーションの動作は定義されていません。

掲載元

という、不安な一文があり、問題が無いかどうか調査しました。

2017/11/30追記
※コメントで、指定されたコレクションとは、引数で渡されたコレクションを指す、とご指摘いただきました。
よって、引数で渡されるコレクションがスレッドセーフであれば、問題ないようです。
ただし、以下の調査内容はそのまま残します。

ソース

LinkedBlockingQueue.java
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException            {@inheritDoc}
* @throws NullPointerException          {@inheritDoc}
* @throws IllegalArgumentException      {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
        Node<E> h = head;
        int i = 0;
        try {
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                // assert h.item == null;
                head = h;
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}

なお、実際に使っているのは、引数一つのdrainToですが、

LinkedBlockingQueue.java
return drainTo(c, Integer.MAX_VALUE);

と、オーバーロードしている同メソッドを呼び出しているだけなので、こちらを確認すれば問題ないです。

変数

takeLock

データの取り出しに、ロックをかけるための、ロックオブジェクトです。

putLock

上記ソースには出ませんが、データの追加にロックをかけるための、ロックオブジェクトです。

count

AtomicInteger型のフィールドです。
データ変更のたびに、クラス内で手動変更しているようです。
(単にQueue.sizeを返しているわけではない模様)

解説

必要な部分だけ、上から順に見ていきます。

  • ロック獲得
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();

データ取得を開始する前に、取得ロックを獲得します。
*追加ロックはいらないの?*思いましたが、下記処理によって不要となっています。

  • 件数取得
int n = Math.min(maxElements, count.get());

データ取得を始める前に、先頭から何件取得するか決めています。
こうすることにより、drainToの処理中に追加されるデータを無視することができるようになっています。

結論

  • データの取り出しは、drainToの中でロックを獲得しているため、安全に行われる
  • データの追加は、最初に取得データ件数を求めているため、drainToとは無関係に、安全に行われる
  • よって、LinkedBlockingQueue.drainToは、マルチスレッド環境下においても、安全に実行できる

と言えるようです。

余談(感想)

個人的には、

まとめてデータ取得するなら、その間の追加もロックかけるべきだろ!

と考えてしまいます。

「先にデータ件数を決めてしまう」というアプローチもあるのかと、正直感心しました。
すごくいい勉強になりました。

2
1
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?