LoginSignup
47
19

More than 3 years have passed since last update.

Rustの `Arc` を読む(5): Arcを読む

Last updated at Posted at 2019-12-31

概要: Rustの Arc 型の実装は宝の宝庫です。そこで、これを隅から隅まで解説してみます。

第5回(最終回)「Arcを読む」では、 RcArc の差分を読んでいきます。

はじめに

第2回~第3回にかけて、 Arc のシングルスレッド版である Rc の実装を読んできました。残るは Arc 特有の部分です。

第1回/第2回 で説明した Arc/Rc の基本的な仕組みを仮定します。またアトミック変数とメモリ順序に言及するので、第4回の内容を適宜参照するといいでしょう。

まとめ

  • Arc はスレッドの休眠処理が不要でアトミック操作にしか依存しないため、直感に反して alloc クレートで定義されている。これは Rc が定義されているのと同じ場所である。
  • Arc は共有ポイントである一方、たまたま参照カウントが1のときに所有権を取り出す機能もあるため、 Arc<T>: SendArc<T>: SyncT: Send + Sync を要求する必要がある。
  • 原則としてカウンタ操作は競合状態を防ぐためRead-Modify-Write (RMW) アトミック操作を使って行われる。
  • 原則として、インクリメント時はRelaxed, デクリメント時はRelease操作として行い、カウンタが0になったときはAcquire fenceを呼ぶことで必要な同期が達成できる。
  • get_mut を安全に実装するために、弱参照カウンタにはロック機構が実装されている。
  • 公開インターフェースである strong_count/weak_count は内部処理と異なり SeqCst 順序が使われている。その実装は単なる内部カウンタのload/storeよりもやや興味深い実装になっている。

ソースコード

以下の1.39.0の実装を読んでいきます。

定義されている場所

少し意外かもしれませんが、 ArcRc と同じ alloc クレートに定義されています。

第3回でも述べたように、Rustの標準ライブラリは以下の3段階のクレートに分割されています。

  • core ... CPUの基本機能のみに依存
  • alloc ... core + malloc/freeに依存
  • std ... alloc + OSの諸機能に依存

アトミック命令はOSの機能ではなくCPUの機能であり、基本的にどのCPUでも実装されているかエミュレート可能 (シングルコアのCPUの場合は通常の命令で代替可能) であるため、Rustでは基本的に利用可能とみなされます。ただmalloc/freeには依存するので、Arcstd ではなく alloc で定義されています。

なお、Mutexはスピンロックかどうかによって定義可能な場所が異なります。

  • 標準ライブラリのMutexは、ロックが確保できるまでスレッドを休眠させることがあります。スレッドの休眠はOSのスケジューラーへの指示によって実現されるため、 std でのみ定義されます。
  • parking_lot の定義するMutexも同様の理由で std のある環境でのみ利用可能です。
  • spin の定義するMutexはスピンロックであり、スレッドの休眠をともなわないため、 core のある環境であればいつでも利用可能です。

Send/Sync

第3回と同様、 Send/Sync の実装を見てみましょう。

unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}

unsafe impl<T: ?Sized + Sync + Send> Send for Weak<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Weak<T> {}

Rc と異なり、 Arc の実装自体はマルチスレッドを考慮して作られているので、中身のスレッド安全性に注意すればよいはずです。 Send所有権を他スレッドに送っても安全かどうかで、 Sync共有権を他スレッドに与えても安全かどうかでした。

  • Arc<T> を受け取ったスレッドは中の &T にアクセスできます(← T: Sync)。また、 try_unwrapT を取得できる可能性があります (← T: Send)。したがって、 Arc<T>: Send であるためには T: Send + Sync が必要です。
  • &Arc<T> を受け取ったスレッドはこれをcloneして Arc<T> を得ることができます。したがって、 Arc<T>: Sync であるためには T: Send + Sync が必要です。
  • Weak<T> を受け取ったスレッドはupgradeによって Arc<T> を取得できる可能性があります。したがって、 Weak<T>: Send であるためには T: Send + Sync が必要です。
  • &Weak<T> を受け取ったスレッドはupgradeによって Arc<T> を取得できる可能性があります。したがって、 Weak<T>: Sync であるためには T: Send + Sync が必要です。

同期をとる必要があるのはどんなときか

以降ではメモリ順序指定の根拠を追っていきたいので、同期の必要性についてあらかじめ確認しておきます。この節は読み飛ばして後で必要に応じて戻ってくるのがいいかもしれません。

