我们在日常开发中,如果涉及到对多个goroutine的并发调度处理,则不免会使用channel配合select来完成一些类似超时机制、临界条件的处理。而当我们在使用select控制结构时,可能会产生如下疑问:

  1. select在遇到多个case同时满足要求时是顺序执行还是随机选择一个case执行,其内在实现原理又是什么;
  2. select是如何做到在channel上进行无阻塞的收发操作的。

对第一个问题的简单理解是:为了保证goroutine并发执行的无序性,select会随机选取一种情况执行。

接下来我们就详细解读select的使用及实现原理,并给出以上问题的详细解释。至于channel和goroutine的理解,则放在其它文章中单独讨论。

1.select来源

select 是操作系统中的系统调用,我们经常会使用 select、poll 和 epoll 等函数构建 I/O 多路复用模型(这里涉及到网络io模型知识,也放到之后的文章单独讨论)提升程序的性能。

Go 语言的 select 与操作系统中的 select 比较相似,C 语言的 select 系统调用可以同时监听多个文件描述符的可读或者可写的状态。

Go 语言中的 select 也能够让 Goroutine 同时等待多个 Channel 可读或者可写,在多个文件或者 Channel状态改变之前,select 会一直阻塞当前线程或者 Goroutine:

go select的使用和实现原理(图1)

2.定义及简单使用

select语句是专为channel而设计的用于多个channel监听并收发消息的流程控制结构,当任何一个case满足条件则会执行,若没有可执行的case,就会执行default,如果没有default,程序就会阻塞。

同时select是与 switch 相似的控制结构,但是与 switch 不同的是,select 中虽然也有多个 case,但是这些 case 中的表达式必须都是 channel 的收发操作。下面的代码展示了一个包含 从channel 接收数据和向channel发送数据操作的 select 结构:

package main

import (
	"fmt"
	"time"
)

func main() {
	// 可以思考下这里为啥定义了缓存大小为1的channel
	c := make(chan int, 1)
	quit := make(chan int)
	go fibonacci(c, quit)
	time.Sleep(5 * time.Second)
	quit <- 0
	time.Sleep(1 * time.Second)
}

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, <-c+y
			fmt.Println(x, y)
			time.Sleep(1 * time.Second)
		case <-quit:
			fmt.Println("num:", x)
			fmt.Println("quit")
			return
		}
	}
}

上述控制结构会等待 c <- x 或者 <-quit 两个表达式中任意一个返回。无论哪一个表达式返回都会立刻执行 case 中的代码,当 select 中的两个 case 同时被触发时,会随机执行其中的一个。

3.特性详解

  1. 每个case都必须是一个通信
  2. 所有channel表达式都会被求值
  3. 所有被发送的表达式都会被求值
  4. 如果任意某个通信可以进行,它就执行,其它被忽略
  5. 如果有多个case都可以运行,select会随机公平的选取一个执行,其它的不会执行。否则就执行default语句(如果有)
  6. 如果没有default语句,select经阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值

下面通过几个例子来理解这些特性:

  1. select closed/nil channel(case中涉及的channel已被关闭或者是零值)
for {
		select {
		case v1, ok := <-c1:
			// 如果c1被关闭(ok==false),每次从c1读取都会立即返回,将导致死循环
			// 可以通过将c1置为nil来让select ignore掉这个case,继续评估其它case
			if !ok {
				c1 = nil
			}

		case v2 := <-c2:
		// 同样,如果c2被关闭,每次从c1读取都会立即返回对应元素类型的零值(如空字符串),导致死循环
		// 解决方案仍然是置c2为nil,但是有可能误判(写入方是写入了一个零值而不是关闭channel,比如整数0)

		case c3 <- v3:
			// 如果c3已经关闭,则panic
			// 如果c3为nil,则ignore该case,这里也解释了上一节例子中定义带缓存的channel原因
		}
	}
  1. 实现非阻塞读写

结合特性5,6,可以通过带 default 语句的 select 实现非阻塞读写,在实践中还是比较有用的,比如 服务器线程尝试给玩家推送某条消息,可能并不希望 线程阻塞在该玩家的 writeChan 上。

select {
    case writeChan <- msg:
        // do something write successed
    default:
        // drop msg, or log err
}

