goroutine的启动过程源码分析

每次新起一个gouroutine都会调用runtime.newproc函数,在src/runtime/proc.go中,代码如下:

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize) //计算参数位置
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
        // 当前肯定是g0,调用
        newg := newproc1(fn, argp, siz, gp, pc)

        _p_ := getg().m.p.ptr()
        runqput(_p_, newg, true)

        if mainStarted {
            wakep()
        }
    })
}

上述可见是在系统栈内执行,也就是切换到g0的执行状态,首先调用newproc1函数,这个函数主要的作用是拿到一个goroutine,也就是g,然后获取当前p,调用runqput将goroutine设置为下一个调用的goroutine,如果主函数main已经启动,则尝试唤醒线程去调度,这里,笔者认为是开始的准备阶段,默认会根据系统cpu核数来初始化p的数量,在启动之后,此时对应处理的系统线程并没有创建,而是按需创建。

func wakep() {
    // 查看是否有闲置的p
    if atomic.Load(&sched.npidle) == 0 {
        return
    }
    // be conservative about spinning threads
    if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

func startm(_p_ *p, spinning bool) {
    mp := acquirem()
    lock(&sched.lock)
    if _p_ == nil {
        //如果没有指定p则去全局查看有没有闲置的p
        _p_ = pidleget()
        ...
    }
    nmp := mget()
    if nmp == nil {
        id := mReserveID()
        unlock(&sched.lock)

        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        // 没有闲置的m则新建一个m,创建系统线程
        newm(fn, _p_, id)
        // Ownership transfer of _p_ committed by start in newm.
        // Preemption is now safe.
        releasem(mp)
        return
    }
    ...
    nmp.spinning = spinning
    nmp.nextp.set(_p_)
    notewakeup(&nmp.park)
    releasem(mp)
}

可见如果没有闲置的p的话,或者有自旋的m的话,都直接返回,等待调度,而不满足上述条件的话,则调用startm,这个startm主要的作用是获取一个m绑定p,来执行调度p,此时在当前m,调用notewakeup来唤醒闲置的m,如果没有m则新建m结构,创建系统线程,创建系统线程的地方有点深,要追几次源代码,流程大概是newm -> newm1 -> newosproc,此时要注意newosproc创建系统线程的时候,将运行函数设置成了mstart_stub,这个函数定义在汇编程序内,做一些寄存器操作,再调用mstart函数,而mstart又是实现写在汇编,也是调用mstart0mstart0只是做了些栈指针初始化,然后再调用mstart1,最终执行对应函数,如果函数为nil,就执行schedule进行调度,一直找闲置的goroutine进行执行。

摘取newproc1函数的片段,

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }

获取当前的p,gfget就是在当前p中的gFree中获取可用的闲置g,如果当前p的gFree为空的话,则去schedgFree偷取一部分(最多32个)放在当前p中。如果还是没有,新建一个g,并添加到allg全局的goroutine列表中去。

    totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    totalSize += -totalSize & (sys.StackAlign - 1)               // align to StackAlign
    sp := newg.stack.hi - totalSize

计算sp地址,再进行栈对齐。

    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn) // 这个注意,替换pc
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg, false) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    casgstatus(newg, _Gdead, _Grunnable)

首先清除g的sched的空间,因为sched是gobuf结构体,此结构体存储各个寄存器地址。上述代码中pc指向goexit函数,此函数主要作用是回收g,再进行下一轮调度操作。然而gostartcallfn这个函数将sp下移一个指针大小,指向旧的goexit的pc指针,再将pc指向startpc,也就是goroutine所调用函数的pc指针,最后将goroutine状态改为可执行状态。至此,newproc1结束,这个函数主要的作用就是返回一个g结构,对goroutine做好数据准备,最后执行就要看具体的调度了,等到m调度到就可以执行此函数。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

8 − 3 =