全栈编程

Balance $ 2,317
Item Sold 1230
文章作者: 全栈编程@luboke.com
版权声明: 本文章为全栈编程go语言体系课视频教程配套电子书,版权归 全栈编程@luboke.com所有,欢迎免费学习,转载必须注明出处!但禁止任何商业用途,否则将受到法律制裁!

select多路监听

很多时候想要同时操作多个channel,比如从ch1、ch2读数据。Go提供了一个select语句块,它像switch一样工作,里面放一些case语句块,用来轮询每个case语句块的send或recv情况。

select

用法格式示例:

select {
	// ch1有数据时,读取到v1变量中
	case v1 := <-ch1:
		...
	// ch2有数据时,读取到v2变量中
	case v2 := <-ch2:
		...
	// 所有case都不满足条件时,执行default
	default:
		...
}

defalut语句是可选的,不允许fall through行为,但允许case语句块为空块。select会被return、break关键字中断:return是退出整个函数,break是退出当前select。

select的行为模式主要是对channel是否可读进行轮询,但也可以用来向channel发送数据。它的行为如下:

  • 如果所有的case语句块评估时都被阻塞,则阻塞直到某个语句块可以被处理
  • 如果多个case同时满足条件,则随机选择一个进行处理,对于这一次的选择,其它的case都不会被阻塞,而是处理完被选中的case后进入下一轮select(如果select在循环中)或者结束select(如果select不在循环中或循环次数结束)
  • 如果存在default且其它case都不满足条件,则执行default。所以default必须要可执行而不能阻塞

如果有所疑惑,后文的"select超时时间"有更有助于理解select的说明和示例。

所有的case块都是按源代码书写顺序进行评估的。当select未在循环中时,它将只对所有case评估一次,这次结束后就结束select。某次评估过程中如果有满足条件的case,则所有其它case都直接结束评估,并退出此次select

其实如果注意到select语句是在某一个goroutine中评估的,就不难理解只有所有case都不满足条件时,select所在goroutine才会被阻塞,只要有一个case满足条件,本次select就不会出现阻塞的情况。

需要注意的是,如果在select中执行send操作,则可能会永远被send阻塞。所以,在使用send的时候,应该也使用defalut语句块,保证send不会被阻塞。如果没有default,或者能确保select不阻塞的语句块,则迟早会被send阻塞。在后文有一个select中send永久阻塞的分析:双层channel的一个示例。

一般来说,select会放在一个无限循环语句中,一直轮询channel的可读事件。

下面是一个示例,pump1()和pump2()都用于产生数据(一个产生偶数,一个产生奇数),并将数据分别放进ch1和ch2两个通道,suck()则从ch1和ch2中读取数据。然后在无限循环中使用select轮询这两个通道是否可读,最后main goroutine在1秒后强制中断所有goroutine。

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go pump1(ch1)
	go pump2(ch2)
	go suck(ch1, ch2)
	time.Sleep(1e9)
}
func pump1(ch chan int) {
	for i := 0; i <= 30; i++ {
		if i%2 == 0 {
			ch <- i
		}
	}
}
func pump2(ch chan int) {
	for i := 0; i <= 30; i++ {
		if i%2 == 1 {
			ch <- i
		}
	}
}
func suck(ch1 chan int, ch2 chan int) {
	for {
		select {
		case v := <-ch1:
			fmt.Printf("Recv on ch1: %d\n", v)
		case v := <-ch2:
			fmt.Printf("Recv on ch2: %d\n", v)
		}
	}
}

nil channel和channel类型的channel

当未为channel分配内存时,channel就是nil channel,例如var ch1 chan int。nil channel会永远阻塞对该channel的读、写操作。

nil channel会阻塞对该channel的所有读、写。所以,可以将某个channel设置为nil,进行强制阻塞,对于select分支来说,就是强制禁用此分支。

nil channel

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 不断向channel c中发送[0,10)的随机数
func send(c chan int) {
	for {
		c <- rand.Intn(10)
	}
}

func add(c chan int) {
	sum := 0

	// 1秒后,将向t.C通道发送时间点,使其可读
	t := time.NewTimer(1 * time.Second)

	for {
		// 一秒内,将一直选择第一个case
		// 一秒后,t.C可读,将选择第二个case
		// c变成nil channel后,两个case分支都将一直阻塞
		select {
		case input := <-c:
			// 不断读取c中的随机数据进行加总
			sum = sum + input
		case <-t.C:
			c = nil
			fmt.Println(sum)
		}
	}
}

func main() {
	c := make(chan int)
	go add(c)
	go send(c)
	// 给3秒时间让前两个goroutine有足够时间运行
	time.Sleep(3 * time.Second)
}

send()向通道c不断发送10以内的随机整数,add()则在一秒内不断读取通道c中的数据并进行加总。一秒时间到后,t.C通道就会有数据,第二个case分支就会被选中,第二个case会让第一个case评估的channel变为nil channel,使得第一个case从此永久禁用,因为第二个case没有多余的数据可读,它也被永久禁用。总共3秒之后,main goroutine结束,程序结束。

如果不理解NewTimer(d),换成After(d)是一样的,After(d)和NewTime(d).C是等价的。

func add(c chan int) {
	sum := 0
	t := time.After(1 * time.Second)
	for {
		select {
		case val := <-c:
			sum = sum + val
		case <-t:
			c = nil
			fmt.Println(sum)
		}
	}
}

 