需要注意,一些同学可能将select与switch搞混,习惯先把default写好,然后加上外层的for循环导致死循环。使用select语句,for和default基本不会同时出现。

  1. 实现定时任务

结合特性2,每次 select 都会对所有通信表达式求值,因此可通过 time.After简洁实现定时器功能,并且定时任务可通过 done channel 停止:

for {
	select {
	case <- time.After(time.Second):
	    // do something per second
	case <- donec:
		return	
	}
}

现在我们稍微变更一下:

donec := make(chan bool, 1)
close(donec)
for {
	select {
	case <- time.After(time.Second):
		fmt.Println("timer")
	case <- donec:
	}
}

现在这段代码会输出什么?由例子1我们知道,从一个已经关闭的channel读取数据不会阻塞也不会painc,而会读出对应元素类型的零值。因此什么也不会输出。

  • 每次执行了 case <- donec1 后,select 再次对 case1 的 timer.After 求值,返回一个新的下一秒超时的 Time
  • 再次执行到 case <- donec ….

因此,case <- timer.After(time.Second) 不应该解释为每一秒执行一次,而是其它 case 如果有一秒都没有执行,那么就执行这个 case。

  1. 多个case满足读写条件

结合特性4,如果多个case满足读写条件,select会随机选择一个语句执行:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 1024)
	go func(ch chan int) {
		for {
			val := <-ch
			fmt.Printf("val:%d\n", val)
		}
	}(ch)

	tick := time.NewTicker(1 * time.Second)
	for i := 0; i < 5; i++ {
		select {
		case ch <- i:
		case <-tick.C:
			fmt.Printf("%d: case <-tick.C\n", i)
		}

		time.Sleep(500 * time.Millisecond)
	}
	close(ch)
	tick.Stop()
}

我的执行结果如下:

go select的使用和实现原理(图2)

可以看到向ch写入的3”不见”了,因为当tick.C和ch同时满足读写条件时,select随机选择了一个执行,导致看起来一些数据丢了。

其实这个例子是比较极端的,因为向ch写入的数据本身就与外部for循环计数耦合了,导致依赖于select的随机结果(本次没随机到,放到下次,但此时写入的数据已经变更了),因此实际不是数据丢了,而是代码设计时没有考虑到每次select只会执行一条读写语句(并且是随机选取的),导致结果不如预期。

4.数据结构

select 在 Go 语言的源代码中不存在对应的结构体,但是我们使用 runtime.scase 结构体表示 select 控制结构中的 case:

// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's scasetype.
type scase struct {
	c           *hchan         // chan
	elem        unsafe.Pointer // data element
	kind        uint16
	pc          uintptr // race pc (for race detector / msan)
	releasetime int64
}

由于非 default 的 case 中都与 Channel 的发送和接收数据有关,所以在 scase 结构体中也包含一个 c 字段用于存储 case 中使用的 Channel,elem 是用于接收或者发送数据的变量地址、kind 表示当前 case 的种类,总共包含以下四种:

// scase.kind values.
// Known to compiler.
// Changes here must also be made in src/cmd/compile/internal/gc/select.go's walkselectcases.
const (
	caseNil = iota
	caseRecv
	caseSend
	caseDefault
)

这四种常量分别表示不同类型的 case,顾名思义,我们知道它们分别代表零值channel、接收channel、发送channel及default。

5.实现原理

select 语句在编译期间会被转换成 OSELECT 节点。每个 OSELECT 节点都会持有一组 OCASE 节点,如果 OCASE 的执行条件是空,那就意味着这是一个 default 节点。

go select的使用和实现原理(图3)

上图展示的就是 select 语句在编译期间的结构,每一个 OCASE 既包含执行条件也包含满足条件后执行的代码。

编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在 cmd/compile/internal/gc.walkselectcases 函数中,我们在这里会分四种情况介绍处理的过程和结果:

  • select 不存在任何的 case
  • select 只存在一个 case
  • select 存在两个 case,其中一个 case 是 default
  • select 存在多个 case

上述四种情况不仅会涉及编译器的重写和优化,还会涉及 Go 语言的运行时机制,我们会从编译期间和运行时两个角度分析上述情况。

  1. 直接阻塞

