golang channel 源码剖析

JoanOswald 发布于7天前 阅读114次
0 条评论

channel在 golang 中是一个非常重要的特性,它为我们提供了一个 并发模型 。对比锁,通过 chan 在多个 goroutine 之间完成数据交互,可以让代码更简洁、更容易实现、更不容易出错。golang 的 channel 设计模型遵循 CSP(Communicating Sequential Processes,序列通信处理) 的设计理念。

本文将从源码角度来分析 golang 的 channel 是怎样实现的。先看一下 **channel*8 给我们提供的一些特性。

1. channel 的使用

关于这一小节,熟悉 channel 使用的读者可以快速浏览一下这一部分,这里 没有什么特别的 东西。

1.1使用通道传输数据

func main() {
    c := make(chan int, 8)
    go func() {
        c <- 1
    }()
    fmt.Println(<-c)
}

上面的代码中, make(chan int, 8) 创建并返回一个缓冲区大小为 8 的通道,通道的元素类型为 int 。如果把这个 8 去掉,像这样: c := make(chan int) ,那么创建的通道就是没有缓冲区的通道。如果你熟悉 go,你一定知道我们可以一直向缓冲区发送数据,直到缓冲区变满为止才会阻塞。而如果我们向无缓冲区的通道发送数据,就有存在其它的接收者正在等待,发送才不会不阻塞。

在创建通道之后,接下来使用 go 语句启动一个 goroutine ,这个 goroutine 中,将 1 写入通道 c。最后使用 <-c 读取通道数据并且打印。

这很简单,但是我们需要思考几个问题:

  • 创建通道的时候发生了什么事情?我们创建了一个什么样的数据结构?
  • 向通道发送数据的时候发生了什么事情?缓冲区满了就会阻塞是怎么实现的?
  • 从通道中接收数据时发生了什么事情?
  • 带缓冲区的通道和不带缓冲区的通道有什么不同吗?

1.2select

然后让我们看一个稍微复杂一点的: selectselect 会从所有的 case 中挑选出一个不会阻塞的通道读操作、写操作或者是 default 操作执行。如果都会阻塞,那么 select 就会等待,对应的 goroutine 也会被挂起。

如下面的代码, c1c2 是两个通道, go 启动一个 goroutine ,如果 c1 可读且 c2 不可写,那么就会执行第一个 case , 如果 c1 不可读但 c2 可写,那么就会执行第二个 case 。如果 c1 可读而且 c2 可写,那么就会随机执行第一个 case 或者第二个 case 。如果 c1 不可读而且 c2 不可写,那么就会执行 default 。这里,如果我们没有实现 default 分支,那么 select 就会阻塞。

package main

import (
    "fmt"
    "math/rand"
)

func main() {

    c1 := make(chan int)
    c2 := make(chan int)

    go func() {
        for {
            select {
            case x := <-c1:
                fmt.Println("从 c1 接受数据;", x)
            case c2 <- 100:
                fmt.Println("向 c2 发送数据")
            default:
                fmt.Println("c1 和 c2 都没什么可操作的")
            }
        }
    }()

    for i := 0; i < 500; i++ {
        rd := rand.Intn(2)
        switch rd {
        case 0:
            c1 <- 200
        case 1:
            <-c2
        }
    }
}

只是稍微复杂了一点点,但是还是有很多东西我们需要去探索:

  • select 的工作原理是什么?它是怎么选出一个可执行的语句的?
  • select 为什么可以在多个通道上阻塞?
  • 为什么没有 default 分支时会阻塞,有 default 时会执行 default 的内容?
  • 有多个可执行的语句时,为什么会是随机选的,而不是按照我们代码的顺序?

带着上面的所有问题,我们来看一看 channel 的源码。

2. 预备知识

在深入 channel 源码之前,先了解一下需要有哪些预备知识

2.1goroutine的表示

runtime 库中, goroutine 用一个叫做 g 的结构表示,每个 g 对象表示一个 goroutine

type g struct {
  // ...
  atomicstatus   uint32  // 表示 goroutine 的状态
  param          unsafe.Pointer // 唤醒时参数
  waiting        *sudog // 等待队列,后文会说到
  // ...
}