まず、Rustのメモリモデル (C++20のメモリモデル) ではアトミック操作はRelaxedでも各メモリ領域ごとには一貫した順序で行われるので、同期が必要なのは複数のメモリ領域がかかわった場合のみです。より具体的に言ってしまえばstrongとvalueの関係weakとヒープ領域の関係を保証したいときに同期が必要です。それは具体的には以下のケースです。

  • valueへの読み取りアクセスは、valueのdrop処理とhappens before関係にある必要がある。そのためには……
    • strongを減らす操作をするときは、valueへのアクセスを終えた後にRelease同期を行う必要がある。
    • strongを1以上から0にする操作をするときは、dropを開始する前にAcquire同期を行う必要がある。
  • ヒープ領域への全てのアクセスは、ヒープ領域の解放処理とhappens before関係にある必要がある。そのためには……
    • weakを減らす操作をするときは、ヒープ領域へのアクセスを終えた後にRelease同期を行う必要がある。
    • weakを1以上から0にする操作をするときは、ヒープ領域の解放を開始する前にAcquire同期を行う必要がある。
  • valueへの読み取りアクセスは、後続するvalueへの書き込みアクセスとhappens before関係にある必要がある。そのためには……
    • strongを減らす操作をするときは、valueへのアクセスを終えた後にRelease同期を行う必要がある。
    • 書き込みアクセスのためにstrongが1であることを確認する操作をするときは、valueへの書き込みアクセスを開始する前にAcquire同期を行う必要がある。
  • valueへの書き込みアクセスは、後続するvalueへの読み取りアクセスとhappens before関係にある必要がある。書き込みアクセスを終えた時点で専有状態が保証されているので、これは自然に成立する。

強参照のライフサイクル

第2回の流れに合わせて、まずは強参照のライフサイクルを追っていきます。

初期化

初期化時はまだシングルスレッドなので、特に興味深いことはありません。

Arc::clone

impl<T: ?Sized> Clone for Arc<T> {
    fn clone(&self) -> Arc<T> {
        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], Increasing the
        // reference counter can always be done with memory_order_relaxed: New
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let old_size = self.inner().strong.fetch_add(1, Relaxed);

        // However we need to guard against massive refcounts in case someone
        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
        // and users will use-after free. We racily saturate to `isize::MAX` on
        // the assumption that there aren't ~2 billion threads incrementing
        // the reference count at once. This branch will never be taken in
        // any realistic program.
        //
        // We abort because such a program is incredibly degenerate, and we
        // don't care to support it.
        if old_size > MAX_REFCOUNT {
            unsafe {
                abort();
            }
        }

        Self::from_inner(self.ptr)
    }
}

/// A soft limit on the amount of references that may be made to an `Arc`.
///
/// Going above this limit will abort your program (although not
/// necessarily) at _exactly_ `MAX_REFCOUNT + 1` references.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;

まず、カウンタをインクリメントするために RMW (Read-Modify-Write) アトミック操作である fetch_add を使っています。これは第4回の冒頭で説明した例にそっくりですね。ここのReadとWriteを別々に行ってしまうと以下の状況が発生してしまいます。

  • スレッド1がカウンタを読む。
  • スレッド2がカウンタを読む。
  • スレッド1がカウンタに書き込む。
  • スレッド2がカウンタに書き込む。 (→スレッド1のインクリメント効果が消滅する)

これを防ぐには fetch_add で読み取りと書き込みを一気に行う必要があります。

一方メモリ順序としては最弱の Relaxed が指定されています。これを考えるには Acquire にしなくてよい理由と Release にしなくてよい理由をそれぞれ考えればよさそうです。

  • Acquire をするときは、 value への先行する処理の終了を待ちたいときです。 Arc::clone が保証するのは value への共有アクセス権ですが、これを確保するために必要なのは「初期化が完了していること」で、これは言うまでもなく成立しています。したがってAcquire同期をする必要はありません。
  • Release をするときは、 value への後続する処理を待たせたいときです。そのような処理としては「value への専有アクセス」「value のdrop」の2つがありますが、どちらに対する同期も Arc::clone のタイミングではなく、そうして作った Arc がdropされるタイミングで行う必要があり、 Arc::clone のタイミングでわざわざ行う必要はありません。

以上の理由から Relaxed で十分です。

さて、単に fetch_add をするだけだとカウンタがオーバーフローしてしまう可能性があります。そこでオーバーフローを防ぐ必要があります。

オーバーフローを防ぐにはCAS loopで以下のように実装する手もあります。

let mut old_counter = strong.load(Relaxed);
loop {
    if old_counter >= MAX_REFCOUNT {
        // abort
    }
    let c = strong.compare_and_swap(old_counter, old_counter + 1);
    if c == old_counter {
        break;
    } else {
        old_counter = c;
    }
}