首先介绍的是最简单的情况,也就是当 select 结构中不包含任何 case。我们截取 cmd/compile/internal/gc.walkselectcases 函数的前几行代码:

func walkselectcases(cases *Nodes) []*Node {
	n := cases.Len()
	sellineno := lineno

	// optimization: zero-case select
	if n == 0 {
		return []*Node{mkcall("block", nil, nil)}
	}
	...
}

这段代码很简单并且容易理解,它直接将类似 select {} 的语句转换成调用 runtime.block 函数:

func block() {
	gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
}

runtime.block 的实现非常简单,它会调用 runtime.gopark 让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases。

简单总结一下,空的 select 语句会直接阻塞当前 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。

  1. 单一管道

如果当前的 select 条件只包含一个 case,那么编译器会将 select 改写成 if 条件语句。下面对比了改写前后的代码:

// 改写前
select {
case v, ok <-ch: // case ch <- v
    ...    
}

// 改写后
if ch == nil {
    block()
}
v, ok := <-ch // case ch <- v
...

cmd/compile/internal/gc.walkselectcases 在处理单操作 select 语句时,会根据 Channel 的收发情况生成不同的语句。当 case 中的 channel 是空指针时,会直接挂起当前 Goroutine 并陷入永久休眠。

  1. 非阻塞操作

当 select 中仅包含两个 case,并且其中一个是 default 时,Go 语言的编译器就会认为这是一次非阻塞的收发操作。cmd/compile/internal/gc.walkselectcases 会对这种情况单独处理。不过在正式优化之前,该函数会将 case 中的所有 channel 都转换成指向 channel 的地址,我们会分别介绍非阻塞发送和非阻塞接收时,编译器进行的不同优化。

通过本节我们也可以基本理解问题2:select是如何做到在channel上进行无阻塞的收发操作的了。

  • 3.1.非阻塞发送

首先是 channel 的发送过程,当 case 中表达式的类型是 caseSend 时,编译器会使用条件语句和 runtime.selectnbsend 函数改写代码:

select {
case ch <- i:
    ...
default:
    ...
}

if selectnbsend(ch, i) {
    ...
} else {
    ...
}

这段代码中最重要的就是 runtime.selectnbsend,它为我们提供了向 channel 非阻塞地发送数据的能力。我们在go channel解析 这篇文章(文章地址:go channel解析(一))中介绍了向 channel 发送数据的 runtime.chansend 函数包含一个 block 参数,该参数会决定这一次的发送是不是阻塞的:

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

由于我们向 runtime.chansend 函数传入了非阻塞,所以在不存在接收方或者缓冲区空间不足时,当前 Goroutine 都不会阻塞而是会直接返回。

  • 3.2.非阻塞接收

由于从 Channel 中接收数据可能会返回一个或者两个值,所以接收数据的情况会比发送稍显复杂,不过改写的套路是差不多的:

// 改写前
select {
case v <- ch: // case v, ok <- ch:
    ......
default:
    ......
}

// 改写后
if selectnbrecv(&v, ch) { // if selectnbrecv2(&v, &ok, ch) {
    ...
} else {
    ...
}

返回值数量不同会导致使用函数的不同,两个用于非阻塞接收消息的函数 runtime.selectnbrecv 和 runtime.selectnbrecv2 只是对 runtime.chanrecv 返回值的处理稍有不同:

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}

func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	selected, *received = chanrecv(c, elem, false)
	return
}

因为接收方不需要,所以 runtime.selectnbrecv 会直接忽略返回的布尔值,而 runtime.selectnbrecv2 会将布尔值回传给调用方。与 runtime.chansend 一样,runtime.chanrecv 也提供了一个 block 参数用于控制这次接收是否阻塞。

  1. 常见流程

在默认的情况下,编译器会使用如下的流程处理 select 语句:

  • 将所有的 case 转换成包含 channel 以及类型等信息的 runtime.scase 结构体;
  • 调用运行时函数 runtime.selectgo 从多个准备就绪的 channel 中选择一个可执行的 runtime.scase 结构体;
  • 通过 for 循环生成一组 if 语句,在语句中判断自己是不是被选中的 case;