通过 getg() 函数可以拿到当前 goroutineg 对象:

func getg() *g

2.2 sudog

g 对象中,有一个名字为 waiting 的 * sudog 指针,它表示这个 goroutine 正在等待什么东西或者正在等待哪些东西。

sudog是一个链表形式的类型, waitlink 表示它的下一个节点。对于 cisSelectelem 字段,我们后文会说到。

type sudog struct {
        // ....
        isSelect bool
        elem     unsafe.Pointer // data element (may point to stack)      
        waitlink    *sudog // g.waiting list or semaRoot
        c           *hchan // channel
}

acquireSudog申请一个 sudog 对象。 releaseSudog 释放 sudog 对象

func acquireSudog() *sudog {}
func releaseSudog(s *sudog) {}

2.3 gopark 和 goready

gopark将当前的 goroutine 修改成等待状态,然后等待被唤醒。

func gopark(unlockf func(*g, unsafe.Pointer) bool, 
  lock unsafe.Pointer, 
  reason waitReason, 
  traceEv byte, 
  traceskip int)

goready函数用来唤醒一个 goroutine ,它将 goroutine 的状态修改为 可运行状态 ,随后会被调度器运行。当被调度时,对应的 gopark 函数返回。

2.4 race***

在编译时,使用 -race 参数,可以执行竞态检查,在我们即将要分析的源码中,有相当部分代码为 race 提供了支持。分析时会跳过这一部分,有兴趣的读者可以参考: https://blog.golang.org/race-detector

3. 基本数据结构

chan 使用 hchan 表示,它的传参与赋值始终都是指针形式,每个 hchan 对象代表着一个 chan。

  • hchan 中包含一个缓冲区 buf ,它表示已经发送但是还未被接收的数据缓存。buf 的大小由创建 chan 时的参数来决定。 qcount 表示当前缓冲区中有效数据的总量, dataqsiz 表示缓冲区的大小,对于无缓冲区通道而言 dataqsiz 的值为 0。如果 qcount 和 dataqsiz 的值相同,则表示缓冲区用完了。
  • 缓冲区表示的是一个 环形队列 (如果你不熟悉环形队列,可以看一下 https://www.geeksforgeeks.org/circular-queue-set-1-introduction-array-implementation/ )。其中 sendx 表示下一个发送的地址,recvx 表示下一个接收的地址。
  • recvq 表示等待接收的 sudog 列表,一个接收语句执行时,如果缓冲区没有数据而且当前没有别的发送者在等待,那么执行者 goroutine 会被挂起,并且将对应的 sudog 对象放到 recvq 中。
  • sendq 类似于 recvq,一个发送语句执行时,如果缓冲区已经满了,而且没有接收者在等待,那么执行者 goroutine 会被挂起,并且将对应的 sudog 放到 sendq 中。
  • closed 表示通道是否已经被关闭,0 代表没有被关闭,非 0 值代表已经被关闭。
  • lock 用于对 hchan 加锁
type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

4. 创建通道

当你在代码里面写了一句 c := make(chan int, 8) 时,编译器就会把它翻译成

t := typeof(chan int) // 编译器给你生成了 chan int 的类型描述信息,然后 t 指向这个类型描述信息
c := makechan(t, 8)

没错,makechan 就是创建通道的入口。它的目的就是构建 hchan 对象并返回。由于 hchan 在程序中始终以引用的形式存在,通过赋值或者是传参,它指向的都是同一个对象,所以 hchan 在标准库中都是以指针形式呈现给外部的。对于 makechan 的逻辑,这里分 3 种情况:

  1. 缓冲区所需大小为 0。对于这种情况,在为 hchan 分配内存时,只需要分配 sizeof(hchan) 大小的内存。这很好理解。
  2. 缓冲区所需大小不为 0,而且数据类型不包含指针。
    我们先来理解下 不包含指针 这个东西,对于指针类型或者成员中有指针的类型,那就是包含指针的,否则就是不包含指针的。如下代码,A{}是不包含指针的,&A{}、B{}、&B{} 是包含指针的。
type A struct {
    a int
    b int
}

type B struct {
    a *int
    b *int
}

对于不包含指针的这种情况,分配一块连续内存容纳 hchan 和缓冲区对象。

  1. 缓冲区所需大小不为 0,而且数据类型包含指针。对于这种情况,分配两块内存,其中一块表示 hchan 对象,另外一块用来表示 buf。

下面是 makechan 的核心代码:

func makechan(t *chantype, size int) *hchan {
    // ...

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))

    var c *hchan
    switch {
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
    case elem.kind&kindNoPointers != 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    // ...

    return c
}

