结构体介绍
在平常调用sync库的时候,都会直接声明一个Mutex结构体,结构体主要有两个熟悉state
表示当前锁的状态,主要有锁定状态(mutexLocked
),唤醒状态(mutexWoken
),饥饿状态(mutexStarving
),
这三种状态后面会有说明,还有一个mutexWaiterShift
代表获取当前锁的goroutine等待数目。
上面可以看到有饥饿模式,所以锁也有两种模式,一种正常模式,一种饥饿模式,两者的区别主要如下:
- 正常都是FIFO模式,在队列中第一个等待的goroutine唤醒后获取锁,但是如果此时有新的goroutine来竞争锁,此时新的goroutine有优势获取锁,因为新的goroutine正在占据CPU使用;
- 饥饿模式下,严格按照FIFO模式,新的goroutine获取不到当前锁,主动权直接交给队列第一个goroutine
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
竞争锁Lock
下面是获取锁,在第一次使用的时候,此时状态可能直接CAS替换的,如果竞争不是很激烈的话,锁的消耗还是可以忍受的。替换失败主要是当前锁有了等待数目,或者锁的状态不是初始值,
所以主要看下lockslow
函数。
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// 原子替换锁的状态
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
下面是lockslow
函数,看此函数前,要明白两种状态,一种是锁的状态,一种是获取锁的goroutine所处的状态,有点迷,下面源码注释很清晰,可以看看。
需要说明的是,自旋状态进入有点苛刻,不过具体可以看看源码runtime.sync_runtime_canSpin
函数,就不给源码了,编译是用//go:linkname sync_runtime_canSpin sync.runtime_canSpin
来做别名的,所以要注意这一点。具体进入自旋的条件有三个:
- 已自旋次数小于4
- CPU数目大于1
- GOMAXPROCS大于1
- 至少有一个其他的p队列,并且处理的运行队列为空
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待的时间
starving := false // 当前是否为饥饿模式
awoke := false // 当前是否处于唤醒状态
iter := 0 // 自旋次数
old := m.state // 复制锁的状态
for {
// 锁还没有释放并且不是饥饿状态
// 饥饿模式不进入自旋,因为饥饿模式锁的主动权给在等待队列第一个,所以进入自旋的话没有意义
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 三种条件
// 1. 当前groutine没有处于唤醒状态
// 2. 锁没有处于mutexWoken唤醒状态
// 3. waiter等待goutine数不等于0
// 满足上述三种状态,尝试CAS更改锁的状态为唤醒状态,并设置当前goutine为唤醒状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 下面为锁状态的更新,old 为下面CAS更改锁之前的当前时刻的状态
new := old
// 如果old state不是饥饿模式,则尝试获取锁,也就是设置锁为mutexLocked状态
// 如果是饥饿模式,则严格按照FIFO队列排队获取,不主动去尝试获取锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// old state为锁定或者处于饥饿模式,waiter等待队列+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前goroutine为饥饿状态,并且锁的状态是锁定状态
// 则设置锁为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 当前goutine处于唤醒状态便关闭锁的唤醒状态
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 上面new这个状态的总结下,基本上以下几种情况
// 1. 锁处于饥饿模式,则new不存在获取锁定状态,new 只是等待队列+1,进入饥饿状态等待排队获取锁
// 2. 当前goroutine处于饥饿模式,锁不是饥饿模式,则new为 mutexLocked 并且等待队列+1,如果锁
// 未被释放则设置锁状态为mutexStarving,进入饥饿模式。锁已经被释放,则当前goroutine尝试获取锁
// 3. 当前goutine为唤醒,锁状态去除唤醒状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果old state不是饥饿状态并且被释放了,那么当前goroutine已经获取锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 设置当前goroutine获取锁的等待时间
// queueLifo代表是否是新的goroutine
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 信号量获取,如果是新的goutine,加入队列尾部
// 如果是唤醒的goroutine,加入队列头部
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 下面执行代表本goroutine已经被唤醒
// 如果等待超过1ms则当前goroutine进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 锁处于饥饿模式
if old&mutexStarving != 0 {
// 如果当前锁已经为锁定状态或者标记为唤醒,又或者锁的等待协程数为0,则是非法状态转换
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// waiter -1 / 设置锁定
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 如果当前goroutine不是饥饿模式,或者本次goroutine为最后一个等待者,退出饥饿模式
delta -= mutexStarving
}
// 获得锁退出
atomic.AddInt32(&m.state, delta)
break
}
// 正常模式下,当前goutine唤醒,自旋次数初始为0
awoke = true
iter = 0
} else {
// 重新开始
old = m.state
}
}
}
释放锁Unlock
相对于获得锁,释放锁的代码要简单许多,先是释放当前锁状态,然后如果有等待的队列情况下,查看是否为饥饿状态,如果饥饿状态直接递交主动权到waiter队列第一个
goroutine,正常状态下,就是释放当前信号量,唤醒第一个goroutine竞争锁。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 释放当前锁,更改状态
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 锁不是0的状况,说明有等待释放队列
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 非法状态,当前锁已经被释放
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
// 正常模式下
old := new
for {
// 当前没有等待者 或者 锁处于锁定/唤醒/饥饿三种状态之一,直接返回
// 进入当前,正常状态下是锁已经被释放了
// 所以上面三个状态说明锁有改变
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 减去一个等待数量,并唤醒锁
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 信号量释放,这样队列头部第一个会被唤醒
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式直接将主动权给waiter第一个
runtime_Semrelease(&m.sema, true, 1)
}
}
不足
由于还没去了解goroutine的调度逻辑,所以很多不是很好说明,例如信号量的具体逻辑处理,后续文章再做这方面补充。