一个包含三个 case 的正常 select 语句其实会被展开成如下所示的逻辑,我们可以看到其中处理的三个部分:

selv := [3]scase{}
order := [6]uint16
for i, cas := range cases {
    c := scase{}
    c.kind = ...
    c.elem = ...
    c.c = ...
}
chosen, revcOK := selectgo(selv, order, 3)
if chosen == 0 {
    ...
    break
}
if chosen == 1 {
    ...
    break
}
if chosen == 2 {
    ...
    break
}

展开后的代码片段中最重要的就是用于选择待执行 case 的运行时函数 runtime.selectgo,这也是我们要关注的重点。因为这个函数的实现比较复杂, 所以这里分两部分分析它的执行过程:

  • 执行一些必要的初始化操作并确定 case 的处理顺序;
  • 在循环中根据 case 的类型做出不同的处理;

通过本节我们将能理解问题1:select在遇到多个case同时满足条件时,会随机选取某一个case执行。

4.1. 初始化

runtime.selectgo 函数首先会进行执行必要的初始化操作并决定处理 case 的两个顺序 — 轮询顺序 pollOrder 和加锁顺序 lockOrder:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
	
	ncases := nsends + nrecvs
	scases := cas1[:ncases:ncases]
	pollorder := order1[:ncases:ncases]
	lockorder := order1[ncases:][:ncases:ncases]

	norder := 0
	for i := range scases {
		cas := &scases[i]
	}

	for i := 1; i < ncases; i++ {
		j := fastrandn(uint32(i + 1))
		pollorder[norder] = pollorder[j]
		pollorder[j] = uint16(i)
		norder++
	}
	pollorder = pollorder[:norder]
	lockorder = lockorder[:norder]

	// 根据 Channel 的地址排序确定加锁顺序
	...
	sellock(scases, lockorder)
	...
}

轮询顺序 pollOrder 和加锁顺序 lockOrder 分别是通过以下的方式确认的:

轮询顺序:通过 runtime.fastrandn 函数引入随机性;

加锁顺序:按照 channel 的地址排序后确定加锁顺序;

随机的轮询顺序可以避免 channel 的饥饿问题,保证公平性;而根据 channel 的地址顺序确定加锁顺序能够避免死锁的发生。这段代码最后调用的 runtime.sellock 会按照之前生成的加锁顺序锁定 select 语句中包含所有的 channel。

4.2.循环

当我们为 select 语句锁定了所有 channel 之后就会进入 runtime.selectgo 函数的主循环,它会分三个阶段查找或者等待某个 channel 准备就绪:

  1. 查找是否已经存在准备就绪的 channel,即可以执行收发操作
  2. 将当前 Goroutine 加入 channel 对应的收发队列上并等待其他 Goroutine 的唤醒
  3. 当前 Goroutine 被唤醒之后找到满足条件的 channel 并进行处理

runtime.selectgo 函数会根据不同情况通过 goto 语句跳转到函数内部的不同标签执行相应的逻辑,其中包括:

  • bufrecv:可以从缓冲区读取数据;
  • bufsend:可以向缓冲区写入数据;
  • recv:可以从休眠的发送方获取数据;
  • send:可以向休眠的接收方发送数据;
  • rclose:可以从关闭的 Channel 读取 EOF;
  • sclose:向关闭的 Channel 发送数据;
  • retc:结束调用并返回;

我们先来分析循环执行的第一个阶段,查找已经准备就绪的 channel。循环会遍历所有的 case 并找到需要被唤起的 runtime.sudog 结构,在这个阶段,我们会根据 case 的四种类型分别处理:

  1. 当 case 不包含 channel 时; 这种 case 会被跳过;
  2. 当 case 会从 channel 中接收数据时;

2.1如果当前 channel 的 sendq 上有等待的 Goroutine,就会跳到 recv 标签并从缓冲区读取数据后将等待 Goroutine 中的数据放入到缓冲区中相同的位置;

2.2.如果当前 channel 的缓冲区不为空,就会跳到 bufrecv 标签处从缓冲区获取数据;

2.3.如果当前 channel 已经被关闭,就会跳到 rclose 做一些清除的收尾工作;

  1. 当 case 会向 channel 发送数据时;