至于为什么要区分包含指针和不包含指针这两种情况,makechan 的注释给出了一段解释:

Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.

下面是我的猜想,如果不对,欢迎高人指正:

GC 不会知道 unsafe.Pointer 里面存储的是什么类型,因此如果实际元素类型里面包含指针,就要通过 mallocgc 将分配什么类型的数据告诉 gc,这样 gc 就不会回收这块内存中存储的指针所指向的内存。反之, buf 不包含指针,可以用一块大的内存来存储 hchan 对象和缓冲区,这样可以减轻 gc 压力。

5. 发送数据

向通道发送数据,runtime 中通过 chansend 实现,它的声明如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

参数 c 表示要向哪个 chan 发送数据, ep 表示要发送的数据的地址,block 表示是否需要阻塞, callerpc 表示调用地址。返回值 bool 表示数据是否成功发送。

block 是为了实现如下代码的语义:

c := make(chan int)
        // ...
    select {
    case <-c:
        // ...
    default:
        // ...
    }

上面这段代码被编译成对 selectnbsend 的调用:

if selectnbsend(c, v) {
    ... foo
} else {
    ... bar
}

selectnbsend 的实现如下

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc()) // 非阻塞的发送
}

它与拥有多个 case 的 select 不同(多个 case 的 select 将在后文分析)。

chansend 按照下面的逻辑执行:

  1. 如果通道是空的,对于非阻塞的发送,直接返回 false。对于阻塞的通道,将 goroutine 挂起,并且永远不会返回
if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
  1. 非阻塞的情况下,如果通道没有关闭,而且当前没有接收者,缓冲区也已经满了或者没有缓冲区(即不可以发送数据)。那么直接返回 false
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

注意:前两步中都没有加锁。第 1 步中,没有访问 hchan 的任何成员,所以无需加锁。第 2 步中,可能被写的变量只有 closed 、qcount 和 c.recvq.first ,这些变量都是单字长的,所以对它们的单个值的读操作是原子性的。

然而,我们应该要更仔细的分析,对单个值的读操作是原子性的,但是对多个值的读操作就不一定是原子性的了。因为在判断完 closed 之后,通道可能在这一瞬间从未关闭状态转变成关闭状态(closed 不会从非 0 变成 0,但有可能从 0 变成非 0,所以在判断 closed==0 之后,通道可能还会转变成关闭状态),也就是说这里的 if 测试通过的那一瞬间,可能有两种情况:

  • 通道没有关闭,而且已经满了。那么这段逻辑运行 ok,应该返回 false。
  • 通道已经关闭,而且已经满了。按照发送数据的语义来说,此时应该 panic。但实际上这段逻辑的实现,它会返回 false。

但我们还要注意到的是,第 2 种情况的发生,肯定意味着第 1 种情况发生过。而且它取决与通道的 close 是何时被调用的,至少在 if 之前 close 还没有完成调用。所以我们认为第 2 种情况的逻辑也是正确的。

(嗯,确实有点难理解,也很难描述)

  1. 调用 lock 对通道加锁

  2. 如果此时通道被关闭,那么发生 panic

// 第 3 步,加锁
lock(&c.lock)  

// 第 4 步,如果通道已经被关闭了,那么 panic
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}
  1. 从 recvq 中取出一个接收者,如果接收者存在,直接向该接收者发送数据。
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

send 函数将 ep 作为参数传送给接收方的 sg 对象,然后使用 goready 将其唤醒。sg.elem 如果非空,则将 ep 的内容直接 copy 到 elem 指向的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    memmove(dst, src, t.size)
}

注意:如果有接收者在队列中等待,则说明此时的缓冲区是空的。

  1. 如果缓冲区还有多余的空间,那么将数据写入缓冲区。写入缓冲区后,将发送位置往后移动一个单位,然后将 qcount 加 1
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

