Golang源码笔记: channel源码阅读

channel

channel 创建

package main

func main(){
    ch := make(chan int)
    go func() {
        ch <- 1
    }()
    <- ch
}

执行go tool compile -S main.go 查看对应的asm代码,在main中可以看到runtime.makechan的调用,并且传入参数type.chan int类型以及0值,在go源码src/runtime/chan.go文件中可以看到此方法。

        0x0024 00036 (main.go:4)        PCDATA  $0, $1
        0x0024 00036 (main.go:4)        PCDATA  $1, $0
        0x0024 00036 (main.go:4)        LEAQ    type.chan int(SB), AX
        0x002b 00043 (main.go:4)        PCDATA  $0, $0
        0x002b 00043 (main.go:4)        MOVQ    AX, (SP)
        0x002f 00047 (main.go:4)        MOVQ    $0, 8(SP)
        0x0038 00056 (main.go:4)        CALL    runtime.makechan(SB)
        0x003d 00061 (main.go:4)        PCDATA  $0, $1
        0x003d 00061 (main.go:4)        MOVQ    16(SP), AX
        0x0042 00066 (main.go:4)        PCDATA  $0, $0
        0x0042 00066 (main.go:4)        PCDATA  $1, $1
        0x0042 00066 (main.go:4)        MOVQ    AX, "".ch+24(SP)
        0x0047 00071 (main.go:5)        MOVL    $8, (SP)
        0x004e 00078 (main.go:5)        PCDATA  $0, $2

makechan中的调用,主要是初始化一个hchan结构体,返回指针,其中的代码不赘述。先简单了解一下hchan结构体,对于结构体的各个属性的注释已经标注了。


type hchan struct {
    qcount   uint           // channel缓冲区当前数量 相当于len(chan)
    dataqsiz uint           // channel缓冲区总容量 相当于cap(chan)
    buf      unsafe.Pointer // 指针指向缓冲区
    elemsize uint16
    closed   uint32
    elemtype *_type // 内部元素类型 例如int,string
    sendx    uint   // 当前发送的刻度,说刻度感觉比较好,因为buf是环形缓冲区
    recvx    uint   // 当前recv的刻度
    recvq    waitq  // 等待接收的goroutine队列
    sendq    waitq  // 等待发送的goroutine队列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

需要注意的是,如下面代码所示,在有缓冲的channel情况下,并且元素类型占有内存,例如struct{},如果元素类型是指针类型,
将有两次alloc操作,一次为new hchan结构体,一次为buf mallocgc操作。示例代码走的是mem == 0分支。


    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

chansend 发送channel

后面的执行调用了runtime.newproc新起了一个goroutine来调用发送数据到channel中的函数,下面是函数的asm代码:

        0x0024 00036 (main.go:6)        PCDATA  $0, $1
        0x0024 00036 (main.go:6)        PCDATA  $1, $1
        0x0024 00036 (main.go:6)        MOVQ    "".ch+32(SP), AX
        0x0029 00041 (main.go:6)        PCDATA  $0, $0
        0x0029 00041 (main.go:6)        MOVQ    AX, (SP)
        0x002d 00045 (main.go:6)        PCDATA  $0, $1
        0x002d 00045 (main.go:6)        LEAQ    ""..stmp_0(SB), AX
        0x0034 00052 (main.go:6)        PCDATA  $0, $0
        0x0034 00052 (main.go:6)        MOVQ    AX, 8(SP)
        0x0039 00057 (main.go:6)        CALL    runtime.chansend1(SB)
        0x003e 00062 (main.go:7)        MOVQ    16(SP), BP
        0x0043 00067 (main.go:7)        ADDQ    $24, SP
        0x0047 00071 (main.go:7)        RET

可以看到是调用了runtime.chansend1函数,同样在chan.go中找到该函数。

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

上面看到直接调用chansend函数,下面为chansend部分源码。

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

可以看到主要有几种分支:

  1. 当recvq中有等待的goroutine时候,直接拿出一个sudog发送过去
  2. 当有缓冲空间时,直接插入缓冲队列
  3. 当以上条件都不满足,则将当前goroutine纳入sendq等待队列

以上就是send的一些总结,下面再看chanrecv的源码。

chanrecv 接收channel


if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

可以看到chanrecv与chansend大体一致,主要也是那几种方式

  1. senq队列有等待的sudog直接拿到这个sender的内容,将该sender的元素推入队列尾部,去除队列头部元素返回给receiver
  2. 当缓冲区里面有数据的时候,直接取出缓冲区头部的数据返回
  3. 否则将当前goroutine加入recvq等待队列

chanclose 关闭channel

最后看看chanclose的源码部分


    ...

    c.closed = 1

    var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }

在上面的注释其实解释的很好了,将recvq和senq的等待队列全部清空塞入glist队列,并且依次激活这些协程。具体可以看看chansend和chanrecv的goparkunlock下面的部分,那便是goready之后调用的流程。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

11 − 9 =