为select设置超时时间

After()

谁也无法保证某些情况下的select是否会永久阻塞。很多时候都需要设置一下select的超时时间,可以借助time包的After()实现。

time.After()的定义如下:

func After(d Duration) <-chan Time

After()函数接受一个时长d,然后它After()等待d时长,等待时间到后,将等待完成时所处时间点写入到channel中并返回这个只读channel。

所以,将该函数赋值给一个变量时,这个变量是一个只读channel,而channel是一个指针类型的数据,所以它是一个指针。

看下面的示例:

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println(time.Now())
	a := time.After(1*time.Second)
	fmt.Println(<-a)
	fmt.Println(a)
}

 

如果将After()放进select语句块的一个case中,那么就可以让其它的case有一定的时间长度来监听读、写事件,如果在这段时长内其它case还没有有可读、可写事件,这个After()所在case就会结束当前的select,然后终止select(如果select未在循环中)或进入下一轮select(如果select在循环中)。

以下是一个示例:

func main() {
	ch1 := make(chan string)

	// 激活一个goroutine,但5秒之后才发送数据
	go func() {
		time.Sleep(5 * time.Second)
		ch1 <- "put value into ch1"
	}()

	select {
	case val := <-ch1:
		fmt.Println("recv value from ch1:",val)
		return
	// 只等待3秒,然后就结束
	case <-time.After(3 * time.Second):
		fmt.Println("3 second over, timeover")
	}
}

运行后,将在大约3秒之后输出:

3 second over, timeover

上面出现了超时现象,因为新激活的goroutine首先要等待5秒,然后才将数据发送到channel ch1中。但是main goroutine继续运行到select语句块,由于第一个case未满足条件(注意,main goroutine并不会因此而阻塞)。评估第二个case时,将执行time.After()等待3秒,3秒之后读取到该函数返回的通道数据,于是该case满足select的条件,该select因为没有在循环中,所以直接结束,main goroutine也因此而终止。自始至终,新激活的goroutine都没有机会将数据发送到ch1中。

上面有两个注意点:

  • (1).3秒等待时,只有在等待完成时case才被选中,在等待过程中,select一直在评估所有的case右边的表达式
  • (2).在上面的3秒等待过程中,第一个case的评估一直在持续着,因为在等待结束之前,select还未选中任何case,而是一直在评估所有的表达式,包括<-ch1的评估。

上面使用After(),也保证了select一定会选中某一个case,这时可以省略default块。

注意,After()放在select的内部和放在select的外部是完全不一样的,更助于理解的示例见下面的Tick()。

time.Tick()

After(d)是只等待一次d的时长,并在这次等待结束后将当前时间发送到通道。Tick(d)则是间隔地多次等待,每次等待d时长,并在每次间隔结束的时候将当前时间发送到通道。

因为Tick()也是在等待结束的时候发送数据到通道,所以它的返回值是一个channel,从这个channel中可读取每次等待完时的时间点。

下面是一个Tick()和After()结合的示例:

package main

import (
	"fmt"
	"time"
)

func main() {
	select {
	case <-time.Tick(2 * time.Second):
		fmt.Println("2 second over:", time.Now().Second())
	case <-time.After(7 * time.Second):
		fmt.Println("5 second over, timeover", time.Now().Second())
		return
	}
}

上面的示例,在等待2秒之后,就会因为读取到了time.Tick()的通道数据而终止,因为select并未在循环内。

 

package main

import (
	"fmt"
	"time"
)

func main() {
	for {
		select {
		case <-time.Tick(2 * time.Second):
			fmt.Println("2 second over:", time.Now().Second())
		case <-time.After(7 * time.Second):
			fmt.Println("5 second over, timeover", time.Now().Second())
			return
		}
	}
}

如果select在循环内,第二个case将永远选择不到。因为每次select轮询中,第一个case都因为2秒而先被选中,使得第二个case的评估总是被中断。进入下一个select轮询后,又会重新开始评估两个case,分别等待2秒和7秒。

 

 

上面不正常执行的原因是因为每次select都会重新评估这些表达式。如果把这些表达式放在select外面,则正常:

package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.Tick(1 * time.Second)
	after := time.After(7 * time.Second)
	fmt.Println("start second:",time.Now().Second())
	for {
		select {
		case <-tick:
			fmt.Println("1 second over:", time.Now().Second())
		case <-after:
			fmt.Println("7 second over:", time.Now().Second())
			return
		}
	}
}

将time.Tick()和time.After()放在for...select的外面,使得select每次只评估通道是否可读、可写事件,而不会重新执行time.Tick()和time.After(),使得它们重新进入计时状态。

注意上面的输出结果中,有两行:

1 second over: 45
7 second over: 45

说明在第45秒的时候,两个case都评估为真了,但是这一次选择了第一个case,然后进入下一个select过程,因为select的随机选择性,它会保证所有满足条件的case尽量均衡分布,这次将选择第二个case,它仍然为第45秒,这时因为一次for和select调用所花的时间不可能会超过1秒而进入第46秒。

文章作者: 全栈编程@luboke.com
版权声明: 本文章为全栈编程go语言体系课视频教程配套电子书,版权归 全栈编程@luboke.com所有,欢迎免费学习,转载必须注明出处!但禁止任何商业用途,否则将受到法律制裁!
copyright © 2020 全栈编程@luboke.com