其中 chanbuf 函数从 buf 中取出第 i 个元素的存放地址:

func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

typedmemmove 函数将类型为 c.elemtype 的 ep 的内容 拷贝到 qp 中。

  1. 如果执行前面的所有步骤还没有成功发送,那么就表示缓冲区没有空间了,而且也没有任何接收者在等待。所以后面必须要将 goroutine 挂起然后等待新的接收者了。但对于非阻塞的调用,不能等待,返回 false 表示数据发送不成功。
if !block {
        unlock(&c.lock)
        return false
    }
  1. 创建 sudog 对象,然后入队并且让 goroutine 进入等待状态。直到被唤醒时 goparkunlock 才会返回。
gp := getg()

mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c

gp.waiting = mysg
gp.param = nil

c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  1. goparkunlock 返回后,代表已经发送完数据了,此时做一些清理工作,如将 sudog 对象释放,将 g 的 waiting 置空等。

6. 接收数据

接收数据的操作和发送数据的操作大同小异,它的实现函数为 chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
  • chanrecv 从 c 中接收数据,并且将接收到的数据存到 ep 中,block 表示是否需要阻塞。
  • 如果没有数据可以接收,而且是非阻塞的情况,则返回 (false,flase)。如果 c 已经关闭了,将 ep 指向的值置为 0值 ,并且返回 (true, false)。其它情况返回值为 (true,true),表示成功从 c 中获取到了数据。

同样地,block 是为了实现以下语义:

select {
case v = <-c:
    ... foo
default:
    ... bar
}

它被编译成:

if selectnbrecv(&v, c) {
    ... foo
} else {
    ... bar
}

其中 selectnbrecv 的实现为:

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(c, elem, false)  // 非阻塞接收
    return
}

接下来,我们分析以下 recv 的逻辑:

  1. 如果 c 为空且是非阻塞模式,那么直接返回 (false,false)。否则永远等待
if c == nil {
    if !block {
        return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
}
  1. 对于非阻塞的情况,如果当前没有数据可以接收了,那么返回 (false,false)。
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    atomic.Load(&c.closed) == 0 {
    return
}

和非阻塞发送有两个不同的地方:

  • 对 closed 的判断放到了后面。
  • 使用了 atomic。

我们先来看一下下面这段代码:

c := make(chan int, 1)
c <- 1

go func() {
    select {
    case <-c:
        println("recv from c")
    default:
        println("c is not ready - BUG!")
    }
}()

close(c)
<-c

从 go 的语义上来说,不论何时,default 都不应该被执行:如果 select 发生在 close 之前,那么从 c 中取出来的数据应该是 1。 如果 select 发生在 close 之后但是在 <-c 之前,那么也应该从 c 中取出 1。如果 select 发生在 <-c 之后,从 c 中取出的数据是 0 ,而且接收数据是失败的,但是不会执行 default。

那么,如果把对 closed 的判断放到通道是否有数据可接收的判断之前,像这样:

if !block && atomic.Load(&c.closed) == 0 && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0)  {
    return
}

这意味着 if 测试通过后的一瞬间存在两种情况:

  • 通道未关闭,但是不存在数据可接收,也没有发送者在等待。对于这种情况,应该返回 (false,false)。执行 default 段的代码。
  • 通道已关闭,且不存在数据可接收,也没有发送者在等待。对于这种情况,根据 go 语义,应该返回 (true, false),并且执行 case 段的代码。但是我们的这个实现显然是错误的,它返回了 (false,false)。就上面的接收例子而言, close(c) 和 <-c 正好发生在 atomic.Load(&c.closed) == 0 执行完成之后,但还没有执行后面的判断,那 if 再执行后面的判断,显然也是通过的。所以问题就出来了。

再来看一下正确的实现,它也会在 if 测试通过后的一瞬间存在两种情况:

  • 不存在数据可接收,而且通道没有关闭。此时返回 (false,false)
  • 存在数据可接收,而且通道没有关闭。此时应该返回 (true,true)。但是,这种情况意味着上一种情况曾今存在过, 而且至少在 if 执行前的那一瞬间还存在。所以我们认为它返回 (false,false) 是合理的。

