LoginSignup
3
0

More than 1 year has passed since last update.

GoのSync.mutexを内部実装から理解したい

Last updated at Posted at 2021-06-18

始める前に注意点

GolangのG,P,scheduleについての理解があるとより分かりやすくなるかと思います。
GやPについての解説も書いていきたいと思っていますが、下記に参考になる記事を貼っておきます。

Schedule
https://qiita.com/tobi-c/items/41e6c786c858a513f67e
https://qiita.com/takc923/items/de68671ea889d8df6904
https://qiita.com/niconegoto/items/3952d3c53d00fccc363b
https://dtyler.io/articles/2021/03/28/goroutine_preemption/
G,M,Pについて
https://niconegoto.hatenadiary.jp/entry/2017/04/11/092810
https://christina04.hatenablog.com/entry/why-goroutine-is-good

sync.Mutexについて

複数のGoroutineがリソースを取り合う時に使うやつ。
例えばmapとかを複数のgoroutineから使いたい時m.lockみたいにするあれ。
mutexでやりたいことはわかっても、実際どのように動いているのかわからなかったので内部構造を見てみます。

通常モードとstarvingモードについて

二つのモードがあり、starvingモードは名前の通り飢餓状態の時です。
何が飢餓なのかというと、ロック解除待ちのGoroutineのことです。
1ms以上ロック解除を待つGoroutineが発生するとMutexが飢餓モードに移行します。

starvingモードは何を解決したか

ソースコード中の説明を引用すると、

Starvation mode is important to prevent pathological cases of tail latency

とあります。

tailLantencyとはプロセスの平均待ち時間ではなく、latencyが長いプロセスを順に並べて1,2,3...99としたら98とか99とかのlatencyのことです。

下記のコードで,
1.16では、g1Count is 30, g2Count is 10
1.8(飢餓モード導入以前)ではg1Count is 4020, g2Count is 10となっていたことから、
g2の待ち時間が短くなっているといえるので、飢餓モードによってtailLatencyが改善されていると言えるはずです。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    done := make(chan bool, 1)
    var mu sync.Mutex

    var g1Count int
    var g2Count int

    // goroutine 1
    go func() {
        for {
            select {
            case <-done:
                return
            default:
                mu.Lock()
                g1Count++
                time.Sleep(100 * time.Microsecond)
                mu.Unlock()
            }
        }
    }()

    // goroutine 2
    for i := 0; i < 10; i++ {
        time.Sleep(100 * time.Microsecond)
        mu.Lock()
        g2Count++
        mu.Unlock()
    }
    fmt.Printf("g1Count is %d, g2Count is %d", g1Count, g2Count)
    done <- true
}

Sync.Mutex本体

sync.MutexnにはLock,Unlockがありますが、Lockだけでは挙動を理解しきれず、Unlockも見ていかないといけないところもあるので、
Lockを見ていく中で、Unlockを必要に応じてみるという形にしていきます。

まず通常モードの動きから解説していきます。
ここからはGoroutineのことを略して、Gと呼んでいきます。
mutex.goのLock()が入口。

mutex.go
type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

いきなりatomic.CompareAndSwapInt32と見慣れないやつが出てきました。
何をしているかというと、
compareAndSwap(value,old,new)で、
valueとoldを比べ、同じならtrueを返し、valueにnewを入れる
value != oldならfalse
所謂CASと訳されるやつです。
参考
https://www.geeksforgeeks.org/atomic-compareandswapint32-function-in-golang-with-examples/

そして、atomicなので一つのGしか同時にCASをできません。
G1とG2でLockを争っていることを考えてみます。最初はm.state=0として、G1が先にCASを成功させたとすると、G2のCASではm.state=mutexLocked=1となっているので、CASは成功しません。

なのでLockではm.state=0,ロックがかかってないときに一番最初にm.state=1としたやつがロックを取得したことになり、
そのままreturnできます。

