select多路监听
很多时候想要同时操作多个channel,比如从ch1、ch2读数据。Go提供了一个select语句块,它像switch一样工作,里面放一些case语句块,用来轮询每个case语句块的send或recv情况。
select
用法格式示例:
select {
// ch1有数据时,读取到v1变量中
case v1 := <-ch1:
...
// ch2有数据时,读取到v2变量中
case v2 := <-ch2:
...
// 所有case都不满足条件时,执行default
default:
...
}
defalut语句是可选的,不允许fall through行为,但允许case语句块为空块。select会被return、break关键字中断:return是退出整个函数,break是退出当前select。
select的行为模式主要是对channel是否可读进行轮询,但也可以用来向channel发送数据。它的行为如下:
- 如果所有的case语句块评估时都被阻塞,则阻塞直到某个语句块可以被处理
- 如果多个case同时满足条件,则随机选择一个进行处理,对于这一次的选择,其它的case都不会被阻塞,而是处理完被选中的case后进入下一轮select(如果select在循环中)或者结束select(如果select不在循环中或循环次数结束)
- 如果存在default且其它case都不满足条件,则执行default。所以default必须要可执行而不能阻塞
如果有所疑惑,后文的"select超时时间"有更有助于理解select的说明和示例。
所有的case块都是按源代码书写顺序进行评估的。当select未在循环中时,它将只对所有case评估一次,这次结束后就结束select。某次评估过程中如果有满足条件的case,则所有其它case都直接结束评估,并退出此次select。
其实如果注意到select语句是在某一个goroutine中评估的,就不难理解只有所有case都不满足条件时,select所在goroutine才会被阻塞,只要有一个case满足条件,本次select就不会出现阻塞的情况。
需要注意的是,如果在select中执行send操作,则可能会永远被send阻塞。所以,在使用send的时候,应该也使用defalut语句块,保证send不会被阻塞。如果没有default,或者能确保select不阻塞的语句块,则迟早会被send阻塞。在后文有一个select中send永久阻塞的分析:双层channel的一个示例。
一般来说,select会放在一个无限循环语句中,一直轮询channel的可读事件。
下面是一个示例,pump1()和pump2()都用于产生数据(一个产生偶数,一个产生奇数),并将数据分别放进ch1和ch2两个通道,suck()则从ch1和ch2中读取数据。然后在无限循环中使用select轮询这两个通道是否可读,最后main goroutine在1秒后强制中断所有goroutine。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go pump1(ch1)
go pump2(ch2)
go suck(ch1, ch2)
time.Sleep(1e9)
}
func pump1(ch chan int) {
for i := 0; i <= 30; i++ {
if i%2 == 0 {
ch <- i
}
}
}
func pump2(ch chan int) {
for i := 0; i <= 30; i++ {
if i%2 == 1 {
ch <- i
}
}
}
func suck(ch1 chan int, ch2 chan int) {
for {
select {
case v := <-ch1:
fmt.Printf("Recv on ch1: %d\n", v)
case v := <-ch2:
fmt.Printf("Recv on ch2: %d\n", v)
}
}
}
nil channel和channel类型的channel
当未为channel分配内存时,channel就是nil channel,例如var ch1 chan int。nil channel会永远阻塞对该channel的读、写操作。
nil channel会阻塞对该channel的所有读、写。所以,可以将某个channel设置为nil,进行强制阻塞,对于select分支来说,就是强制禁用此分支。
nil channel
package main
import (
"fmt"
"math/rand"
"time"
)
// 不断向channel c中发送[0,10)的随机数
func send(c chan int) {
for {
c <- rand.Intn(10)
}
}
func add(c chan int) {
sum := 0
// 1秒后,将向t.C通道发送时间点,使其可读
t := time.NewTimer(1 * time.Second)
for {
// 一秒内,将一直选择第一个case
// 一秒后,t.C可读,将选择第二个case
// c变成nil channel后,两个case分支都将一直阻塞
select {
case input := <-c:
// 不断读取c中的随机数据进行加总
sum = sum + input
case <-t.C:
c = nil
fmt.Println(sum)
}
}
}
func main() {
c := make(chan int)
go add(c)
go send(c)
// 给3秒时间让前两个goroutine有足够时间运行
time.Sleep(3 * time.Second)
}
send()向通道c不断发送10以内的随机整数,add()则在一秒内不断读取通道c中的数据并进行加总。一秒时间到后,t.C通道就会有数据,第二个case分支就会被选中,第二个case会让第一个case评估的channel变为nil channel,使得第一个case从此永久禁用,因为第二个case没有多余的数据可读,它也被永久禁用。总共3秒之后,main goroutine结束,程序结束。
如果不理解NewTimer(d),换成After(d)是一样的,After(d)和NewTime(d).C是等价的。
func add(c chan int) {
sum := 0
t := time.After(1 * time.Second)
for {
select {
case val := <-c:
sum = sum + val
case <-t:
c = nil
fmt.Println(sum)
}
}
}
为select设置超时时间
After()
谁也无法保证某些情况下的select是否会永久阻塞。很多时候都需要设置一下select的超时时间,可以借助time包的After()实现。
time.After()的定义如下:
func After(d Duration) <-chan Time
After()函数接受一个时长d,然后它After()等待d时长,等待时间到后,将等待完成时所处时间点写入到channel中并返回这个只读channel。
所以,将该函数赋值给一个变量时,这个变量是一个只读channel,而channel是一个指针类型的数据,所以它是一个指针。
看下面的示例:
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println(time.Now())
a := time.After(1*time.Second)
fmt.Println(<-a)
fmt.Println(a)
}
如果将After()放进select语句块的一个case中,那么就可以让其它的case有一定的时间长度来监听读、写事件,如果在这段时长内其它case还没有有可读、可写事件,这个After()所在case就会结束当前的select,然后终止select(如果select未在循环中)或进入下一轮select(如果select在循环中)。
以下是一个示例:
func main() {
ch1 := make(chan string)
// 激活一个goroutine,但5秒之后才发送数据
go func() {
time.Sleep(5 * time.Second)
ch1 <- "put value into ch1"
}()
select {
case val := <-ch1:
fmt.Println("recv value from ch1:",val)
return
// 只等待3秒,然后就结束
case <-time.After(3 * time.Second):
fmt.Println("3 second over, timeover")
}
}
运行后,将在大约3秒之后输出:
3 second over, timeover
上面出现了超时现象,因为新激活的goroutine首先要等待5秒,然后才将数据发送到channel ch1中。但是main goroutine继续运行到select语句块,由于第一个case未满足条件(注意,main goroutine并不会因此而阻塞)。评估第二个case时,将执行time.After()等待3秒,3秒之后读取到该函数返回的通道数据,于是该case满足select的条件,该select因为没有在循环中,所以直接结束,main goroutine也因此而终止。自始至终,新激活的goroutine都没有机会将数据发送到ch1中。
上面有两个注意点:
- (1).3秒等待时,只有在等待完成时case才被选中,在等待过程中,select一直在评估所有的case右边的表达式。
- (2).在上面的3秒等待过程中,第一个case的评估一直在持续着,因为在等待结束之前,select还未选中任何case,而是一直在评估所有的表达式,包括
<-ch1
的评估。
上面使用After(),也保证了select一定会选中某一个case,这时可以省略default块。
注意,After()放在select的内部和放在select的外部是完全不一样的,更助于理解的示例见下面的Tick()。
time.Tick()
After(d)是只等待一次d的时长,并在这次等待结束后将当前时间发送到通道。Tick(d)则是间隔地多次等待,每次等待d时长,并在每次间隔结束的时候将当前时间发送到通道。
因为Tick()也是在等待结束的时候发送数据到通道,所以它的返回值是一个channel,从这个channel中可读取每次等待完时的时间点。
下面是一个Tick()和After()结合的示例:
package main
import (
"fmt"
"time"
)
func main() {
select {
case <-time.Tick(2 * time.Second):
fmt.Println("2 second over:", time.Now().Second())
case <-time.After(7 * time.Second):
fmt.Println("5 second over, timeover", time.Now().Second())
return
}
}
上面的示例,在等待2秒之后,就会因为读取到了time.Tick()的通道数据而终止,因为select并未在循环内。
package main
import (
"fmt"
"time"
)
func main() {
for {
select {
case <-time.Tick(2 * time.Second):
fmt.Println("2 second over:", time.Now().Second())
case <-time.After(7 * time.Second):
fmt.Println("5 second over, timeover", time.Now().Second())
return
}
}
}
如果select在循环内,第二个case将永远选择不到。因为每次select轮询中,第一个case都因为2秒而先被选中,使得第二个case的评估总是被中断。进入下一个select轮询后,又会重新开始评估两个case,分别等待2秒和7秒。
上面不正常执行的原因是因为每次select都会重新评估这些表达式。如果把这些表达式放在select外面,则正常:
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(1 * time.Second)
after := time.After(7 * time.Second)
fmt.Println("start second:",time.Now().Second())
for {
select {
case <-tick:
fmt.Println("1 second over:", time.Now().Second())
case <-after:
fmt.Println("7 second over:", time.Now().Second())
return
}
}
}
将time.Tick()和time.After()放在for...select的外面,使得select每次只评估通道是否可读、可写事件,而不会重新执行time.Tick()和time.After(),使得它们重新进入计时状态。
注意上面的输出结果中,有两行:
1 second over: 45
7 second over: 45
说明在第45秒的时候,两个case都评估为真了,但是这一次选择了第一个case,然后进入下一个select过程,因为select的随机选择性,它会保证所有满足条件的case尽量均衡分布,这次将选择第二个case,它仍然为第45秒,这时因为一次for和select调用所花的时间不可能会超过1秒而进入第46秒。