另外 atomic 在这里是为了保证内存顺序的正确性。

  1. 加锁,然后判断如果通道已经关闭而且没有剩余的数据可以读取了,那么就返回 (true,false)。
lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

typedmemclr 的作用是将 ep 指向的类型为 elemtype 的内存块置为 0 值。

  1. 如果有发送者在队列等待,那么直接从发送者那里提取数据,并且唤醒这个发送者。当然对于带缓冲区的 chan,它会先将缓冲区的数据提取出来,然后将等待中的发送者的数据拷贝到缓冲区中。
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx 
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

recv 函数判断 chan 是否带有缓冲区,如果不带缓冲区,直接从发送者那里复制数据到 ep。如果带缓冲区,那么你应该能够理解,由于有发送者在等待, 所以缓冲区一定是满的 。它将缓冲区的第一个数据复制到 ep,然后将发送者的数据复制到缓冲区。这是为了尽量满足先来后到的需求(当然,由于并发的存在,这样做实际上不能完全确定)。

接下来,通过 goready 将发送者唤醒。

  1. 如果缓冲区中有数据,那么从缓冲区复制数据到 ep,并且修改下次接收位置和 qcount
if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}
  1. 在执行完成上面的流程后,仍然没有返回,说明缓冲区内已经没有数据了,而且也没有发送者在等待中。所以如果是非阻塞接收,那么直接返回 (false,false)。
if !block {
    unlock(&c.lock)
    return false, false
}
  1. 对于阻塞接收的情况,将调用者 goroutine 挂起,并且等待被唤醒。
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
  1. goparkunlock 返回后,说明已经接收到数据了,或者是通道已经被关闭了。此时和发送一样,做一些清理工作。然后根据是否为关闭导致的返回对应的 bool 值。

7. 关闭通道

closechan 函数实现了通道的关闭,它的声明如下:

func closechan(c *hchan)

closechan 按照如下的流程执行:

  1. 加锁,然后判断如果通道早已关闭了,就 panic。(你不能对一个被关闭的通道再执行关闭操作)
lock(&c.lock)
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
}
  1. 将关闭标志置为 1.
c.closed = 1
  1. 唤醒所有的接收者,并且将接收数据置为 0 值。唤醒所有发送者,令其 panic。 gList 就是一个 g 对象的列表。
var glist gList

for {
    sg := c.recvq.dequeue()
    if sg == nil {
        break
    }
    if sg.elem != nil {
        typedmemclr(c.elemtype, sg.elem)
        sg.elem = nil
    }
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    glist.push(gp)
}

for {
    sg := c.sendq.dequeue()
    if sg == nil {
        break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    glist.push(gp)
}
unlock(&c.lock)

for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
}

8. select

select 函数是本文的最后一部分,也是最复杂的一部分。它的实现函数是 selectgo

8.1 selectgo 的声明

runtime 通过遍历+等待的方式实现 select 语义,遍历时判断如果 有可执行的 case 或者 select 中带有 default,那么就执行之。如果没有,就通过 gopark 将调用者转换为等待状态,使用 sudog 链表表示它在多个通道上等待。其中任意一个通道对应的 sudog 都可以唤醒调用者。

函数 selectgo 实现了 select 语义。它的第一个返回值表示需要执行哪个 case, 第 2 个返回值表示如果要执行的 case 是 caseRecv,那么接收数据是否成功(对于已经关闭的通道来说,这个返回值会是 false,这个我们在 chanrecv 函数中已经看到了)。

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool)
  • 参数 cas0 指向 scase 数组的第一个元素, 每个 scase 表示一个 case 分支, scase 的定义如下:
type scase struct {
    c           *hchan         // chan
    elem        unsafe.Pointer // data element
    kind        uint16
      // ...
}

const (
    caseNil = iota
    caseRecv
    caseSend
    caseDefault
)