しかし、このように実装すると余計な命令をたくさん発行する必要が出てしまいます。そこでRustでは fetch_add を使うかわりに、 MAX_REFCOUNT を以下のようにカウンタの最大値の約半分に設定しています:

// usizeの最大値の約半分!
const MAX_REFCOUNT: usize = (isize::MAX) as usize;

その上で、以下のように「fetch_add した後に、カウンタ溢れをチェック」しています。

let old_size = self.inner().strong.fetch_add(1, Relaxed);

if old_size > MAX_REFCOUNT {
    unsafe {
        abort();
    }
}

この手法自体は競合状態があり、 fetch_addif 間で留まるスレッドが大量にあった場合に MAX_REFCOUNT よりも少しばかり大きい値が観測される可能性があります。しかし、 MAX_REFCOUNTusize の半分なので、カウンタが本当にオーバーフローするには異常な量のスレッドが同時にこの位置に留まる必要があります。そんなにスレッドがあったらそもそもメモリなり何なりが足りなくなること必至なので、事実上発生しないと考えて問題ないわけです。

Arc::drop

unsafe impl<#[may_dangle] T: ?Sized> Drop for Arc<T> {
    fn drop(&mut self) {
        // Because `fetch_sub` is already atomic, we do not need to synchronize
        // with other threads unless we are going to delete the object. This
        // same logic applies to the below `fetch_sub` to the `weak` count.
        if self.inner().strong.fetch_sub(1, Release) != 1 {
            return;
        }

        // This fence is needed to prevent reordering of use of the data and
        // deletion of the data.  Because it is marked `Release`, the decreasing
        // of the reference count synchronizes with this `Acquire` fence. This
        // means that use of the data happens before decreasing the reference
        // count, which happens before this fence, which happens before the
        // deletion of the data.
        //
        // As explained in the [Boost documentation][1],
        //
        // > It is important to enforce any possible access to the object in one
        // > thread (through an existing reference) to *happen before* deleting
        // > the object in a different thread. This is achieved by a "release"
        // > operation after dropping a reference (any access to the object
        // > through this reference must obviously happened before), and an
        // > "acquire" operation before deleting the object.
        //
        // In particular, while the contents of an Arc are usually immutable, it's
        // possible to have interior writes to something like a Mutex<T>. Since a
        // Mutex is not acquired when it is deleted, we can't rely on its
        // synchronization logic to make writes in thread A visible to a destructor
        // running in thread B.
        //
        // Also note that the Acquire fence here could probably be replaced with an
        // Acquire load, which could improve performance in highly-contended
        // situations. See [2].
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        // [2]: (https://github.com/rust-lang/rust/pull/41714)
        atomic::fence(Acquire);

        unsafe {
            self.drop_slow();
        }
    }
}

fetch_sub でアトミックにデクリメントしています。アトミックでなければいけない理由はいつも通りです。

ここでは以下のように同期を行っています。

  • fetch_sub 自身は Release 同期を行う。
  • fetch_sub の結果(直前の値)が1だった場合はさらに Acquire 同期を行う。

これは value の読み取りと value のdropの間にhappens before関係を成立させるためです。 value の読み取りは fetch_sub の直前まで行われ、 value のdropは fetch_sub によって値が0になったあとに行われるので、この2つの fetch_sub の間で同期を取ればよさそうです。

具体例で考えてみます。スレッド1, 2, 3がそれぞれ Arc を1個ずつdropしたとします。そして、 strong のmodification orderが「スレッド3の fetch_sub」→「スレッド2の fetch_sub」→「スレッド1の fetch_sub」だったとします。このとき以下の操作があらわれます。

  • スレッド1
    • value に読み取りアクセスする。
    • strong.fetch_sub(1) ... 1から0になった。
    • Acquireフェンス。
    • value のdropを開始する。
  • スレッド2
    • value に読み取りアクセスする。
    • strong.fetch_sub(1) ... 2から1になった。
  • スレッド3
    • value に読み取りアクセスする。
    • strong.fetch_sub(1) ... 3から2になった。

第4回で導入した記法を用いると、以下のようなグラフになります。

図: `Arc::drop` の操作からなるグラフ

  • スレッド1-スレッド1は同じスレッドなので(sequenced beforeなので) happens beforeが成立します。
  • スレッド2-スレッド1はmemory-fence同期によりhappens beforeが成立します。
  • スレッド3-スレッド1もmemory-fetch同期によりhappens beforeが成立します。このときrelease sequenceが使われていることに注意してください。