ロックを取得できなかったGはlockSlowへ進むこととなります。ここからが本番。

書かれているコメントも参考になるので長いとは思いますが、一度全体を貼っておきます。

mutex.go
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
               ----spinパート

        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

                ---spin終わり

                ---stateいろいろ弄っている
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
                --state弄り終了

                -------ロック取得パート
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }

                        -----ロック取得できていないので睡眠パート....
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)

                        -----睡眠から目覚める
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

}

パートごとに分けてみました。
基本的にロックを取得しようとするGはロックを取得できるまでforループでぐるぐる回ることになります。
まずはspinパートから見ていきましょう。

spinパート

mutex.go

        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

ここで、stateについて少し説明します。
stateは int32となっていました。
int32なので32bitです。
lockedが1ビット目,wokenが2ビット目,starvingが3ビット目でそれぞれ表されています。残りの4ビット~32ビット目はwaiterの個数です。

lockedとstarvingはそれぞれ一番上で説明したようにモードのことで、wokenもモードのようなものです。これは一番最後の説明とします。
waiterの個数とはruntime_SemacquireMutexで眠りについたGの個数のことです。
眠っているGはmutex.goのforループで動いているGと違い眠っているのでロック争いには参加できません。参加できるのは起きた時です。
後々わかってくると思うので、今は32ビットでいろいろ表しているということだけ留めておいてください。

上記を踏まえて、spinを抜けられる条件について考えていきましょう。
forループの前にold := m.stateとなっています。(forループ中にもちょくちょくoldはm.stateで更新されます。他のGからの更新があるため状態がどんどんかわっていくためです。)
old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) がfalseであればspinから抜けられるのですが、

old&(mutexLocked|mutexStarving) == mutexLockedは、
&のビット演算なので、old = ???、mutexLocked|mutexStarving = ...101(1,2ビット目がonの状態)なので、
mutexLockedと == になるためには old & ....101 -> .....001になればいいので、

oldがmutexStarvingフラグが立っている(old & ....101で ....1??となる)
oldがロック解除状態(old & ....101で..??0となる)
もしくはcanSpinできない
時にspinから抜けることができます。

canSpinは、下記の通りになっていて、大雑把に言えば、余裕があるときにGをspinさせていられるという条件です。
余裕がないときとは、PのqueueにGがあり、他のGがschedule待ちだったり、そもそもcpuの個数が少なかったりといった感じです。
spin中はGがspinに専有されてしまうので、ブロック状態になっている余裕があるかどうかですね。

canspin.go
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

spin中で使われている、doSpinですが、本体は

asm.go
runtime·procyield
code:go
 TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
 again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET

上記の通りprocyieldの引数のcyclesの数だけ、PAUSE時間分待ちます。
pausseの参考
https://stackoverflow.com/questions/4725676/how-does-x86-pause-instruction-work-in-spinlock-and-can-it-be-used-in-other-sc

state準備パート

state準備パートでは新しいstateになったらどんなstateか?という準備をしていきます。
実際に準備したstateのnewが反映されるのは準備が終わった後で、CASが実行されるときです。

starvingとawokeのところは後程やっていくので、今回は下記の二つを見ていきましょう。

mutex.go
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }

        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

まず一番目ですが、今は通常モードなので new |= mutexLockedとなります。
二番目はすでにロック済みだった場合、もしくはstarvingModeだった場合です。
その場合は、ロックを獲得できないことがわかっているので睡眠状態(後述しますが、sema.goのwaitqueueに入っている状態のことです。)に入ります。
そのためwaiterの数を一人増やしています。

mutex.go
waiterShift=3
new += 1 << mutexWaiterShift
->
1<<3となっているので 0001を1000としている
なぜかといえばstateの1,2,3ビット目はそれぞれフラグに使用されていて以降のビットがwaiterの人数を表していたからです 

ロック取得もしくは睡眠パート