c 表示这个 case 对应的通道 ,elem 表示接收数据的地址或者要发送的数据的地址。kind 取值为 caseNil 表示一个 0 值,在真实的 select 中没有任何东西和它对应,它用于表示 无效的 的意思。caseRecv 和 caseSend 分别表示接收和发送的 case。caseDefault 对应 default 分支。

  • order0 参数指向的是一个 2 倍 case 数量大小的数组,它用来为 selectgo 提供额外的空间用来使用堆排序和随机顺序执行。你可能在想,这个空间它自己也能分配,为什么要让外部提供?其实这样做是有它的目的的,首先在 selectgo 中,它不知道调用者的 case 究竟有多少个,那么它无法分配栈内存,它只能分配堆内存,而我们的代码中 for + select 的用法是很常见的,这样小而且频繁的堆内存分配势必给 gc 带来非常大的压力。其次,在 select 的调用处,编译器能够知道你有多少个 case,所以它可以给你分配固定大小的栈内存。(对于这一段,如果你觉得难以理解,可以先跳过,不影响你理解后文)。
  • ncases 表示的是 case 的数量,包括 default。

8.2 避免死锁

在继续探索这个函数之前,可能还需要了解一个东西。那就是对多个锁的占有和释放。

在 selectgo 中,毫无疑问要同时访问多个通道,每个通道都应该加锁才能访问。那么要获得多个锁的所有权,为了不造成死锁,需要按照固定的顺序加锁和解锁(我想你应该知道死锁是什么,而且这种按顺序的加锁和解锁方式可以避免死锁)。

runtime 中的 sellock 和 selunlock 用于对 scase 数组加锁和解锁。注意解锁的时候顺序和加锁的顺序是相反的。

另外由于一个 select 语句中可能存在多个 case 对同一个通道的操作,而对于同一个通道来说,只能加锁一次,也只能解锁一次。所以加锁迭代中需要判断是否和上次加锁的通道一样,解锁迭代中需要判断下个要解锁的通道是否和当前通道一样。 lockorder 是要保证同一个通道存在多次,那么它们需要是相邻的。

func sellock(scases []scase, lockorder []uint16) {
    var c *hchan
    for _, o := range lockorder {
        c0 := scases[o].c
        if c0 != nil && c0 != c {
            c = c0
            lock(&c.lock)
        }
    }
}

func selunlock(scases []scase, lockorder []uint16) {
    for i := len(scases) - 1; i >= 0; i-- {
        c := scases[lockorder[i]].c
        if c == nil {
            break
        }
        if i > 0 && c == scases[lockorder[i-1]].c {
            continue // will unlock it on the next iteration
        }
        unlock(&c.lock)
    }
}

接下来我们深入探索 selectgo 这个函数的实现,根据代码结构,本节将按照分段的方式对这个函数进行讲解。

8.3 pollorder 和 lockorder

pollorder 表示轮询顺序,为了实现 select 中的随机语义,轮询应该是随机的。 pollorder 对应参数 order0 指针的前半部分。pollorder 包含 0~ncases-1 中的所有数字,下面是随机生成 pollorder 的代码

for i := 1; i < ncases; i++ {
    j := fastrandn(uint32(i + 1))
    pollorder[i] = pollorder[j]
    pollorder[j] = uint16(i)
}

这个很有意思,它假设第一个元素初始为 0,而且没有对后面的元素做任何假设。每次迭代中,从前面的所有元素中随机挑选一个,然后将当前索引和它置换。从而生成 0~ncases-1 的值。

它只要求第一个元素初始值为 0 ,这样编译器可以为我们对 select 的调用生成更加高效的代码。

lockorder 表示加锁顺序,用以传给 sellock 和 selunlock 加锁和解锁。它最后存储的值为按照地址排序的通道的。利用 pollorder 构建一个最大堆:

for i := 0; i < ncases; i++ {
    j := i
    c := scases[pollorder[i]].c
    for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
        k := (j - 1) / 2
        lockorder[j] = lockorder[k]
        j = k
    }
    lockorder[j] = pollorder[i]
}

注意和常规的最小堆构建稍有不同,因为它将其它内存的内容构建成最小堆放到了当前内存中,并且使用插入法建堆。这种方式的时间复杂度是 O(nlogn)。相比常规的建堆时间复杂度是 O(n)。看似慢了,但实际上在数据量比较小的时候,插入法建堆更快,而且如果在这里使用的是常规建堆方法,需要先执行一次内存拷贝操作。

