go 的 goroutine 提供了一种比线程而言更廉价的方式处理并发场景。相比线程,协程占据更小的内存空间,并且由于是在用户态进行调度,上下文切换的代价更小。所以协程更加容易支撑几万几百万的并发。但 goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。
因此本文的目的是学习如何实现一个go协程池。
借鉴java的线程池,定义如下的结构体
type GoroutinePool struct {name stringcoreSize uint32 //定义有多少协程taskChan chan func() //类似java的Runable中的run方法stop bool //是否停止协程池
}
新建一个协程池,通过start方法启动协程。
使用select实现任务的执行和协程的销毁
func NewGoroutinePool(name string, coreSize uint32) *GoroutinePool{goroutinePool := &GoroutinePool{name: name,coreSize: coreSize,taskChan: make(chan func()),stop: false,}goroutinePool.start()return goroutinePool
}func (pool *GoroutinePool) start() {for i := uint32(0); i < pool.coreSize; i++ {go func() {for {select {case task := <-pool.taskChan:task()default:if pool.stop && len(pool.taskChan) == 0{log.Printf("stop")close(pool.taskChan)break}}}}()}
}
提交任务并且执行,使用go的recover()机制,避免panic导致协程终止
func (pool *GoroutinePool) Execute(tasks ...Task) error {if pool.stop {return fmt.Errorf("pool is stop")}for _, t := range tasks {task := tfun := func() { pool.exec(task) }pool.taskChan <- fun}return nil
}func (pool *GoroutinePool) exec(task Task) {defer func() {if err := recover(); err != nil {stacks := pool.getStacks(5, 6)log.Printf("%s pool exec panic:%v,stack:%v", pool.name, err, stacks)}}()result, err := task()log.Printf("result:%v,err:%v", result, err)
}
停止协程、执行异常时获取堆栈信息
func (pool *GoroutinePool) Stop() {pool.stop = true
}func (pool *GoroutinePool) getStacks(skip int, maxNum int) []string {pc := make([]uintptr, maxNum)n := runtime.Callers(skip, pc)var stacks []stringfor i := 0; i < n; i++ {f := runtime.FuncForPC(pc[i])if f == nil {stacks = append(stacks, "unknown Func")} else {file, line := f.FileLine(pc[i])stacks = append(stacks, fmt.Sprintf("%v %v %v", f.Name(), file, line))}}return stacks
}
可以看出利用golang的go语法糖和channel机制可以很容易的实现一个协程池。
但是本文实现的协程池还缺少了:
1、协程池大小的动态扩展能力;例如java支持coreSzie和maxSize,允许一定的突发。
2、拒绝策略。
3、使用pool.taskChan <- fun 进行任务的提交,当channel满时,会阻塞业务逻辑。
推荐阅读
1、原来阿里字节员工简历长这样
2、一条SQL差点引发离职
3、MySQL并发插入导致死锁
如果你也觉得我的分享有价值,记得点赞或者收藏哦!你的鼓励与支持,会让我更有动力写出更好的文章哦!