fenceを使うことで、特定の条件のときだけ同期を成立させていることも注目に値します。

弱参照のライフサイクル

弱参照のライフサイクルも同様です。と言いたいところですが実は1点重要な部分があります。

get_mut 問題

get_mut は、 &mut Arc<T> から &mut T の取得を試みる関数でした。この関数は「strong == 1 かつ weak == 1」と、両方のカウンタに対する制約をチェックする必要があります。これがマルチスレッド環境では問題になります。というのも、この2つのカウンタを同時にアトミックに処理する方法がないからです。(ごく一部のアーキテクチャではDouble CASと呼ばれる種類の命令があるほか、ワード長の倍のアトミックを使って解決することができる可能性もありますが、汎用的とはいえません)

strong を先にチェックする場合

strong を先にチェックするとどうでしょうか。

if self.strong.load(Relaxed) != 1 {
    return None;
}
// ←競合条件
if self.weak.load(Relaxed) != 1 {
    return None;
}
fence(Acquire);
return Some(unsafe { &mut *self.value.get() });

この場合、strongをチェックしてからweakをチェックするまでの間に、以下が発生する可能性があります。

  • 他のスレッドが持っていた Weak<T>Arc<T> にアップグレードされる。
  • さらに元となった Weak<T> がdropされ、弱参照カウンタが1になる。

こうすると、チェックは成功するにもかかわらず実際には他にも Arc<T> が存在する状態になってしまいます。

weak を先にチェックする場合

では逆に weak を先にチェックするとどうでしょうか。

if self.weak.load(Relaxed) != 1 {
    return None;
}
// ←競合条件
if self.strong.load(Relaxed) != 1 {
    return None;
}
fence(Acquire);
return Some(unsafe { &mut *self.value.get() });

この場合、strongをチェックしてからweakをチェックするまでの間に、以下が発生する可能性があります。

  • 他のスレッドが持っていた Arc<T>Weak<T> にダウングレードされる。
  • さらに元となった Arc<T> がdropされ、強参照カウンタが1になる。

こうすると、チェックは成功するにもかかわらず実際には他にも Weak<T> が存在する状態になってしまいます。

strong をチェックして変更する場合

ReadではなくRMW操作を使えば競合状態を回避できるでしょうか。 strong を水増ししても仕方ないので、1だったら0にするのが良さそうです。

if self.strong.compare_and_swap(1, 0, Relaxed) != 1 {
    return None;
}
if self.weak.load(Relaxed) != 1 {
    self.strong.fetch_add(1, Relaxed);
    return None;
}
self.strong.fetch_add(1, Acquire);
return Some(unsafe { &mut *self.value.get() });

こうすると、競合条件によってチェックをすり抜けること自体はありません。 strong が1だったら一時的に0にしているので、その間に試みられたアップグレードは全て失敗します。

しかしこれはユーザーから観測される挙動に問題があります。Arc<T> がまだ存在していてもアップグレードが気まぐれで失敗することになりますし、あるときアップグレードに失敗しても、その先ずっとアップグレードに失敗するとは限らないことになります。他のスレッドから見ると強参照が0なだけなので、「get_mut の最中」と「本当に Arc<T> がなくなった状態」の区別はつきません。そのため、リトライによって「気まぐれな失敗」を隠蔽することもできません。

weak をチェックして変更する場合

weak をいじっても、ダウングレードはいつでも成功するので、競合状態を防ぐことはできません。

ロックによる解決

以上のように、 get_mut 自身を工夫するだけでは安全にポインタの唯一性をチェックすることはできません。そこで weak の定義を見直し、 weak のロックを可能にします。このことは ArcInner の定義に説明があります。

struct ArcInner<T: ?Sized> {
    strong: atomic::AtomicUsize,

    // the value usize::MAX acts as a sentinel for temporarily "locking" the
    // ability to upgrade weak pointers or downgrade strong ones; this is used
    // to avoid races in `make_mut` and `get_mut`.
    weak: atomic::AtomicUsize,

    data: T,
}

usize の最大値をロックされた状態として特別に扱うことになっています。そのため、 weakを扱う処理はロック状態を念頭に置いて書かれる必要があります

Arc::downgrade

さっそく Arc::downgrade から見ていきましょう。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L600-L629

