每次新起一个gouroutine都会调用runtime.newproc
函数,在src/runtime/proc.go
中,代码如下:
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize) //计算参数位置
gp := getg()
pc := getcallerpc()
systemstack(func() {
// 当前肯定是g0,调用
newg := newproc1(fn, argp, siz, gp, pc)
_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
})
}
上述可见是在系统栈内执行,也就是切换到g0的执行状态,首先调用newproc1
函数,这个函数主要的作用是拿到一个goroutine,也就是g,然后获取当前p,调用runqput将goroutine设置为下一个调用的goroutine,如果主函数main
已经启动,则尝试唤醒线程去调度,这里,笔者认为是开始的准备阶段,默认会根据系统cpu核数来初始化p的数量,在启动之后,此时对应处理的系统线程并没有创建,而是按需创建。
func wakep() {
// 查看是否有闲置的p
if atomic.Load(&sched.npidle) == 0 {
return
}
// be conservative about spinning threads
if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
func startm(_p_ *p, spinning bool) {
mp := acquirem()
lock(&sched.lock)
if _p_ == nil {
//如果没有指定p则去全局查看有没有闲置的p
_p_ = pidleget()
...
}
nmp := mget()
if nmp == nil {
id := mReserveID()
unlock(&sched.lock)
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
// 没有闲置的m则新建一个m,创建系统线程
newm(fn, _p_, id)
// Ownership transfer of _p_ committed by start in newm.
// Preemption is now safe.
releasem(mp)
return
}
...
nmp.spinning = spinning
nmp.nextp.set(_p_)
notewakeup(&nmp.park)
releasem(mp)
}
可见如果没有闲置的p的话,或者有自旋的m的话,都直接返回,等待调度,而不满足上述条件的话,则调用startm,这个startm主要的作用是获取一个m绑定p,来执行调度p,此时在当前m,调用notewakeup来唤醒闲置的m,如果没有m则新建m结构,创建系统线程,创建系统线程的地方有点深,要追几次源代码,流程大概是newm -> newm1 -> newosproc
,此时要注意newosproc创建系统线程的时候,将运行函数设置成了mstart_stub
,这个函数定义在汇编程序内,做一些寄存器操作,再调用mstart
函数,而mstart
又是实现写在汇编,也是调用mstart0
,mstart0
只是做了些栈指针初始化,然后再调用mstart1
,最终执行对应函数,如果函数为nil,就执行schedule
进行调度,一直找闲置的goroutine进行执行。
摘取newproc1函数的片段,
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
获取当前的p,gfget
就是在当前p中的gFree
中获取可用的闲置g,如果当前p的gFree
为空的话,则去sched
中gFree
偷取一部分(最多32个)放在当前p中。如果还是没有,新建一个g,并添加到allg全局的goroutine列表中去。
totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
totalSize += -totalSize & (sys.StackAlign - 1) // align to StackAlign
sp := newg.stack.hi - totalSize
计算sp地址,再进行栈对齐。
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn) // 这个注意,替换pc
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
casgstatus(newg, _Gdead, _Grunnable)
首先清除g的sched的空间,因为sched是gobuf结构体,此结构体存储各个寄存器地址。上述代码中pc指向goexit
函数,此函数主要作用是回收g,再进行下一轮调度操作。然而gostartcallfn
这个函数将sp下移一个指针大小,指向旧的goexit
的pc指针,再将pc指向startpc
,也就是goroutine所调用函数的pc指针,最后将goroutine状态改为可执行状态。至此,newproc1结束,这个函数主要的作用就是返回一个g结构,对goroutine做好数据准备,最后执行就要看具体的调度了,等到m调度到就可以执行此函数。