全栈编程

Balance $ 2,317
Item Sold 1230
文章作者: 全栈编程@luboke.com
版权声明: 本文章为go语言体系课视频教程配套电子书,版权归 全栈编程@luboke.com所有,欢迎免费学习,转载必须注明出处!但禁止任何商业用途,否则将受到法律制裁!

指定channel的方向

上面通过两个channel将3个goroutine连接起来,其中起连接作用的是第二个函数addRandNum()。在这个函数中使用了两个channel作为参数:一个channel用于接收、一个channel用于发送。

其实channel类的参数变量可以指定数据流向:

  • in <-chan int:表示channel in通道只用于接收数据
  • out chan<- int:表示channel out通道只用于发送数据

只用于接收数据的通道<-chan不可被关闭,因为关闭通道是针对发送数据而言的,表示无数据再需发送。对于recv来说,关闭通道是没有意义的。

所以,上面示例中三个函数可改写为:

func getRandNum(out chan<- int) {
    ...
}

func addRandNum(in <-chan int, out chan<- int) {
    ...
}

func printRes(in <-chan int){
    ...
}

buffered channel异步队列请求示例

下面是使用buffered channel实现异步处理请求的示例。

在此示例中:

  • 有(最多)3个worker,每个worker是一个goroutine,它们有worker ID。
  • 每个worker都从一个buffered channel中取出待执行的任务,每个任务是一个struct结构,包含了任务id(JobID),当前任务的队列号(ID)以及任务的状态(worker是否执行完成该任务)。
  • 在main goroutine中将每个任务struct发送到buffered channel中,这个buffered channel的容量为10,也就是最多只允许10个任务进行排队。
  • worker每次取出任务后,输出任务号,然后执行任务(run),最后输出任务id已完成。
  • 每个worker执行任务的方式很简单:随机睡眠0-1秒钟,并将任务标记为完成。

以下是代码部分:

package main

import (
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"strings"
	"sync"
	"time"
)
//任务结构体
type Task struct {
	ID         int
	JobID      int
	Status     string
	CreateTime time.Time
	Urls	   string
	RStatus 	string
	RStatusCode int
}

func (t *Task) run() {
	sleep := rand.Intn(1000)
	time.Sleep(time.Duration(sleep) * time.Millisecond)
	rstatus,rstatuscode := GetData(t.Urls)
	t.Status = "Completed"
	t.RStatus = rstatus
	t.RStatusCode = rstatuscode
}

var wg sync.WaitGroup

// worker的数量,即使用多少goroutine执行任务
const workerNum = 3

func main() {
	wg.Add(workerNum)

	// 创建容量为10的buffered channel
	taskQueue := make(chan *Task,10)
	// 激活goroutine,执行任务
	for workID := 1; workID <= workerNum; workID++ {
		go worker(taskQueue, workID)
	}
	// 将待执行任务放进buffered channel,共15个任务,由3个goroutine来执行
	for i := 1; i <= 15; i++ {
		taskQueue <- &Task{
			ID:         i,
			JobID:      100 + i,
			CreateTime: time.Now(),
			Urls:"http://www.luboke.com",
		}
	}
	close(taskQueue)
	wg.Wait()
}

func GetData(url string) (string,int) {
	resp,err := http.Get(url)
	if err != nil {
		panic(err)
	}

	defer resp.Body.Close()
	return resp.Status,resp.StatusCode
}

func PostData(url string, data string, contentType string) ([]byte, error) {
	resp,err := http.Post(url, contentType, strings.NewReader(data))
	if err != nil {
		panic(err)
	}

	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

// 从buffered channel中读取任务,并执行任务
func worker(in <-chan *Task, workID int) {
	defer wg.Done()
	for v := range in {
		fmt.Printf("Worker-%d: 处理请求【TaskID】:%d, 【JobID】:%d\n", workID, v.ID, v.JobID)
		v.run()
		fmt.Printf("Worker-%d: 完成请求【TaskID】:%d, 【JobID】:%d,【状态描述】:%s,【状态码】:%d\n", workID, v.ID, v.JobID,v.RStatus,v.RStatusCode)
	}
}
文章作者: 全栈编程@luboke.com
版权声明: 本文章为go语言体系课视频教程配套电子书,版权归 全栈编程@luboke.com所有,欢迎免费学习,转载必须注明出处!但禁止任何商业用途,否则将受到法律制裁!
copyright © 2020 全栈编程@luboke.com