impl<T: ?Sized> Arc<T> {
    pub fn downgrade(this: &Self) -> Weak<T> {
        // This Relaxed is OK because we're checking the value in the CAS
        // below.
        let mut cur = this.inner().weak.load(Relaxed);

        loop {
            // check if the weak counter is currently "locked"; if so, spin.
            if cur == usize::MAX {
                cur = this.inner().weak.load(Relaxed);
                continue;
            }

            // NOTE: this code currently ignores the possibility of overflow
            // into usize::MAX; in general both Rc and Arc need to be adjusted
            // to deal with overflow.

            // Unlike with Clone(), we need this to be an Acquire read to
            // synchronize with the write coming from `is_unique`, so that the
            // events prior to that write happen before this read.
            match this.inner().weak.compare_exchange_weak(cur, cur + 1, Acquire, Relaxed) {
                Ok(_) => {
                    // Make sure we do not create a dangling Weak
                    debug_assert!(!is_dangling(this.ptr));
                    return Weak { ptr: this.ptr };
                }
                Err(old) => cur = old,
            }
        }
    }
}

ロックに対応するため、CAS loopとして実装されています。ロックされている場合にリトライすることを除くと以下と同等です。

this.inner().weak.fetch_add(1, Acquire);
// Make sure we do not create a dangling Weak
debug_assert!(!is_dangling(this.ptr));
return Weak { ptr: this.ptr };

ここで Relaxed ではなく Acquire が使われている理由は謎です。 is_unique がどうのと書いてありますが、 is_unique は同期に対して透過的に振る舞うはずです。そもそも同期が必要なのは他のメモリ領域 (strong, value) との順序を強制する必要がある場合ですが、ここでは weak をインクリメントするにあたって strong, value が特定の状態であることを保証する必要がありません。 (特に、すでに strong > 0 であることは既存の Arc の存在から保証されており、ここで別途同期する必要はない)

NOTEに記載の通りオーバーフローがチェックされていないので、 downgradeforget を繰り返し行うと問題が起きそうです。

Weak::clone

Weak::cloneArc::downgrade と近い処理をしていますが、実装はよりシンプルです。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1599-L1634

impl<T: ?Sized> Clone for Weak<T> {
    fn clone(&self) -> Weak<T> {
        let inner = if let Some(inner) = self.inner() {
            inner
        } else {
            return Weak { ptr: self.ptr };
        };
        // See comments in Arc::clone() for why this is relaxed.  This can use a
        // fetch_add (ignoring the lock) because the weak count is only locked
        // where are *no other* weak pointers in existence. (So we can't be
        // running this code in that case).
        let old_size = inner.weak.fetch_add(1, Relaxed);

        // See comments in Arc::clone() for why we do this (for mem::forget).
        if old_size > MAX_REFCOUNT {
            unsafe {
                abort();
            }
        }

        return Weak { ptr: self.ptr };
    }
}

最初の if let 分岐は第3回で説明済みなので省略します。

Arc::downgrade と異なり、ロックされていないことは保証されています。weak カウンタのロックは Weak が存在しないときにしかかからないからです。それ以外のポイントは Arc::clone のところで説明済みでほとんど考えることはありません。

Weak::upgrade

Weak の存在意義といっても過言ではない Weak::upgrade の実装は以下のようになっています。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1429-L1485

impl<T: ?Sized> Weak<T> {
    pub fn upgrade(&self) -> Option<Arc<T>> {
        // We use a CAS loop to increment the strong count instead of a
        // fetch_add because once the count hits 0 it must never be above 0.
        let inner = self.inner()?;

        // Relaxed load because any write of 0 that we can observe
        // leaves the field in a permanently zero state (so a
        // "stale" read of 0 is fine), and any other value is
        // confirmed via the CAS below.
        let mut n = inner.strong.load(Relaxed);

        loop {
            if n == 0 {
                return None;
            }

            // See comments in `Arc::clone` for why we do this (for `mem::forget`).
            if n > MAX_REFCOUNT {
                unsafe {
                    abort();
                }
            }

            // Relaxed is valid for the same reason it is on Arc's Clone impl
            match inner.strong.compare_exchange_weak(n, n + 1, Relaxed, Relaxed) {
                Ok(_) => return Some(Arc::from_inner(self.ptr)), // null checked above
                Err(old) => n = old,
            }
        }
    }
}

ここでもCAS loopが使われていますが、これはweakのロックとは別の理由です。強参照カウンタが0かどうかで分岐する必要があるため、 fetch_add ではなくCAS loopを使う必要があります。それ以外は Arc::clone と同じです。

Weak::drop

Weak::drop https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1658-L1706Arc::drop と同様です。

