Golang源码笔记:sync.Mutex源码阅读

结构体介绍

在平常调用sync库的时候,都会直接声明一个Mutex结构体,结构体主要有两个熟悉state表示当前锁的状态,主要有锁定状态(mutexLocked),唤醒状态(mutexWoken),饥饿状态(mutexStarving),
这三种状态后面会有说明,还有一个mutexWaiterShift代表获取当前锁的goroutine等待数目。
上面可以看到有饥饿模式,所以锁也有两种模式,一种正常模式,一种饥饿模式,两者的区别主要如下:

  1. 正常都是FIFO模式,在队列中第一个等待的goroutine唤醒后获取锁,但是如果此时有新的goroutine来竞争锁,此时新的goroutine有优势获取锁,因为新的goroutine正在占据CPU使用;
  2. 饥饿模式下,严格按照FIFO模式,新的goroutine获取不到当前锁,主动权直接交给队列第一个goroutine

type Mutex struct {
    state int32
    sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

竞争锁Lock

下面是获取锁,在第一次使用的时候,此时状态可能直接CAS替换的,如果竞争不是很激烈的话,锁的消耗还是可以忍受的。替换失败主要是当前锁有了等待数目,或者锁的状态不是初始值,
所以主要看下lockslow函数。


// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // 原子替换锁的状态
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    m.lockSlow()
}

下面是lockslow函数,看此函数前,要明白两种状态,一种是锁的状态,一种是获取锁的goroutine所处的状态,有点迷,下面源码注释很清晰,可以看看。
需要说明的是,自旋状态进入有点苛刻,不过具体可以看看源码runtime.sync_runtime_canSpin函数,就不给源码了,编译是用//go:linkname sync_runtime_canSpin sync.runtime_canSpin
来做别名的,所以要注意这一点。具体进入自旋的条件有三个:

  1. 已自旋次数小于4
  2. CPU数目大于1
  3. GOMAXPROCS大于1
  4. 至少有一个其他的p队列,并且处理的运行队列为空
func (m *Mutex) lockSlow() {
    var waitStartTime int64 // 等待的时间
    starving := false   // 当前是否为饥饿模式
    awoke := false  // 当前是否处于唤醒状态
    iter := 0   // 自旋次数
    old := m.state  // 复制锁的状态
    for {
        // 锁还没有释放并且不是饥饿状态
        // 饥饿模式不进入自旋,因为饥饿模式锁的主动权给在等待队列第一个,所以进入自旋的话没有意义
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 三种条件
            // 1. 当前groutine没有处于唤醒状态
            // 2. 锁没有处于mutexWoken唤醒状态
            // 3. waiter等待goutine数不等于0
            // 满足上述三种状态,尝试CAS更改锁的状态为唤醒状态,并设置当前goutine为唤醒状态
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        // 下面为锁状态的更新,old 为下面CAS更改锁之前的当前时刻的状态
        new := old
        // 如果old state不是饥饿模式,则尝试获取锁,也就是设置锁为mutexLocked状态
        // 如果是饥饿模式,则严格按照FIFO队列排队获取,不主动去尝试获取锁
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // old state为锁定或者处于饥饿模式,waiter等待队列+1
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // 如果当前goroutine为饥饿状态,并且锁的状态是锁定状态
        // 则设置锁为饥饿状态
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // 当前goutine处于唤醒状态便关闭锁的唤醒状态
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        // 上面new这个状态的总结下,基本上以下几种情况
        // 1. 锁处于饥饿模式,则new不存在获取锁定状态,new 只是等待队列+1,进入饥饿状态等待排队获取锁
        // 2. 当前goroutine处于饥饿模式,锁不是饥饿模式,则new为 mutexLocked 并且等待队列+1,如果锁
        // 未被释放则设置锁状态为mutexStarving,进入饥饿模式。锁已经被释放,则当前goroutine尝试获取锁
        // 3. 当前goutine为唤醒,锁状态去除唤醒状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果old state不是饥饿状态并且被释放了,那么当前goroutine已经获取锁
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 设置当前goroutine获取锁的等待时间
            // queueLifo代表是否是新的goroutine
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 信号量获取,如果是新的goutine,加入队列尾部
            // 如果是唤醒的goroutine,加入队列头部
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 下面执行代表本goroutine已经被唤醒
            // 如果等待超过1ms则当前goroutine进入饥饿模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 锁处于饥饿模式
            if old&mutexStarving != 0 {
                // 如果当前锁已经为锁定状态或者标记为唤醒,又或者锁的等待协程数为0,则是非法状态转换
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // waiter -1 / 设置锁定
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // 如果当前goroutine不是饥饿模式,或者本次goroutine为最后一个等待者,退出饥饿模式
                    delta -= mutexStarving
                }
                // 获得锁退出
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 正常模式下,当前goutine唤醒,自旋次数初始为0
            awoke = true
            iter = 0
        } else {
            // 重新开始
            old = m.state
        }
    }

}

释放锁Unlock

相对于获得锁,释放锁的代码要简单许多,先是释放当前锁状态,然后如果有等待的队列情况下,查看是否为饥饿状态,如果饥饿状态直接递交主动权到waiter队列第一个
goroutine,正常状态下,就是释放当前信号量,唤醒第一个goroutine竞争锁。

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 释放当前锁,更改状态
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // 锁不是0的状况,说明有等待释放队列
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    // 非法状态,当前锁已经被释放
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

    if new&mutexStarving == 0 {
        // 正常模式下
        old := new
        for {
            // 当前没有等待者 或者 锁处于锁定/唤醒/饥饿三种状态之一,直接返回
            // 进入当前,正常状态下是锁已经被释放了
            // 所以上面三个状态说明锁有改变
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 减去一个等待数量,并唤醒锁
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 信号量释放,这样队列头部第一个会被唤醒
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式直接将主动权给waiter第一个
        runtime_Semrelease(&m.sema, true, 1)
    }
}

不足

由于还没去了解goroutine的调度逻辑,所以很多不是很好说明,例如信号量的具体逻辑处理,后续文章再做这方面补充。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

1 + 8 =