79.過度な同期は避ける
- 過度な同期は、性能の劣化、デッドロック、非決定的な挙動を招く。
- 活性エラー(liveness failure)、安全性エラー(safety failure)を避けるためには、同期をとった処理の中でクライアントに処理させる権限を与えないようにするべき。
- 以下のプログラムは
ConcurrentModificationException
を出力する。
notifyElementAdded メソッドではobservers のイテレート処理を行い、その中で、SetObserverのaddedメソッドを実行し、observersに変更をかけようとしている。イテレート処理の最中のリストから要素を取り除くことはできないので、エラーが発生した。
package tryAny.effectiveJava;
import java.util.HashSet;
public class SynchroTest {
public static void main(String[] args) {
ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>());
set.addObserver(new SetObserver<>() {
public void added(ObservableSet<Integer> s, Integer e) {
System.out.println(e);
if (e == 23) {
s.removeObserver(this);
}
}
});
for (int i = 0; i < 100; i++) {
set.add(i);
}
}
}
package tryAny.effectiveJava;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
public class ForwardingSet<E> implements Set<E> {
private final Set<E> s;
public ForwardingSet(Set<E> s) {
this.s = s;
}
public void clear() {
s.clear();
}
public boolean contains(Object o) {
return s.contains(o);
}
public boolean isEmpty() {
return s.isEmpty();
}
public int size() {
return s.size();
}
public Iterator<E> iterator() {
return s.iterator();
}
public boolean add(E e) {
return s.add(e);
}
public boolean remove(Object o) {
return s.remove(o);
}
public boolean containsAll(Collection<?> c) {
return s.containsAll(c);
}
public boolean addAll(Collection<? extends E> c) {
return s.addAll(c);
}
public boolean removeAll(Collection<?> c) {
return s.removeAll(c);
}
public boolean retainAll(Collection<?> c) {
return s.retainAll(c);
}
public Object[] toArray() {
return s.toArray();
}
public <T> T[] toArray(T[] a) {
return s.toArray(a);
}
@Override
public boolean equals(Object o) {
return s.equals(o);
}
@Override
public int hashCode() {
return s.hashCode();
}
@Override
public String toString() {
return s.toString();
}
}
package tryAny.effectiveJava;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public class ObservableSet<E> extends ForwardingSet<E> {
public ObservableSet(Set<E> set) {
super(set);
}
private final List<SetObserver<E>> observers = new ArrayList<>();
public void addObserver(SetObserver<E> observer) {
synchronized (observers) {
observers.add(observer);
}
}
public boolean removeObserver(SetObserver<E> observer) {
synchronized (observers) {
return observers.remove(observer);
}
}
private void notifyElementAdded(E element) {
synchronized (observers) {
for (SetObserver<E> observer : observers)
observer.added(this, element);
}
}
@Override
public boolean add(E element) {
boolean added = super.add(element);
if (added)
notifyElementAdded(element);
return added;
}
@Override
public boolean addAll(Collection<? extends E> c) {
boolean result = false;
for (E element : c)
result |= add(element); // Calls notifyElementAdded
return result;
}
}
package tryAny.effectiveJava;
@FunctionalInterface
public interface SetObserver<E> {
// Invoked when an element is added to the observable set
void added(ObservableSet<E> set, E element);
}
- 以下のコードを実行するとデッドロックが発生する。バックグラウンドで走るスレッドは
s.removeObserver
においてobservers
のロックを取ろうとするが、メインスレッドがすでにobservers
のロックを取っているため、ロックが取れない。一方、メインスレッドバックグラウンドのスレッドの処理待ちとなるのでデッドロックとなる。
package tryAny.effectiveJava;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SynchroTest2 {
public static void main(String[] args) {
ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>());
set.addObserver(new SetObserver<>() {
public void added(ObservableSet<Integer> s, Integer e) {
System.out.println(e);
if (e == 23) {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
exec.submit(() -> (s.removeObserver(this))).get();
} catch (ExecutionException | InterruptedException ex) {
throw new AssertionError(ex);
} finally {
exec.shutdown();
}
}
}
});
for (int i = 0; i < 100; i++) {
set.add(i);
}
// エラーは出ないがここには到達しない
System.out.println("finish");
}
}
- 上の
ConcurrentModificationException
が発生する例では、デッドロックは発生していない。これは、Java言語のロックはreentrant
なものだからである。reentrantなロックはマルチスレッドプログラミングをシンプルなものにするが、liveness failureをsafety failureにしうる。(?) - これらの問題への対処法としては、synchronizedブロックから未知のメソッドを取り除く方法がある。
// Alien method moved outside of synchronized block - open calls
private void notifyElementAdded(E element) {
List<SetObserver<E>> snapshot = null;
synchronized(observers) {
snapshot = new ArrayList<>(observers);
}
for (SetObserver<E> observer : snapshot)
observer.added(this, element);
}
また、さらに良い方法としては、synchronizedブロックを使わずに CopyOnWriteArrayList
を使う方法がある。
// Thread-safe observable set with CopyOnWriteArrayList
private final List<SetObserver<E>> observers =
new CopyOnWriteArrayList<>();
public void addObserver(SetObserver<E> observer) {
observers.add(observer);
}
public boolean removeObserver(SetObserver<E> observer)
{
return observers.remove(observer);
}
private void notifyElementAdded(E element) {
for (SetObserver<E> observer : observers)
observer.added(this, element);
}
-
synchronizedブロックの中での処理はできるだけ少なくするべき。長い時間がかかる処理が必要な場合は、Item78のガイドを破らずに、処理をsynchronizedブロックの外側に出すようにするべき。
-
過度な同期のコストは、ロックを取るためのCPU時間ではなく、並列化する機会を失うことや、全coreでのメモリに対する一貫性を担保するために課される処理が問題となる。また、過度な同期によってVMによるコード実行の最適化が行われないことも問題である。
-
mutableなクラスを書くにあたっては、2つの選択肢がある。
- 同期に関するコードは書かず、利用者に同期を任せる。
- スレッドセーフ(Item82)なクラスを書き、内部で同期を保つ。
-
StringBuffer
はほとんどシングルスレッドでしか使われないのに、内部で同期する仕組みになっている。そのため、同期をしない仕組みのStringBuilder
に取って代わられた。ランダムな数値を作成するjava.util.Random
も同様の理由でjava.util.concurrent.ThreadLocalRandom
に取って代わられた。 -
内部で同期するクラスをうまく作る技法は様々あるが、この本では述べられていない。
-
メソッドがstaticフィールドを変更し、複数のスレッドから呼ばれることがあるとすれば、そのstaticフィールドへのアクセスは内部で同期しておかなければならない。なぜなら、外部で同期をとることはできないからである。