impl<T: ?Sized> Drop for Weak<T> {
    fn drop(&mut self) {
        // If we find out that we were the last weak pointer, then its time to
        // deallocate the data entirely. See the discussion in Arc::drop() about
        // the memory orderings
        //
        // It's not necessary to check for the locked state here, because the
        // weak count can only be locked if there was precisely one weak ref,
        // meaning that drop could only subsequently run ON that remaining weak
        // ref, which can only happen after the lock is released.
        let inner = if let Some(inner) = self.inner() {
            inner
        } else {
            return
        };

        if inner.weak.fetch_sub(1, Release) == 1 {
            atomic::fence(Acquire);
            unsafe {
                Global.dealloc(self.ptr.cast(), Layout::for_value(self.ptr.as_ref()))
            }
        }
    }
}

弱参照カウンタのロックは Weak がないときにしか起こらないので、ロック対策をする必要はありません。残りの挙動は Arc::drop と相似で、「強参照カウンタではなく弱参照カウンタが対象であること」「value ではなくヒープ領域が対象であること」だけが異なります。

コピーオンライトのための処理

Rc と同様、 Arc にもコピーオンライトのための3つの関数 try_unwrap, get_mut, make_mut が存在します。

try_unwrap

try_unwrapArc::drop と似ています。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L348-L388

impl<T> Arc<T> {
    pub fn try_unwrap(this: Self) -> Result<T, Self> {
        // See `drop` for why all these atomics are like this
        if this.inner().strong.compare_exchange(1, 0, Release, Relaxed).is_err() {
            return Err(this);
        }

        atomic::fence(Acquire);

        unsafe {
            let elem = ptr::read(&this.ptr.as_ref().data);

            // Make a weak pointer to clean up the implicit strong-weak reference
            let _weak = Weak { ptr: this.ptr };
            mem::forget(this);

            Ok(elem)
        }
    }
}

Arc::try_unwrapArc::drop の違いは以下の2点だけです。

  • 強参照カウントが1のときだけ動作する。
  • T をdropせずに、所有権を取り出して返す。

この前者の条件を実現するために、 strong.fetch_sub(1, Release) のかわりに strong.compare_exchange(1, 0, Release, Relaxed) を使っています。

ところで、Release順序を保つ必要はあるのでしょうか? Arc::drop がデクリメントをReleaseで行っているのは、 value への読み取りアクセスの終了を実際に value をdropするスレッドに同期するためでした。 Arc::try_unwrap では、デクリメントに成功したときは必ずそのスレッドで value の所有権を取り出すので、自分自身がデクリメントをReleaseで行う必要はなさそうです。わざわざ危険を冒してまでこの部分のメモリ順序を弱めるほどではないという判断かもしれません。

なお、Acquireフェンスのほうは相変らず必要です。他のスレッドの Arc::drop と同期する必要があるからです。

get_mut

すでに説明したように get_mut の実装はややタフで、以下のようになっています。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1077-L1092 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L1131-L1153

impl<T: ?Sized> Arc<T> {
    pub fn get_mut(this: &mut Self) -> Option<&mut T> {
        if this.is_unique() {
            // This unsafety is ok because we're guaranteed that the pointer
            // returned is the *only* pointer that will ever be returned to T. Our
            // reference count is guaranteed to be 1 at this point, and we required
            // the Arc itself to be `mut`, so we're returning the only possible
            // reference to the inner data.
            unsafe {
                Some(Arc::get_mut_unchecked(this))
            }
        } else {
            None
        }
    }

    fn is_unique(&mut self) -> bool {
        // lock the weak pointer count if we appear to be the sole weak pointer
        // holder.
        //
        // The acquire label here ensures a happens-before relationship with any
        // writes to `strong` (in particular in `Weak::upgrade`) prior to decrements
        // of the `weak` count (via `Weak::drop`, which uses release).  If the upgraded
        // weak ref was never dropped, the CAS here will fail so we do not care to synchronize.
        if self.inner().weak.compare_exchange(1, usize::MAX, Acquire, Relaxed).is_ok() {
            // This needs to be an `Acquire` to synchronize with the decrement of the `strong`
            // counter in `drop` -- the only access that happens when any but the last reference
            // is being dropped.
            let unique = self.inner().strong.load(Acquire) == 1;

            // The release write here synchronizes with a read in `downgrade`,
            // effectively preventing the above read of `strong` from happening
            // after the write.
            self.inner().weak.store(1, Release); // release the lock
            unique
        } else {
            false
        }
    }
}

すでに説明したように get_mut が必要とする唯一性 (strong == 1 && weak == 1) をチェックするには弱参照カウントをロックする必要があります。この「ロック」の処理だけ切り出すと以下のようになります。

if weak.compare_exchange(1, usize::MAX, Acquire, Relaxed).is_ok() {
    // ...

    weak.store(1, Release);
}