stateを準備した後は、いよいよロックを獲得するときです。その部分のコードを見ていきましょう。

mutex.go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }

            runtime_SemacquireMutex(&m.sema, queueLifo, 1)



ここでもCASが出てきますね。
oldは現在のGのstate、newは準備パートで用意した変更予定のstateです。
次のようなシチュエーションを考えてみます。
spinからG1とG2がロック解除のため抜けたとします。(spinから抜ける条件を思い出してください。)
G1 g1Old = 0(mutexLockもなんのフラグもたっていない) g1New = mutexLocked
G2 g2Old = 0(mutexLockもなんのフラグもたっていない) g2New = mutexLocked
となり、G1がCASに成功したとします。

G2がCASをしようとするときは、m.state = g1Newとなっており、g1New != g2Oldなので、CASに成功できず、ロック取得することができません。
なのでここでもLock()と同じくatomicとなっています。
ここでロック取得というのは、あるGがstateにmutexLockedを反映させて、LockSLow()からbreakで抜けることをいいます。
CASに成功して、mutexLockedをStateに反映させたGは、下記の条件を満たし、LockSlow()から抜けることができます。

mutex.go
if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }

ここまでが順調にLockSlowwでロック解除できた時の流れです(↑の例でいえばG1)
G2は、CASを取得できなかったので、elseに行き、oldを新しいstate(g1New)に更新してまたループに戻ります。

mutex.go
 old = m.state

さて、ロック取得もしくは睡眠パートと銘打っておきながら、睡眠パート(runtime_SemacquireMutex)までたどり着いていません。
睡眠はどのような時に起こるのでしょうか。
上で、

CASに成功して、mutexLockedをStateに反映させたG

と書きましたが、
ロック取得にはもうひとつ条件があって、oldにmutexLockedフラグが立っていないこと、という条件があります。

ここで一つシチュエーションを考えてみましょう。
spinから抜けることができる状態の一つにcanSpin条件を満たさないことがありました。
canSpin状態を満たさない、という条件でspinから出た時にstateは
state = mutexLocked もしくは state = mutexStarvingとなっています。
今回はmutexStarvingは考えないので通常モードでcanSpin状態を満たせずspinから抜けた時は、mutexLockedとなっていると考えられます。
mutexLockedなのでwaiterが増えるわけですが、waiter部分は正直あまり気にしないでもいいです。(starvingの説明の最後に少しだけwaiterを使いますが、基本waiter絡みは省いて説明します。)

ここでもG1,G2を使い、
G1 g1Old = mutexLocked g1New = mutexLocked+someWaiter
G2 g2Old = mutexLocked g2New = mutexLocked+someWaiter
となります。
このときCASに成功して、下記の条件に差し掛かってもoldにmutexLockedフラグが立っているので、
breakすることができません。このときに睡眠状態に入ります。

mutex.go
if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }

睡眠パート(sema)

ここからは睡眠パートに入っていきます。

sema.go

runtime_SemacquireMutex(&m.sema, queueLifo, 1)

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    gp := getg()
    if gp != gp.m.curg {
        throw("semacquire not on the G stack")
    }

    // Easy case.
    if cansemacquire(addr) {
        return
    }

    // Harder case:
    //  increment waiter count
    //  try cansemacquire one more time, return if succeeded
    //  enqueue itself as a waiter
    //  sleep
    //  (waiter descriptor is dequeued by signaler)
    s := acquireSudog()
    root := semroot(addr)
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
        if t0 == 0 {
            t0 = cputicks()
        }
        s.acquiretime = t0
    }
    for {
        lockWithRank(&root.lock, lockRankRoot)
        // Add ourselves to nwait to disable "easy case" in semrelease.
        atomic.Xadd(&root.nwait, 1)
        // Check cansemacquire to avoid missed wakeup.
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        // Any semrelease after the cansemacquire knows we're waiting
        // (we set nwait above), so go to sleep.
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3+skipframes)
    }
    releaseSudog(s)
}