接下来就是使用大根堆的排序了:

for i := ncases - 1; i >= 0; i-- {
        o := lockorder[i]
        c := scases[o].c
        lockorder[i] = lockorder[0]
        j := 0
        for {
            k := j*2 + 1
            if k >= i {
                break
            }
            if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                k++
            }
            if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                lockorder[j] = lockorder[k]
                j = k
                continue
            }
            break
        }
        lockorder[j] = o
    }

每次外层迭代,都将最大的元素移到后面,然后重新调整位置满足堆的属性。

8.3 loop 段

在 loop 段开始之前,selectgo 先使用了 sellock 对所有的通道加锁,注意 lockorder 在这里的作用。

sellock(scases, lockorder)

loop 段是 selectgo 函数的核心部分,它的目的是先遍历一次所有的 case 和 default 语句,看一下是否有可执行的分支,如果有,那么就转移到对应的段去处理。否则就阻塞并且等待被唤醒。

我们先看循环部分:

loop:
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
for i := 0; i < ncases; i++ {
    casi = int(pollorder[i])
    cas = &scases[casi]
    c = cas.c

    switch cas.kind {
    case caseNil:
        continue

    case caseRecv:
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }

    case caseSend:
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }

    case caseDefault:
        dfli = casi
        dfl = cas
    }
}

它遍历了所有的 case+default,然后按照 case 的类别做如下处理:

  • 无效的 case,不处理
  • 接收 case,根据不同的情况分别跳转到 recv, bufrecv, rclose 段。注意这里的顺序,rclose 是放在最后面的。
  • 发送 case,根据不同的情况分别跳转到 sclose,send, bufsend 段。这里是要把 sclose 放在最前面的,因为向一个已经关闭的通道发送数据,就应该 panic
  • 对于 default,selectgo 简单的将这个 case 信息保存下来,留给后面处理。

当循环结束后,如果有 default 语句存在,那么执行 default 的内容。

if dfl != nil {
    selunlock(scases, lockorder)
    casi = dfli
    cas = dfl
    goto retc
}

selectgo 用 casi 表示要执行哪个 case 的内容, cas 表示要执行的分支的 scase 对象。这里它简单的对这两个变量赋值,然后转移到 retc 段。

8.4 loop 之后

当上面的流程都执行完了,还没有 goto 出去,说明没有任何 case 当前可以执行。那么就挂起并等待被唤醒。

gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    if cas.kind == caseNil {
        continue
    }
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    switch cas.kind {
    case caseRecv:
        c.recvq.enqueue(sg)

    case caseSend:
        c.sendq.enqueue(sg)
    }
}
gp.param = nil
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

它按照锁顺序一次遍历每个 case,然后将其放到 g.waitlink 这个 sudog 链表中,表明是在等待多个 case 。并且对于每个 case,都往 recvq 或者 sendq 里面插入这个 sudog,用以表示这个等待者。

然后使用 gopark 将当前 goroutine 切换到等待状态。

当 gopark 返回时,说明已经被某个 channel 唤醒了,后面主要是一些清理工作。

8.5 bufrecv 段

bufrecv 段从带 buf 的通道中接收数据。执行到 bufrecv 段了,说明对应的通道缓冲区有数据可以接收了

bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
    c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc

这一段的实现和之前讨论的 recv 函数类似,但是最后它把所有权交给 retc

8.6 bufsend 段

bufsend 段向缓冲区写入数据,与 send 函数类似,但是最后把所有权让给了 retc

bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
    c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc

后面还有 recv段,rclose段,send 段,sclose 段等,这些逻辑基本上都可以在 chansend 与 chanrecv 中找到共通点。

8.7 retc 段

retc:
  return casi, recvOK

它简单的做了一个返回工作 (当然还有其它的部分,但这部分已经超出本文范围)

9. 写在最后

本文只展示了最核心的逻辑部分,完整的源码请参考 $GOROOT/src/runtime/chan.go 和 $GOROOT/src/runtime/select.go

本文如有错误,欢迎大家指出。

查看原文: golang channel 源码剖析

  • purpleswan
  • bluegorilla
  • whiteostrich
  • redmeercat
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。