api示例:该实例主要功能是实现一个API,API在调用的时候会向channel中发送任务数据。Consumer函数去消费channel中的任务数据,并且可以通过maxConcurrency去控制消费的并发数
package mainimport ("fmt""github.com/kataras/iris/v12""sync""time"
)type APIResponse struct {Data stringStatus string
}var ch = make(chan *APIResponse, 100) // 创建带缓冲的通道,作为数据队列
var wg sync.WaitGroupfunc Consumer(ch <-chan *APIResponse, wgg *sync.WaitGroup, maxConcurrency int) {//var wggg sync.WaitGroupsemaphore := make(chan struct{}, maxConcurrency) // 控制并发数for task := range ch {//wggg.Add(1)semaphore <- struct{}{} // 获取信号令牌,控制并发数go func(t *APIResponse) {defer func() {<-semaphore // 释放信号令牌,允许新的任务并发执行//wg.Done()}()// 处理任务time.Sleep(time.Second * 10)fmt.Println("Processing task:", t.Data, t.Status)// 模拟任务处理耗时// time.Sleep(time.Second)}(task)}//wggg.Wait()
}func main() {app := iris.New()//maxConcurrency := 5 // 设置最大并发数//wg.Add(1)//go Consumer(ch, &wg, maxConcurrency)go Consumer(ch, &wg, maxConcurrency)//// 定义路由和处理函数app.Get("/", func(ctx iris.Context) {//fmt.Println(time.Now().Format("2006-01-02 15:04:05.000"))//defer close(ch)//defer wg.Done()response := &APIResponse{Data: fmt.Sprintf(time.Now().Format("2006-01-02 15:04:05.000")),Status: "200",}ch <- response//fmt.Println("发送---")//fmt.Println("ch : ", len(ch), ch)ctx.HTML("<h1>Welcome to Iris</h1>")})app.Get("/hello", func(ctx iris.Context) {name := ctx.URLParam("name")ctx.Writef("Hello, %s!", name)})// 启动应用程序app.Run(iris.Addr(":8080"))//wg.Wait()
}
模拟并发请求api,生成任务数据:
package mainimport ("fmt""io/ioutil""net/http"
)func main() {for i := 0; i < 30; i++ {fmt.Println("i = ", i)url := "http://localhost:8080"method := "GET"client := &http.Client{}req, err := http.NewRequest(method, url, nil)if err != nil {fmt.Println(err)return}res, err := client.Do(req)if err != nil {fmt.Println(err)return}defer res.Body.Close()body, err := ioutil.ReadAll(res.Body)if err != nil {fmt.Println(err)return}fmt.Println(string(body))}
}
控制并发数方法改进:(有什么区别,有什么优势呢?这是个问题)
func GenWorkConsumer() {var jobGroup sync.WaitGroupsemaphore := make(chan struct{}, 2) // 控制并发数for task := range GenWorkChan {jobGroup.Add(1)semaphore <- struct{}{} // 获取信号令牌,控制并发数go func(t *GenWork) {defer func() {<-semaphore // 释放信号令牌,允许新的任务并发执行jobGroup.Done()}()GenWorkDispatch(t) // 完成具体的工作}(task)}jobGroup.Wait()
}