まずはcansemacquireから見ていきましょう。

sema.go
func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}

ここでもCASが出てきます。
addrとはm.stateと並んで、mutex構造体にあった、m.semaのことです。
trueであれば、睡眠状態にならずにmutex.goにすぐに戻れます。
どんな時にtrueになり、どんな時にfalseになるのでしょうか。

v = m.semaが0のときは絶対にfalseになるのですが、そもそもsemaにいつ値が足されて、いつ0になっているのでしょうか。
ここらへんでUnlock側も見ていかないとわからなくなってしまうので、
canaquireについてunlockと合わせてみていきましょう。

sema.go
func semroot(addr *uint32) *semaRoot {
    return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

type semaRoot struct {
    lock  mutex
    treap *sudog // root of balanced tree of unique waiters.
    nwait uint32 // Number of waiters. Read w/o the lock.
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
sema.go
func semrelease(addr *uint32) {
    semrelease1(addr, false, 0)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // Easy case: no waiters?
    // This check must happen after the xadd, to avoid a missed wakeup
    // (see loop in semacquire).
    if atomic.Load(&root.nwait) == 0 {
        return
    }


        //待っているGをPのQueueに追加する処理が入る

semaのUnlock側で使われるsemareleaseを見ると、releaseした時にaddrを+1しています。
なので、Unlock状態はsemaが1、Lock状態はsemaが0と考えるとよさそうです。
なのでv=0のとき絶対falseというのはロック済みだから、ということで、
atomic.Cas(addr, v, v-1)の部分はそのままv-1にしてロック解除状態にできたGのみcanaquireとみなせるということですね。(本筋のmutexのロックではなく、semaのロックを獲得したに過ぎないので注意、mutexのロック争いに負けた時に負けたGがさらにsemaで争っている...)

ついでに、&root.nwaitも見てみましょう、これは名前の通りsemaの待ち人数ですね。
releaseの時に、待っている人が0人だと何もしません。
待っている人がいるときはPのQueueに追加してschedule可能な状態にしてあげます。

semaquireに戻ります。
canaquireでsemaのロックを取得できなかったときですね。

sema.go
        s := acquireSudog()
    root := semroot(addr)
    ...本筋でない部分は省略
    for {
        lockWithRank(&root.lock, lockRankRoot)
        // Add ourselves to nwait to disable "easy case" in semrelease.
        atomic.Xadd(&root.nwait, 1)
        // Check cansemacquire to avoid missed wakeup.
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        // Any semrelease after the cansemacquire knows we're waiting
        // (we set nwait above), so go to sleep.
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }

sudoGとroot.queue(),root.dequeue()についての説明を少ししたいと思います。
sudoGについて
sudoGはGのWrapperとなっていて、channelやselectだったり排他処理が必要なところで主に使われています。
channel,特にselectでの使われ方が面白いのでselect理解編で見ていきたいと思っています。

semaでは、semaRootがTreapというデータ構造を使ってsudoGのtreeを作っています。
Treap参考
https://medium.com/carpanese/a-visual-introduction-to-treap-data-structure-part-1-6196d6cc12ee
sudoGだったり、semaRootについては下記がわかりやすい図を載せてくださっています。
https://github.com/cch123/golang-notes/blob/master/semaphore.md

sudoG.go
type sudog struct {
    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)
    acquiretime int64
    releasetime int64
    ticket      uint32
    isSelect bool
    success bool
    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}

root.queue()とroot.dequeue()について
この二つは上で説明したtreapを使ってsudoGを出し入れするメソッドです。
とりあえずdequeueはtreapの最初からとってくることだけ覚えればここでは問題ないかと思います。

さて、睡眠パートの最後に入ります。
root.queueでGを包んだsudoGをsemaroot.queueに追加した後、
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)でGは眠りにつきます。
goparkはここでは深く説明しませんが、他のGにコンテキストスイッチを促すものです。
GはPのQueueに入っていなければschduleされないので、unlockでroot.queueからPのqueueに復帰させてもらえない限り、このGが動くことはありません。

ここまでで睡眠パートも終了です。

Unlockについて

ここからはUnlockを見ていきます。
眠ってしまったGはいつ目覚めることができるのでしょうか。
一旦視点をmutex.goに戻してみましょう。

mutex.go
func (m *Mutex) Unlock() {

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

まずはstateからLockを除いています。
これによってspinからmutexLockedがない条件の脱出ができるわけですね。
次のnew != 0は starvingフラグが立ってたり、waiterがいたりした時の話です。
そのときはunlockSlow()へ行きます。

mutex.go
func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {

        runtime_Semrelease(&m.sema, true, 1)
    }
}

elseの条件の方はstarvingModeなので次回やります。
if条件の方で、
待っている人が0人だったらsemaから起こす意味ないのでreturn、
forループ中にstateに変化があり他のGがロックを取得してmutexLockedになっていても、起こせないのでreturn

もしsemaから起こせそうならば、waiterの数を-1してさらにCASが成功したらruntime_Semreleaseで起こしに行きます。
ここでCASを使っているのは、UnlockでCASに来るまではUnLock状態ですが、それまでに、Lock()側で、最新のm.stateがロック状態になっていることがあるからです。

次はruntime_Semreleaseを再訪しましょう。

sema.go
func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // Easy case: no waiters?
    // This check must happen after the xadd, to avoid a missed wakeup
    // (see loop in semacquire).
    if atomic.Load(&root.nwait) == 0 {
        return
    }

    // Harder case: search for a waiter and wake it.
    lockWithRank(&root.lock, lockRankRoot)
    if atomic.Load(&root.nwait) == 0 {
        // The count is already consumed by another goroutine,
        // so no need to wake up another goroutine.
        unlock(&root.lock)
        return
    }

    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // May be slow or even yield, so unlock first
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3+skipframes)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        readyWithTime(s, 5+skipframes)
        if s.ticket == 1 && getg().m.locks == 0 {
            // See issue 33747 for discussion.
            goyield()
        }
    }
}