これは典型的なスピンロックにおける try_lockunlock のパターンになっていますね。

さて、このロックはアンロック時に weak にRMWではない単純書き込みを行っています。つまり、この操作によってそこまで続いてきたrelease sequenceが途切れることになります。これは問題ないのかという疑問が生じます。結論から言うと、その単純書き込み自身が Release 操作になっていること(+ロックがAcquireであること)で同期を肩代わりできるため、問題ありません。

そしてそのロック中では強参照カウンタをacquire loadしています。

さて、問題は is_unique がtrueを返したときに、ほかに ArcWeak もないと確実に言えるかどうかです。これは以下のように順番に推論することでわかります。

  • まず、weakのmodification orderを考えます。weakのロックからアンロックまでの間にweakが変更されることはないので、 Weak の生成/消滅に由来するweakのインクリメント/デクリメントは「ロック以前」「アンロック以後」に分類できます。ロック時に weak == 1 を観測しているので、ロック以前に行われたインクリメントは、対応するデクリメントもロック以前に行われていることが保証されます。デクリメントは常にReleaseで行われているので、ロックと(直接的または間接的に)同期されます。この保証はアンロックが起きるタイミングまで延長できます。
  • 次に、strongのmodification orderを考えます。ロックはありませんが、 Arc の生成/消滅に由来するstrongのインクリメント/デクリメントは「is_unique 内のロード以前」「is_unique 内のロード以後」に分類できます。 is_unique 内のロードで strong == 1 を観測しているので、このタイミング以前に行われた(他のArcのための)インクリメントは、対応するデクリメントもロード以前に行われていることが保証されます。デクリメントは常にReleaseで行われているので、当該ロードと同期されます。
  • is_unique 内のロードは weak のロック~アンロックの間に発生しているので、このタイミングでは「この時点で存在しうる全ての他の Arc/Weak がdrop済み(正確には、dropは終わっていないかもしれないが、もはやヒープ領域へのアクセスはしない)」であることが保証されます。
  • 他スレッドに Arc/Weak がないので、それ以降は明示的に何かしない限りはこの状況が続くことが保証されます。

ひとたびユニーク性が保証できたら、あとは Rc::get_mut の場合と同じ理屈で考えることができます。

make_mut

make_muttry_unwrapget_mut の合わせ技的に振る舞うので、ロックは不要です。 https://github.com/rust-lang/rust/blob/1.39.0/src/liballoc/sync.rs#L995-L1047

impl<T: Clone> Arc<T> {
    pub fn make_mut(this: &mut Self) -> &mut T {
        // Note that we hold both a strong reference and a weak reference.
        // Thus, releasing our strong reference only will not, by itself, cause
        // the memory to be deallocated.
        //
        // Use Acquire to ensure that we see any writes to `weak` that happen
        // before release writes (i.e., decrements) to `strong`. Since we hold a
        // weak count, there's no chance the ArcInner itself could be
        // deallocated.
        if this.inner().strong.compare_exchange(1, 0, Acquire, Relaxed).is_err() {
            // Another strong pointer exists; clone
            *this = Arc::new((**this).clone());
        } else if this.inner().weak.load(Relaxed) != 1 {
            // Relaxed suffices in the above because this is fundamentally an
            // optimization: we are always racing with weak pointers being
            // dropped. Worst case, we end up allocated a new Arc unnecessarily.

            // We removed the last strong ref, but there are additional weak
            // refs remaining. We'll move the contents to a new Arc, and
            // invalidate the other weak refs.

            // Note that it is not possible for the read of `weak` to yield
            // usize::MAX (i.e., locked), since the weak count can only be
            // locked by a thread with a strong reference.

            // Materialize our own implicit weak pointer, so that it can clean
            // up the ArcInner as needed.
            let weak = Weak { ptr: this.ptr };

            // mark the data itself as already deallocated
            unsafe {
                // there is no data race in the implicit write caused by `read`
                // here (due to zeroing) because data is no longer accessed by
                // other threads (due to there being no more strong refs at this
                // point).
                let mut swap = Arc::new(ptr::read(&weak.ptr.as_ref().data));
                mem::swap(this, &mut swap);
                mem::forget(swap);
            }
        } else {
            // We were the sole reference of either kind; bump back up the
            // strong ref count.
            this.inner().strong.store(1, Release);
        }

        // As with `get_mut()`, the unsafety is ok because our reference was
        // either unique to begin with, or became one upon cloning the contents.
        unsafe {
            &mut this.ptr.as_mut().data
        }
    }
}

make_mut はまず strong.compare_exchange(1, 0, Acquire, Relaxed) を行うことで、 try_unwrap できる状況かどうかを調べています。できなかったら諦めてcloneします。

