概要: Rustの Arc
型の実装は宝の宝庫です。そこで、これを隅から隅まで解説してみます。
第5回(最終回)「Arcを読む」では、 Rc
と Arc
の差分を読んでいきます。
- 第1回 Arc/Rcの基本
- 第2回 Rcを読む基本編
- 第3回 Rcを読む発展編
- 第4回 アトミック変数とメモリ順序
- 第5回 Arcを読む
はじめに
第2回~第3回にかけて、 Arc
のシングルスレッド版である Rc
の実装を読んできました。残るは Arc
特有の部分です。
第1回/第2回 で説明した Arc
/Rc
の基本的な仕組みを仮定します。またアトミック変数とメモリ順序に言及するので、第4回の内容を適宜参照するといいでしょう。
まとめ
-
Arc
はスレッドの休眠処理が不要でアトミック操作にしか依存しないため、直感に反してalloc
クレートで定義されている。これはRc
が定義されているのと同じ場所である。 -
Arc
は共有ポイントである一方、たまたま参照カウントが1のときに所有権を取り出す機能もあるため、Arc<T>: Send
もArc<T>: Sync
もT: Send + Sync
を要求する必要がある。 - 原則としてカウンタ操作は競合状態を防ぐためRead-Modify-Write (RMW) アトミック操作を使って行われる。
- 原則として、インクリメント時はRelaxed, デクリメント時はRelease操作として行い、カウンタが0になったときはAcquire fenceを呼ぶことで必要な同期が達成できる。
-
get_mut
を安全に実装するために、弱参照カウンタにはロック機構が実装されている。 - 公開インターフェースである
strong_count
/weak_count
は内部処理と異なりSeqCst
順序が使われている。その実装は単なる内部カウンタのload/storeよりもやや興味深い実装になっている。
ソースコード
以下の1.39.0の実装を読んでいきます。
定義されている場所
少し意外かもしれませんが、 Arc
は Rc
と同じ alloc
クレートに定義されています。
第3回でも述べたように、Rustの標準ライブラリは以下の3段階のクレートに分割されています。
-
core
... CPUの基本機能のみに依存 -
alloc
...core
+ malloc/freeに依存 -
std
...alloc
+ OSの諸機能に依存
アトミック命令はOSの機能ではなくCPUの機能であり、基本的にどのCPUでも実装されているかエミュレート可能 (シングルコアのCPUの場合は通常の命令で代替可能) であるため、Rustでは基本的に利用可能とみなされます。ただmalloc/freeには依存するので、Arc
は std
ではなく alloc
で定義されています。
なお、Mutexはスピンロックかどうかによって定義可能な場所が異なります。
- 標準ライブラリのMutexは、ロックが確保できるまでスレッドを休眠させることがあります。スレッドの休眠はOSのスケジューラーへの指示によって実現されるため、
std
でのみ定義されます。 -
parking_lot
の定義するMutexも同様の理由でstd
のある環境でのみ利用可能です。 -
spin
の定義するMutexはスピンロックであり、スレッドの休眠をともなわないため、core
のある環境であればいつでも利用可能です。
Send
/Sync
第3回と同様、 Send
/Sync
の実装を見てみましょう。
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Send for Weak<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Weak<T> {}
Rc
と異なり、 Arc
の実装自体はマルチスレッドを考慮して作られているので、中身のスレッド安全性に注意すればよいはずです。 Send
は所有権を他スレッドに送っても安全かどうかで、 Sync
は共有権を他スレッドに与えても安全かどうかでした。
-
Arc<T>
を受け取ったスレッドは中の&T
にアクセスできます(←T: Sync
)。また、try_unwrap
でT
を取得できる可能性があります (←T: Send
)。したがって、Arc<T>: Send
であるためにはT: Send + Sync
が必要です。 -
&Arc<T>
を受け取ったスレッドはこれをcloneしてArc<T>
を得ることができます。したがって、Arc<T>: Sync
であるためにはT: Send + Sync
が必要です。 -
Weak<T>
を受け取ったスレッドはupgradeによってArc<T>
を取得できる可能性があります。したがって、Weak<T>: Send
であるためにはT: Send + Sync
が必要です。 -
&Weak<T>
を受け取ったスレッドはupgradeによってArc<T>
を取得できる可能性があります。したがって、Weak<T>: Sync
であるためにはT: Send + Sync
が必要です。
同期をとる必要があるのはどんなときか
以降ではメモリ順序指定の根拠を追っていきたいので、同期の必要性についてあらかじめ確認しておきます。この節は読み飛ばして後で必要に応じて戻ってくるのがいいかもしれません。
まず、Rustのメモリモデル (C++20のメモリモデル) ではアトミック操作はRelaxedでも各メモリ領域ごとには一貫した順序で行われるので、同期が必要なのは複数のメモリ領域がかかわった場合のみです。より具体的に言ってしまえばstrongとvalueの関係、weakとヒープ領域の関係を保証したいときに同期が必要です。それは具体的には以下のケースです。
- valueへの読み取りアクセスは、valueのdrop処理とhappens before関係にある必要がある。そのためには……
- strongを減らす操作をするときは、valueへのアクセスを終えた後にRelease同期を行う必要がある。
- strongを1以上から0にする操作をするときは、dropを開始する前にAcquire同期を行う必要がある。
- ヒープ領域への全てのアクセスは、ヒープ領域の解放処理とhappens before関係にある必要がある。そのためには……
- weakを減らす操作をするときは、ヒープ領域へのアクセスを終えた後にRelease同期を行う必要がある。
- weakを1以上から0にする操作をするときは、ヒープ領域の解放を開始する前にAcquire同期を行う必要がある。
- valueへの読み取りアクセスは、後続するvalueへの書き込みアクセスとhappens before関係にある必要がある。そのためには……
- strongを減らす操作をするときは、valueへのアクセスを終えた後にRelease同期を行う必要がある。
- 書き込みアクセスのためにstrongが1であることを確認する操作をするときは、valueへの書き込みアクセスを開始する前にAcquire同期を行う必要がある。
- valueへの書き込みアクセスは、後続するvalueへの読み取りアクセスとhappens before関係にある必要がある。書き込みアクセスを終えた時点で専有状態が保証されているので、これは自然に成立する。
強参照のライフサイクル
第2回の流れに合わせて、まずは強参照のライフサイクルを追っていきます。
初期化
初期化時はまだシングルスレッドなので、特に興味深いことはありません。
Arc::clone
impl<T: ?Sized> Clone for Arc<T> {
fn clone(&self) -> Arc<T> {
// Using a relaxed ordering is alright here, as knowledge of the
// original reference prevents other threads from erroneously deleting
// the object.
//
// As explained in the [Boost documentation][1], Increasing the
// reference counter can always be done with memory_order_relaxed: New
// references to an object can only be formed from an existing
// reference, and passing an existing reference from one thread to
// another must already provide any required synchronization.
//
// [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
let old_size = self.inner().strong.fetch_add(1, Relaxed);
// However we need to guard against massive refcounts in case someone
// is `mem::forget`ing Arcs. If we don't do this the count can overflow
// and users will use-after free. We racily saturate to `isize::MAX` on
// the assumption that there aren't ~2 billion threads incrementing
// the reference count at once. This branch will never be taken in
// any realistic program.
//
// We abort because such a program is incredibly degenerate, and we
// don't care to support it.
if old_size > MAX_REFCOUNT {
unsafe {
abort();
}
}
Self::from_inner(self.ptr)
}
}
/// A soft limit on the amount of references that may be made to an `Arc`.
///
/// Going above this limit will abort your program (although not
/// necessarily) at _exactly_ `MAX_REFCOUNT + 1` references.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
まず、カウンタをインクリメントするために RMW (Read-Modify-Write) アトミック操作である fetch_add
を使っています。これは第4回の冒頭で説明した例にそっくりですね。ここのReadとWriteを別々に行ってしまうと以下の状況が発生してしまいます。
- スレッド1がカウンタを読む。
- スレッド2がカウンタを読む。
- スレッド1がカウンタに書き込む。
- スレッド2がカウンタに書き込む。 (→スレッド1のインクリメント効果が消滅する)
これを防ぐには fetch_add
で読み取りと書き込みを一気に行う必要があります。
一方メモリ順序としては最弱の Relaxed
が指定されています。これを考えるには Acquire
にしなくてよい理由と Release
にしなくてよい理由をそれぞれ考えればよさそうです。
-
Acquire
をするときは、value
への先行する処理の終了を待ちたいときです。Arc::clone
が保証するのはvalue
への共有アクセス権ですが、これを確保するために必要なのは「初期化が完了していること」で、これは言うまでもなく成立しています。したがってAcquire同期をする必要はありません。 -
Release
をするときは、value
への後続する処理を待たせたいときです。そのような処理としては「value
への専有アクセス」「value
のdrop」の2つがありますが、どちらに対する同期もArc::clone
のタイミングではなく、そうして作ったArc
がdropされるタイミングで行う必要があり、Arc::clone
のタイミングでわざわざ行う必要はありません。
以上の理由から Relaxed
で十分です。
さて、単に fetch_add
をするだけだとカウンタがオーバーフローしてしまう可能性があります。そこでオーバーフローを防ぐ必要があります。
オーバーフローを防ぐにはCAS loopで以下のように実装する手もあります。
let mut old_counter = strong.load(Relaxed);
loop {
if old_counter >= MAX_REFCOUNT {
// abort
}
let c = strong.compare_and_swap(old_counter, old_counter + 1);
if c == old_counter {
break;
} else {
old_counter = c;
}
}
しかし、このように実装すると余計な命令をたくさん発行する必要が出てしまいます。そこでRustでは fetch_add
を使うかわりに、 MAX_REFCOUNT
を以下のようにカウンタの最大値の約半分に設定しています:
// usizeの最大値の約半分!
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
その上で、以下のように「fetch_add
した後に、カウンタ溢れをチェック」しています。
let old_size = self.inner().strong.fetch_add(1, Relaxed);
if old_size > MAX_REFCOUNT {
unsafe {
abort();
}
}
この手法自体は競合状態があり、 fetch_add
~ if
間で留まるスレッドが大量にあった場合に MAX_REFCOUNT
よりも少しばかり大きい値が観測される可能性があります。しかし、 MAX_REFCOUNT
は usize
の半分なので、カウンタが本当にオーバーフローするには異常な量のスレッドが同時にこの位置に留まる必要があります。そんなにスレッドがあったらそもそもメモリなり何なりが足りなくなること必至なので、事実上発生しないと考えて問題ないわけです。
Arc::drop
unsafe impl<#[may_dangle] T: ?Sized> Drop for Arc<T> {
fn drop(&mut self) {
// Because `fetch_sub` is already atomic, we do not need to synchronize
// with other threads unless we are going to delete the object. This
// same logic applies to the below `fetch_sub` to the `weak` count.
if self.inner().strong.fetch_sub(1, Release) != 1 {
return;
}
// This fence is needed to prevent reordering of use of the data and
// deletion of the data. Because it is marked `Release`, the decreasing
// of the reference count synchronizes with this `Acquire` fence. This
// means that use of the data happens before decreasing the reference
// count, which happens before this fence, which happens before the
// deletion of the data.
//
// As explained in the [Boost documentation][1],
//
// > It is important to enforce any possible access to the object in one
// > thread (through an existing reference) to *happen before* deleting
// > the object in a different thread. This is achieved by a "release"
// > operation after dropping a reference (any access to the object
// > through this reference must obviously happened before), and an
// > "acquire" operation before deleting the object.
//
// In particular, while the contents of an Arc are usually immutable, it's
// possible to have interior writes to something like a Mutex<T>. Since a
// Mutex is not acquired when it is deleted, we can't rely on its
// synchronization logic to make writes in thread A visible to a destructor
// running in thread B.
//
// Also note that the Acquire fence here could probably be replaced with an
// Acquire load, which could improve performance in highly-contended
// situations. See [2].
//
// [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
// [2]: (https://github.com/rust-lang/rust/pull/41714)
atomic::fence(Acquire);
unsafe {
self.drop_slow();
}
}
}
fetch_sub
でアトミックにデクリメントしています。アトミックでなければいけない理由はいつも通りです。
ここでは以下のように同期を行っています。
-
fetch_sub
自身はRelease
同期を行う。 -
fetch_sub
の結果(直前の値)が1だった場合はさらにAcquire
同期を行う。
これは value
の読み取りと value
のdropの間にhappens before関係を成立させるためです。 value
の読み取りは fetch_sub
の直前まで行われ、 value
のdropは fetch_sub
によって値が0になったあとに行われるので、この2つの fetch_sub
の間で同期を取ればよさそうです。
具体例で考えてみます。スレッド1, 2, 3がそれぞれ Arc
を1個ずつdropしたとします。そして、 strong
のmodification orderが「スレッド3の fetch_sub
」→「スレッド2の fetch_sub
」→「スレッド1の fetch_sub
」だったとします。このとき以下の操作があらわれます。
- スレッド1
-
value
に読み取りアクセスする。 -
strong.fetch_sub(1)
... 1から0になった。 - Acquireフェンス。
-
value
のdropを開始する。
-
- スレッド2
-
value
に読み取りアクセスする。 -
strong.fetch_sub(1)
... 2から1になった。
-
- スレッド3
-
value
に読み取りアクセスする。 -
strong.fetch_sub(1)
... 3から2になった。
-
第4回で導入した記法を用いると、以下のようなグラフになります。
- スレッド1-スレッド1は同じスレッドなので(sequenced beforeなので) happens beforeが成立します。
- スレッド2-スレッド1はmemory-fence同期によりhappens beforeが成立します。
- スレッド3-スレッド1もmemory-fetch同期によりhappens beforeが成立します。このときrelease sequenceが使われていることに注意してください。
fenceを使うことで、特定の条件のときだけ同期を成立させていることも注目に値します。
弱参照のライフサイクル
弱参照のライフサイクルも同様です。と言いたいところですが実は1点重要な部分があります。
get_mut
問題
get_mut
は、 &mut Arc<T>
から &mut T
の取得を試みる関数でした。この関数は「strong == 1
かつ weak == 1
」と、両方のカウンタに対する制約をチェックする必要があります。これがマルチスレッド環境では問題になります。というのも、この2つのカウンタを同時にアトミックに処理する方法がないからです。(ごく一部のアーキテクチャではDouble CASと呼ばれる種類の命令があるほか、ワード長の倍のアトミックを使って解決することができる可能性もありますが、汎用的とはいえません)
strong
を先にチェックする場合
strong
を先にチェックするとどうでしょうか。
if self.strong.load(Relaxed) != 1 {
return None;
}
// ←競合条件
if self.weak.load(Relaxed) != 1 {
return None;
}
fence(Acquire);
return Some(unsafe { &mut *self.value.get() });
この場合、strongをチェックしてからweakをチェックするまでの間に、以下が発生する可能性があります。
- 他のスレッドが持っていた
Weak<T>
がArc<T>
にアップグレードされる。 - さらに元となった
Weak<T>
がdropされ、弱参照カウンタが1になる。
こうすると、チェックは成功するにもかかわらず実際には他にも Arc<T>
が存在する状態になってしまいます。
weak
を先にチェックする場合
では逆に weak
を先にチェックするとどうでしょうか。
if self.weak.load(Relaxed) != 1 {
return None;
}
// ←競合条件
if self.strong.load(Relaxed) != 1 {
return None;
}
fence(Acquire);
return Some(unsafe { &mut *self.value.get() });
この場合、strongをチェックしてからweakをチェックするまでの間に、以下が発生する可能性があります。
- 他のスレッドが持っていた
Arc<T>
がWeak<T>
にダウングレードされる。 - さらに元となった
Arc<T>
がdropされ、強参照カウンタが1になる。
こうすると、チェックは成功するにもかかわらず実際には他にも Weak<T>
が存在する状態になってしまいます。
strong
をチェックして変更する場合
ReadではなくRMW操作を使えば競合状態を回避できるでしょうか。 strong
を水増ししても仕方ないので、1だったら0にするのが良さそうです。
if self.strong.compare_and_swap(1, 0, Relaxed) != 1 {
return None;
}
if self.weak.load(Relaxed) != 1 {
self.strong.fetch_add(1, Relaxed);
return None;
}
self.strong.fetch_add(1, Acquire);
return Some(unsafe { &mut *self.value.get() });
こうすると、競合条件によってチェックをすり抜けること自体はありません。 strong
が1だったら一時的に0にしているので、その間に試みられたアップグレードは全て失敗します。
しかしこれはユーザーから観測される挙動に問題があります。Arc<T>
がまだ存在していてもアップグレードが気まぐれで失敗することになりますし、あるときアップグレードに失敗しても、その先ずっとアップグレードに失敗するとは限らないことになります。他のスレッドから見ると強参照が0なだけなので、「get_mut
の最中」と「本当に Arc<T>
がなくなった状態」の区別はつきません。そのため、リトライによって「気まぐれな失敗」を隠蔽することもできません。
weak
をチェックして変更する場合
weak
をいじっても、ダウングレードはいつでも成功するので、競合状態を防ぐことはできません。
ロックによる解決
以上のように、 get_mut
自身を工夫するだけでは安全にポインタの唯一性をチェックすることはできません。そこで weak
の定義を見直し、 weak
のロックを可能にします。このことは ArcInner
の定義に説明があります。
struct ArcInner<T: ?Sized> {
strong: atomic::AtomicUsize,
// the value usize::MAX acts as a sentinel for temporarily "locking" the
// ability to upgrade weak pointers or downgrade strong ones; this is used
// to avoid races in `make_mut` and `get_mut`.
weak: atomic::AtomicUsize,
data: T,
}
usize
の最大値をロックされた状態として特別に扱うことになっています。そのため、 weakを扱う処理はロック状態を念頭に置いて書かれる必要があります。
Arc::downgrade
さっそく Arc::downgrade
から見ていきましょう。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L600-L629
impl<T: ?Sized> Arc<T> {
pub fn downgrade(this: &Self) -> Weak<T> {
// This Relaxed is OK because we're checking the value in the CAS
// below.
let mut cur = this.inner().weak.load(Relaxed);
loop {
// check if the weak counter is currently "locked"; if so, spin.
if cur == usize::MAX {
cur = this.inner().weak.load(Relaxed);
continue;
}
// NOTE: this code currently ignores the possibility of overflow
// into usize::MAX; in general both Rc and Arc need to be adjusted
// to deal with overflow.
// Unlike with Clone(), we need this to be an Acquire read to
// synchronize with the write coming from `is_unique`, so that the
// events prior to that write happen before this read.
match this.inner().weak.compare_exchange_weak(cur, cur + 1, Acquire, Relaxed) {
Ok(_) => {
// Make sure we do not create a dangling Weak
debug_assert!(!is_dangling(this.ptr));
return Weak { ptr: this.ptr };
}
Err(old) => cur = old,
}
}
}
}
ロックに対応するため、CAS loopとして実装されています。ロックされている場合にリトライすることを除くと以下と同等です。
this.inner().weak.fetch_add(1, Acquire);
// Make sure we do not create a dangling Weak
debug_assert!(!is_dangling(this.ptr));
return Weak { ptr: this.ptr };
ここで Relaxed
ではなく Acquire
が使われている理由は謎です。 is_unique
がどうのと書いてありますが、 is_unique
は同期に対して透過的に振る舞うはずです。そもそも同期が必要なのは他のメモリ領域 (strong
, value
) との順序を強制する必要がある場合ですが、ここでは weak
をインクリメントするにあたって strong
, value
が特定の状態であることを保証する必要がありません。 (特に、すでに strong > 0
であることは既存の Arc
の存在から保証されており、ここで別途同期する必要はない)
NOTEに記載の通りオーバーフローがチェックされていないので、 downgrade
→forget
を繰り返し行うと問題が起きそうです。
Weak::clone
Weak::clone
は Arc::downgrade
と近い処理をしていますが、実装はよりシンプルです。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1599-L1634
impl<T: ?Sized> Clone for Weak<T> {
fn clone(&self) -> Weak<T> {
let inner = if let Some(inner) = self.inner() {
inner
} else {
return Weak { ptr: self.ptr };
};
// See comments in Arc::clone() for why this is relaxed. This can use a
// fetch_add (ignoring the lock) because the weak count is only locked
// where are *no other* weak pointers in existence. (So we can't be
// running this code in that case).
let old_size = inner.weak.fetch_add(1, Relaxed);
// See comments in Arc::clone() for why we do this (for mem::forget).
if old_size > MAX_REFCOUNT {
unsafe {
abort();
}
}
return Weak { ptr: self.ptr };
}
}
最初の if let
分岐は第3回で説明済みなので省略します。
Arc::downgrade
と異なり、ロックされていないことは保証されています。weak
カウンタのロックは Weak
が存在しないときにしかかからないからです。それ以外のポイントは Arc::clone
のところで説明済みでほとんど考えることはありません。
Weak::upgrade
Weak
の存在意義といっても過言ではない Weak::upgrade
の実装は以下のようになっています。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1429-L1485
impl<T: ?Sized> Weak<T> {
pub fn upgrade(&self) -> Option<Arc<T>> {
// We use a CAS loop to increment the strong count instead of a
// fetch_add because once the count hits 0 it must never be above 0.
let inner = self.inner()?;
// Relaxed load because any write of 0 that we can observe
// leaves the field in a permanently zero state (so a
// "stale" read of 0 is fine), and any other value is
// confirmed via the CAS below.
let mut n = inner.strong.load(Relaxed);
loop {
if n == 0 {
return None;
}
// See comments in `Arc::clone` for why we do this (for `mem::forget`).
if n > MAX_REFCOUNT {
unsafe {
abort();
}
}
// Relaxed is valid for the same reason it is on Arc's Clone impl
match inner.strong.compare_exchange_weak(n, n + 1, Relaxed, Relaxed) {
Ok(_) => return Some(Arc::from_inner(self.ptr)), // null checked above
Err(old) => n = old,
}
}
}
}
ここでもCAS loopが使われていますが、これはweakのロックとは別の理由です。強参照カウンタが0かどうかで分岐する必要があるため、 fetch_add
ではなくCAS loopを使う必要があります。それ以外は Arc::clone
と同じです。
Weak::drop
Weak::drop
https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1658-L1706 も Arc::drop
と同様です。
impl<T: ?Sized> Drop for Weak<T> {
fn drop(&mut self) {
// If we find out that we were the last weak pointer, then its time to
// deallocate the data entirely. See the discussion in Arc::drop() about
// the memory orderings
//
// It's not necessary to check for the locked state here, because the
// weak count can only be locked if there was precisely one weak ref,
// meaning that drop could only subsequently run ON that remaining weak
// ref, which can only happen after the lock is released.
let inner = if let Some(inner) = self.inner() {
inner
} else {
return
};
if inner.weak.fetch_sub(1, Release) == 1 {
atomic::fence(Acquire);
unsafe {
Global.dealloc(self.ptr.cast(), Layout::for_value(self.ptr.as_ref()))
}
}
}
}
弱参照カウンタのロックは Weak
がないときにしか起こらないので、ロック対策をする必要はありません。残りの挙動は Arc::drop
と相似で、「強参照カウンタではなく弱参照カウンタが対象であること」「value
ではなくヒープ領域が対象であること」だけが異なります。
コピーオンライトのための処理
Rc
と同様、 Arc
にもコピーオンライトのための3つの関数 try_unwrap
, get_mut
, make_mut
が存在します。
try_unwrap
try_unwrap
は Arc::drop
と似ています。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L348-L388
impl<T> Arc<T> {
pub fn try_unwrap(this: Self) -> Result<T, Self> {
// See `drop` for why all these atomics are like this
if this.inner().strong.compare_exchange(1, 0, Release, Relaxed).is_err() {
return Err(this);
}
atomic::fence(Acquire);
unsafe {
let elem = ptr::read(&this.ptr.as_ref().data);
// Make a weak pointer to clean up the implicit strong-weak reference
let _weak = Weak { ptr: this.ptr };
mem::forget(this);
Ok(elem)
}
}
}
Arc::try_unwrap
と Arc::drop
の違いは以下の2点だけです。
- 強参照カウントが1のときだけ動作する。
-
T
をdropせずに、所有権を取り出して返す。
この前者の条件を実現するために、 strong.fetch_sub(1, Release)
のかわりに strong.compare_exchange(1, 0, Release, Relaxed)
を使っています。
ところで、Release順序を保つ必要はあるのでしょうか? Arc::drop
がデクリメントをReleaseで行っているのは、 value
への読み取りアクセスの終了を実際に value
をdropするスレッドに同期するためでした。 Arc::try_unwrap
では、デクリメントに成功したときは必ずそのスレッドで value
の所有権を取り出すので、自分自身がデクリメントをReleaseで行う必要はなさそうです。わざわざ危険を冒してまでこの部分のメモリ順序を弱めるほどではないという判断かもしれません。
なお、Acquireフェンスのほうは相変らず必要です。他のスレッドの Arc::drop
と同期する必要があるからです。
get_mut
すでに説明したように get_mut
の実装はややタフで、以下のようになっています。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1077-L1092 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1131-L1153
impl<T: ?Sized> Arc<T> {
pub fn get_mut(this: &mut Self) -> Option<&mut T> {
if this.is_unique() {
// This unsafety is ok because we're guaranteed that the pointer
// returned is the *only* pointer that will ever be returned to T. Our
// reference count is guaranteed to be 1 at this point, and we required
// the Arc itself to be `mut`, so we're returning the only possible
// reference to the inner data.
unsafe {
Some(Arc::get_mut_unchecked(this))
}
} else {
None
}
}
fn is_unique(&mut self) -> bool {
// lock the weak pointer count if we appear to be the sole weak pointer
// holder.
//
// The acquire label here ensures a happens-before relationship with any
// writes to `strong` (in particular in `Weak::upgrade`) prior to decrements
// of the `weak` count (via `Weak::drop`, which uses release). If the upgraded
// weak ref was never dropped, the CAS here will fail so we do not care to synchronize.
if self.inner().weak.compare_exchange(1, usize::MAX, Acquire, Relaxed).is_ok() {
// This needs to be an `Acquire` to synchronize with the decrement of the `strong`
// counter in `drop` -- the only access that happens when any but the last reference
// is being dropped.
let unique = self.inner().strong.load(Acquire) == 1;
// The release write here synchronizes with a read in `downgrade`,
// effectively preventing the above read of `strong` from happening
// after the write.
self.inner().weak.store(1, Release); // release the lock
unique
} else {
false
}
}
}
すでに説明したように get_mut
が必要とする唯一性 (strong == 1 && weak == 1
) をチェックするには弱参照カウントをロックする必要があります。この「ロック」の処理だけ切り出すと以下のようになります。
if weak.compare_exchange(1, usize::MAX, Acquire, Relaxed).is_ok() {
// ...
weak.store(1, Release);
}
これは典型的なスピンロックにおける try_lock
~unlock
のパターンになっていますね。
さて、このロックはアンロック時に weak
にRMWではない単純書き込みを行っています。つまり、この操作によってそこまで続いてきたrelease sequenceが途切れることになります。これは問題ないのかという疑問が生じます。結論から言うと、その単純書き込み自身が Release
操作になっていること(+ロックがAcquireであること)で同期を肩代わりできるため、問題ありません。
そしてそのロック中では強参照カウンタをacquire loadしています。
さて、問題は is_unique
がtrueを返したときに、ほかに Arc
も Weak
もないと確実に言えるかどうかです。これは以下のように順番に推論することでわかります。
- まず、weakのmodification orderを考えます。weakのロックからアンロックまでの間にweakが変更されることはないので、
Weak
の生成/消滅に由来するweakのインクリメント/デクリメントは「ロック以前」「アンロック以後」に分類できます。ロック時にweak == 1
を観測しているので、ロック以前に行われたインクリメントは、対応するデクリメントもロック以前に行われていることが保証されます。デクリメントは常にReleaseで行われているので、ロックと(直接的または間接的に)同期されます。この保証はアンロックが起きるタイミングまで延長できます。 - 次に、strongのmodification orderを考えます。ロックはありませんが、
Arc
の生成/消滅に由来するstrongのインクリメント/デクリメントは「is_unique
内のロード以前」「is_unique
内のロード以後」に分類できます。is_unique
内のロードでstrong == 1
を観測しているので、このタイミング以前に行われた(他のArc
のための)インクリメントは、対応するデクリメントもロード以前に行われていることが保証されます。デクリメントは常にReleaseで行われているので、当該ロードと同期されます。 -
is_unique
内のロードはweak
のロック~アンロックの間に発生しているので、このタイミングでは「この時点で存在しうる全ての他のArc
/Weak
がdrop済み(正確には、dropは終わっていないかもしれないが、もはやヒープ領域へのアクセスはしない)」であることが保証されます。 - 他スレッドに
Arc
/Weak
がないので、それ以降は明示的に何かしない限りはこの状況が続くことが保証されます。
ひとたびユニーク性が保証できたら、あとは Rc::get_mut
の場合と同じ理屈で考えることができます。
make_mut
make_mut
は try_unwrap
と get_mut
の合わせ技的に振る舞うので、ロックは不要です。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L995-L1047
impl<T: Clone> Arc<T> {
pub fn make_mut(this: &mut Self) -> &mut T {
// Note that we hold both a strong reference and a weak reference.
// Thus, releasing our strong reference only will not, by itself, cause
// the memory to be deallocated.
//
// Use Acquire to ensure that we see any writes to `weak` that happen
// before release writes (i.e., decrements) to `strong`. Since we hold a
// weak count, there's no chance the ArcInner itself could be
// deallocated.
if this.inner().strong.compare_exchange(1, 0, Acquire, Relaxed).is_err() {
// Another strong pointer exists; clone
*this = Arc::new((**this).clone());
} else if this.inner().weak.load(Relaxed) != 1 {
// Relaxed suffices in the above because this is fundamentally an
// optimization: we are always racing with weak pointers being
// dropped. Worst case, we end up allocated a new Arc unnecessarily.
// We removed the last strong ref, but there are additional weak
// refs remaining. We'll move the contents to a new Arc, and
// invalidate the other weak refs.
// Note that it is not possible for the read of `weak` to yield
// usize::MAX (i.e., locked), since the weak count can only be
// locked by a thread with a strong reference.
// Materialize our own implicit weak pointer, so that it can clean
// up the ArcInner as needed.
let weak = Weak { ptr: this.ptr };
// mark the data itself as already deallocated
unsafe {
// there is no data race in the implicit write caused by `read`
// here (due to zeroing) because data is no longer accessed by
// other threads (due to there being no more strong refs at this
// point).
let mut swap = Arc::new(ptr::read(&weak.ptr.as_ref().data));
mem::swap(this, &mut swap);
mem::forget(swap);
}
} else {
// We were the sole reference of either kind; bump back up the
// strong ref count.
this.inner().strong.store(1, Release);
}
// As with `get_mut()`, the unsafety is ok because our reference was
// either unique to begin with, or became one upon cloning the contents.
unsafe {
&mut this.ptr.as_mut().data
}
}
}
make_mut
はまず strong.compare_exchange(1, 0, Acquire, Relaxed)
を行うことで、 try_unwrap
できる状況かどうかを調べています。できなかったら諦めてcloneします。
↑のCASが成功したら try_unwrap
が可能ですが、 get_mut
が可能ならそちらにフォールバックします。get_mut
と異なり、一度0を入れた強参照に1を書き戻すのは参照がユニークだったときだけなので、このように書いてもユーザーが観測する挙動としておかしなことにはなりません。
strong_count
/weak_count
Arc
/Rc
は強参照カウントや弱参照カウントを取得する関数を露出しています。
impl<T: ?Sized> Arc<T> {
pub fn weak_count(this: &Self) -> usize {
let cnt = this.inner().weak.load(SeqCst);
// If the weak count is currently locked, the value of the
// count was 0 just before taking the lock.
if cnt == usize::MAX { 0 } else { cnt - 1 }
}
pub fn strong_count(this: &Self) -> usize {
this.inner().strong.load(SeqCst)
}
}
impl<T: ?Sized> Weak<T> {
#[unstable(feature = "weak_counts", issue = "57977")]
pub fn strong_count(&self) -> usize {
if let Some(inner) = self.inner() {
inner.strong.load(SeqCst)
} else {
0
}
}
#[unstable(feature = "weak_counts", issue = "57977")]
pub fn weak_count(&self) -> Option<usize> {
// Due to the implicit weak pointer added when any strong pointers are
// around, we cannot implement `weak_count` correctly since it
// necessarily requires accessing the strong count and weak count in an
// unsynchronized fashion. So this version is a bit racy.
self.inner().map(|inner| {
let strong = inner.strong.load(SeqCst);
let weak = inner.weak.load(SeqCst);
if strong == 0 {
// If the last `Arc` has *just* been dropped, it might not yet
// have removed the implicit weak count, so the value we get
// here might be 1 too high.
weak
} else {
// As long as there's still at least 1 `Arc` around, subtract
// the implicit weak pointer.
// Note that the last `Arc` might get dropped between the 2
// loads we do above, removing the implicit weak pointer. This
// means that the value might be 1 too low here. In order to not
// return 0 here (which would happen if we're the only weak
// pointer), we guard against that specifically.
cmp::max(1, weak - 1)
}
})
}
}
興味深いのは以下の4点です。
- メモリ順序として
SeqCst
を使っている点。アトミック操作を直接露出する観点から、できるだけ直感的に振る舞うようSeqCst
を選んだのかもしれない。 -
weak
がロックされていたときの処理。これらの関数は単なる読み取りしかないので、少し時系列をずらして「ロックされる直前の値」を返すことで、ロックの解放待ちを防ぐことができている。 -
Weak::weak_count
がOption
を返す点。ぶら下がりWeak
は、他のWeak
参照との同一性という観点から少々変わった挙動をするため、特定の整数を返すよりもNone
を返すのが妥当ということかもしれない。 -
Weak::weak_count
が必ずしも正確な値を返さないこと。weak
は内部的には強参照が1個以上あるときは1多い値を格納する仕組みになっていたが、このせいで「Weak
の個数」を数えるのが難しい。weak
とstrong
を同時に読むのは難しいからである。Weak::upgrade
を呼べば一貫した値を取得することも可能そうだけど、そこまでするほどでもなさそう。
まとめ
-
Arc
はスレッドの休眠処理が不要でアトミック操作にしか依存しないため、直感に反してalloc
クレートで定義されている。これはRc
が定義されているのと同じ場所である。 -
Arc
は共有ポイントである一方、たまたま参照カウントが1のときに所有権を取り出す機能もあるため、Arc<T>: Send
もArc<T>: Sync
もT: Send + Sync
を要求する必要がある。 - 原則としてカウンタ操作は競合状態を防ぐためRead-Modify-Write (RMW) アトミック操作を使って行われる。
- 原則として、インクリメント時はRelaxed, デクリメント時はRelease操作として行い、カウンタが0になったときはAcquire fenceを呼ぶことで必要な同期が達成できる。
-
get_mut
を安全に実装するために、弱参照カウンタにはロック機構が実装されている。 - 公開インターフェースである
strong_count
/weak_count
は内部処理と異なりSeqCst
順序が使われている。その実装は単なる内部カウンタのload/storeよりもやや興味深い実装になっている。
以上で「Rustの Arc
を読む」シリーズは終わりです。比較的短いソースから、予想以上に多くのことが学べたのではないでしょうか。(筆者自身も予想を超えてたくさん執筆したため驚いています。) これが皆さんの学習の助けになればさいわいです。