3.1.如果当前 channel 已经被关,闭就会直接跳到 sclose 标签,触发 panic 尝试中止程序;

3.2.如果当前 channel 的 recvq 上有等待的 Goroutine,就会跳到 send 标签向 Channel 发送数据;

3.3.如果当前 channel 的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;

  1. 当 select 语句中包含 default 时;

表示前面的所有 case 都没有被执行,这里会解锁所有 channel 并返回,意味着当前 select 结构中的收发都是非阻塞的;

go select的使用和实现原理(图4)

第一阶段的主要职责是查找所有 case 中是否有可以立刻被处理的 Channel。无论是在等待的 Goroutine 上还是缓冲区中,只要存在数据满足条件就会立刻处理,

如果不能立刻找到活跃的 channel 就会进入循环的下一阶段,按照需要将当前 Goroutine 加入到 channel 的 sendq 或者 recvq 队列中:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	...
	gp = getg()
	nextp = &gp.waiting
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		sg := acquireSudog()
		sg.g = gp
		sg.c = c

		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	...
}

除了将当前 Goroutine 对应的 runtime.sudog 结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒。

go select的使用和实现原理(图5)

等到 select 中的一些 channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo 函数的第三部分,从 runtime.sudog 中读取数据:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	...
	sg = (*sudog)(gp.param)
	gp.param = nil

	casi = -1
	cas = nil
	sglist = gp.waiting
	for _, casei := range lockorder {
		k = &scases[casei]
		if sg == sglist {
			casi = int(casei)
			cas = k
		} else {
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	c = cas.c
	goto retc
	...
}

第三次遍历全部 case 时,我们会先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

由于当前的 select 结构找到了一个 case 执行,那么剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

当我们在循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecv 和 bufsend 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 中发送数据或者从缓冲区中获取新数据:

bufrecv:
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

这里在缓冲区进行的操作和直接调用 runtime.chansend 和 runtime.chanrecv 差不多,上述两个过程在执行结束之后都会直接跳到 retc 字段。

两个直接收发 channel 的情况会调用运行时函数 runtime.send 和 runtime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

recv:
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	recvOK = true
	goto retc

send:
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	goto retc

不过如果向关闭的 channel 发送数据或者从关闭的 channel 中接收数据,情况就稍微有一点复杂了:

  • 从一个关闭 channel 中接收数据会直接清除 channel 中的相关内容;
  • 向一个关闭的 channel 发送数据就会直接 panic 造成程序崩溃:
rclose:
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	goto retc

sclose:
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))

总体来看,select 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 select 多出了 default 关键字所以会支持非阻塞的收发。

6.总结

我们简单总结一下 select 结构的执行过程与实现原理,首先在编译期间,Go 语言会对 select 语句进行优化,它会根据 select 中 case 的不同选择不同的优化路径:

  • 空的 select 语句会被转换成调用 runtime.block 直接挂起当前 Goroutine;
  • 如果 select 语句中只包含一个 case,编译器会将其转换成 if ch == nil { block }; n; 表达式;
  1. 首先判断操作的 Channel 是不是空的;
  2. 然后执行 case 结构中的内容;
  • 如果 select 语句中只包含两个 case 并且其中一个是 default,那么会使用 runtime.selectnbrecv 和 runtime.selectnbsend 非阻塞地执行收发操作;
  • 在默认情况下会通过 runtime.selectgo 获取执行 case 的索引,并通过多个 if 语句执行对应 case 中的代码;

在编译器已经对 select 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 runtime.selectgo 函数,该函数会按照以下的流程执行:

  1. 随机生成一个遍历的轮询顺序 pollOrder 并根据 channel 地址生成锁定顺序 lockOrder;
  2. 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 channel;
  • 如果存在,直接获取 case 对应的索引并返回;
  • 如果不存在,创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
  1. 当调度器唤醒当前 Goroutine 时,会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 对应的索引;

综上,我们详细介绍了go select的使用及其基本原理,尝试理解其具体实现过程也有助于我们熟练掌握其用法并从中学到有用的编程思想。

参考文章:

https://wudaijun.com/2017/10/go-select/

https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/

{{o.name}}
{{m.name}}