本文章为go语言体系课视频教程配套电子书,版权归 全栈编程@luboke.com所有,欢迎免费学习,转载必须注明出处!但禁止任何商业用途,否则将受到法律制裁!
指定channel的方向
上面通过两个channel将3个goroutine连接起来,其中起连接作用的是第二个函数addRandNum()。在这个函数中使用了两个channel作为参数:一个channel用于接收、一个channel用于发送。
其实channel类的参数变量可以指定数据流向:
in <-chan int
:表示channel in通道只用于接收数据out chan<- int
:表示channel out通道只用于发送数据
只用于接收数据的通道<-chan
不可被关闭,因为关闭通道是针对发送数据而言的,表示无数据再需发送。对于recv来说,关闭通道是没有意义的。
所以,上面示例中三个函数可改写为:
func getRandNum(out chan<- int) {
...
}
func addRandNum(in <-chan int, out chan<- int) {
...
}
func printRes(in <-chan int){
...
}
buffered channel异步队列请求示例
下面是使用buffered channel实现异步处理请求的示例。
在此示例中:
- 有(最多)3个worker,每个worker是一个goroutine,它们有worker ID。
- 每个worker都从一个buffered channel中取出待执行的任务,每个任务是一个struct结构,包含了任务id(JobID),当前任务的队列号(ID)以及任务的状态(worker是否执行完成该任务)。
- 在main goroutine中将每个任务struct发送到buffered channel中,这个buffered channel的容量为10,也就是最多只允许10个任务进行排队。
- worker每次取出任务后,输出任务号,然后执行任务(run),最后输出任务id已完成。
- 每个worker执行任务的方式很简单:随机睡眠0-1秒钟,并将任务标记为完成。
以下是代码部分:
package main
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strings"
"sync"
"time"
)
//任务结构体
type Task struct {
ID int
JobID int
Status string
CreateTime time.Time
Urls string
RStatus string
RStatusCode int
}
func (t *Task) run() {
sleep := rand.Intn(1000)
time.Sleep(time.Duration(sleep) * time.Millisecond)
rstatus,rstatuscode := GetData(t.Urls)
t.Status = "Completed"
t.RStatus = rstatus
t.RStatusCode = rstatuscode
}
var wg sync.WaitGroup
// worker的数量,即使用多少goroutine执行任务
const workerNum = 3
func main() {
wg.Add(workerNum)
// 创建容量为10的buffered channel
taskQueue := make(chan *Task,10)
// 激活goroutine,执行任务
for workID := 1; workID <= workerNum; workID++ {
go worker(taskQueue, workID)
}
// 将待执行任务放进buffered channel,共15个任务,由3个goroutine来执行
for i := 1; i <= 15; i++ {
taskQueue <- &Task{
ID: i,
JobID: 100 + i,
CreateTime: time.Now(),
Urls:"http://www.luboke.com",
}
}
close(taskQueue)
wg.Wait()
}
func GetData(url string) (string,int) {
resp,err := http.Get(url)
if err != nil {
panic(err)
}
defer resp.Body.Close()
return resp.Status,resp.StatusCode
}
func PostData(url string, data string, contentType string) ([]byte, error) {
resp,err := http.Post(url, contentType, strings.NewReader(data))
if err != nil {
panic(err)
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
// 从buffered channel中读取任务,并执行任务
func worker(in <-chan *Task, workID int) {
defer wg.Done()
for v := range in {
fmt.Printf("Worker-%d: 处理请求【TaskID】:%d, 【JobID】:%d\n", workID, v.ID, v.JobID)
v.run()
fmt.Printf("Worker-%d: 完成请求【TaskID】:%d, 【JobID】:%d,【状态描述】:%s,【状态码】:%d\n", workID, v.ID, v.JobID,v.RStatus,v.RStatusCode)
}
}
本文章为go语言体系课视频教程配套电子书,版权归 全栈编程@luboke.com所有,欢迎免费学习,转载必须注明出处!但禁止任何商业用途,否则将受到法律制裁!