↑のCASが成功したら try_unwrap が可能ですが、 get_mut が可能ならそちらにフォールバックします。get_mut と異なり、一度0を入れた強参照に1を書き戻すのは参照がユニークだったときだけなので、このように書いてもユーザーが観測する挙動としておかしなことにはなりません。

strong_count/weak_count

Arc/Rc は強参照カウントや弱参照カウントを取得する関数を露出しています。

impl<T: ?Sized> Arc<T> {
    pub fn weak_count(this: &Self) -> usize {
        let cnt = this.inner().weak.load(SeqCst);
        // If the weak count is currently locked, the value of the
        // count was 0 just before taking the lock.
        if cnt == usize::MAX { 0 } else { cnt - 1 }
    }

    pub fn strong_count(this: &Self) -> usize {
        this.inner().strong.load(SeqCst)
    }
}

impl<T: ?Sized> Weak<T> {
    #[unstable(feature = "weak_counts", issue = "57977")]
    pub fn strong_count(&self) -> usize {
        if let Some(inner) = self.inner() {
            inner.strong.load(SeqCst)
        } else {
            0
        }
    }

    #[unstable(feature = "weak_counts", issue = "57977")]
    pub fn weak_count(&self) -> Option<usize> {
        // Due to the implicit weak pointer added when any strong pointers are
        // around, we cannot implement `weak_count` correctly since it
        // necessarily requires accessing the strong count and weak count in an
        // unsynchronized fashion. So this version is a bit racy.
        self.inner().map(|inner| {
            let strong = inner.strong.load(SeqCst);
            let weak = inner.weak.load(SeqCst);
            if strong == 0 {
                // If the last `Arc` has *just* been dropped, it might not yet
                // have removed the implicit weak count, so the value we get
                // here might be 1 too high.
                weak
            } else {
                // As long as there's still at least 1 `Arc` around, subtract
                // the implicit weak pointer.
                // Note that the last `Arc` might get dropped between the 2
                // loads we do above, removing the implicit weak pointer. This
                // means that the value might be 1 too low here. In order to not
                // return 0 here (which would happen if we're the only weak
                // pointer), we guard against that specifically.
                cmp::max(1, weak - 1)
            }
        })
    }
}

興味深いのは以下の4点です。

  • メモリ順序として SeqCst を使っている点。アトミック操作を直接露出する観点から、できるだけ直感的に振る舞うよう SeqCst を選んだのかもしれない。
  • weak がロックされていたときの処理。これらの関数は単なる読み取りしかないので、少し時系列をずらして「ロックされる直前の値」を返すことで、ロックの解放待ちを防ぐことができている。
  • Weak::weak_countOption を返す点。ぶら下がり Weak は、他の Weak 参照との同一性という観点から少々変わった挙動をするため、特定の整数を返すよりも None を返すのが妥当ということかもしれない。
  • Weak::weak_count が必ずしも正確な値を返さないこと。 weak は内部的には強参照が1個以上あるときは1多い値を格納する仕組みになっていたが、このせいで「Weak の個数」を数えるのが難しい。 weakstrong を同時に読むのは難しいからである。 Weak::upgrade を呼べば一貫した値を取得することも可能そうだけど、そこまでするほどでもなさそう。

まとめ

  • Arc はスレッドの休眠処理が不要でアトミック操作にしか依存しないため、直感に反して alloc クレートで定義されている。これは Rc が定義されているのと同じ場所である。
  • Arc は共有ポイントである一方、たまたま参照カウントが1のときに所有権を取り出す機能もあるため、 Arc<T>: SendArc<T>: SyncT: Send + Sync を要求する必要がある。
  • 原則としてカウンタ操作は競合状態を防ぐためRead-Modify-Write (RMW) アトミック操作を使って行われる。
  • 原則として、インクリメント時はRelaxed, デクリメント時はRelease操作として行い、カウンタが0になったときはAcquire fenceを呼ぶことで必要な同期が達成できる。
  • get_mut を安全に実装するために、弱参照カウンタにはロック機構が実装されている。
  • 公開インターフェースである strong_count/weak_count は内部処理と異なり SeqCst 順序が使われている。その実装は単なる内部カウンタのload/storeよりもやや興味深い実装になっている。

以上で「Rustの Arc を読む」シリーズは終わりです。比較的短いソースから、予想以上に多くのことが学べたのではないでしょうか。(筆者自身も予想を超えてたくさん執筆したため驚いています。) これが皆さんの学習の助けになればさいわいです。

47
19
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
47
19