dequeueまでは以前説明しました。
dequeueはsemarootのqueueから先頭のsudoGを取り出します。

handoffというのはstarvingModeで使います。
starvingModeの時のここの動きが面白いのですが、それはstarvingModeの時やりましょう。

さて、readyWithTimeですが、これはsudoGからGを抜き出し、そのGをPのQueueに入れることで、schedule対象にします。
これで、PのQueueに復帰したGがsema.goに戻ってまたロック争いに戻ってくるという流れです。

ここまでで通常モードの解説は終わりです。
最後にシチュエーションを通してロックを取り合う動きを再確認していきます。

シチュエーションを通して動きを確認

通常モードでfor中の複数が取り合う場合
if atomic.CompareAndSwapInt32(&m.state, old, new)の部分で、一つしかロックを取得できないようになっているので安心です。

通常モードでfor中のやつと、新たに入ってきたGが取り合う場合
新たにロックの取り合いに入ってきたGもLock()の
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) でチェックしているので安心。

ずっとUnlockされずにいた場合
この場合はcanSpin条件を満たせずに、runtime_SemacquireMutex(&m.sema, queueLifo, 1)に入り、
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)でsema.rootのqueueに入ります。
後にUnlockされた時にreadyWithTime(s, 5+skipframes)でPのQueueに復帰し、schdule()された後にロック争いに復帰します。

ここまで通常モードの動きを確認してきました。
次回はstarvingModeについて見ていきます。
starvingModeはUnLockの時に工夫が凝らされているので、こちらも一緒に見ていきましょう。

次回
GoのSync.mutexを内部実装から理解したい StarvingMode編

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0