在 Go语言实战 中看到有些并发相关的例子,讲解得也比较详细,于是乎写来加深下印象。
无缓冲通道
无缓冲通道在接收前没有能力保存任何值。我自己找了书上的示例来加深一下印象。
模拟网球比赛
package mainimport ("fmt""math/rand""sync""time"
)var (wg sync.WaitGroup
)func init() {rand.Seed(time.Now().UnixNano()) // 让每次运行生成的随机数不相同
}func main() {count := make(chan int)wg.Add(2)go player("Nadal", count)go player("Looking", count)count <- 1wg.Wait()
}func player(name string, count chan int) {defer wg.Done()for {ball, ok := <-countif !ok {fmt.Printf("Player %s Win\n", name)return}n := rand.Intn(100)if n%13 == 0 {fmt.Printf("Player %s Missed\n", name)close(count)return}fmt.Printf("Player %s Hit %d\n", name, ball)count <- ball}
}
模拟接力赛
接力赛中,接力棒只能在一个人手中。
package mainimport ("fmt""sync""time"
)var (wg sync.WaitGroup
)func main() {baton := make(chan int)wg.Add(1)go Runner(baton)baton <- 1wg.Wait()
}func Runner(baton chan int) {var newRunner intrunner := <- batonfmt.Printf("Runner %d Running with baton\n", runner)if runner != 4 {newRunner = runner + 1fmt.Printf("Runner %d to the line\n", newRunner)go Runner(baton)}time.Sleep(100 * time.Millisecond)if runner == 4 {fmt.Printf("Runner %d finished, Race over\n", runner)wg.Done()return}fmt.Printf("Runner %d Exchange with runner %d \n", runner, newRunner)baton <- newRunner
}
Runner 1 Running with baton
Runner 2 to the line
Runner 1 Exchange with runner 2
Runner 2 Running with baton
Runner 3 to the line
Runner 2 Exchange with runner 3
Runner 3 Running with baton
Runner 4 to the line
Runner 3 Exchange with runner 4
Runner 4 Running with baton
Runner 4 finished, Race over
有缓冲通道
缓冲通道不强制发送和接收同时完成。
当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通 道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。从一个已经关闭且没有数据的通道 里获取数据,总会立刻返回,并返回一个通道类型的零值。如果在获取通道时还加入了可选的标 志,就能得到通道的状态信息。
模拟任务分发和处理
package mainimport ("fmt""math/rand""sync""time"
)const (numberGoroutines = 4taskLoad = 10
)var wg sync.WaitGroupfunc init() {rand.Seed(time.Now().UnixNano())
}func main() {tasks := make(chan string, taskLoad)wg.Add(numberGoroutines)for gr := 1; gr <= numberGoroutines; gr++ {go worker(tasks, gr)}for post := 1; post <= taskLoad; post++ {tasks <- fmt.Sprintf("Task: %d", post)}close(tasks) // 任务发布完毕后关闭通道,关闭通道不影响其它Goroutine对已发布内容的正常接收wg.Wait()
}func worker(tasks chan string, worker int) {defer wg.Done()for {task, ok := <-tasksif !ok {fmt.Printf("Worker: %d : Shutting down\n", worker)return}fmt.Printf("Worker: %d : Started %s\n", worker, task)sleep := rand.Int63n(100)time.Sleep(time.Duration(sleep) * time.Millisecond)fmt.Printf("Worker: %d : Completed %s\n", worker, task)}
}
Worker: 4 : Started Task: 4
Worker: 2 : Started Task: 2
Worker: 3 : Started Task: 3
Worker: 1 : Started Task: 1
Worker: 3 : Completed Task: 3
Worker: 3 : Started Task: 5
Worker: 2 : Completed Task: 2
Worker: 2 : Started Task: 6
Worker: 1 : Completed Task: 1
Worker: 1 : Started Task: 7
Worker: 1 : Completed Task: 7
Worker: 1 : Started Task: 8
Worker: 3 : Completed Task: 5
Worker: 3 : Started Task: 9
Worker: 4 : Completed Task: 4
Worker: 4 : Started Task: 10
Worker: 2 : Completed Task: 6
Worker: 2 : Shutting down
Worker: 3 : Completed Task: 9
Worker: 3 : Shutting down
Worker: 1 : Completed Task: 8
Worker: 1 : Shutting down
Worker: 4 : Completed Task: 10
Worker: 4 : Shutting down
runner
runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以
用runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。
runner/runner.go
package runnerimport ("errors""os""os/signal""time"
)type Runner struct {interrupt chan os.Signalcomplete chan errortimeout <-chan time.Time // 单向通道,只允许接收tasks []func(int)
}var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")func New(d time.Duration) *Runner {return &Runner{interrupt: make(chan os.Signal, 1),complete: make(chan error),timeout: time.After(d),}
}func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}func (r *Runner) Start() error {signal.Notify(r.interrupt, os.Interrupt) // 如果有中断,会将中断信号发送到 r.interruptgo func() {r.complete <- r.run()}()select {case err := <-r.complete:return err // 如果提前中断,err 是 ErrInterrupt,正常结束则是 nilcase <-r.timeout:return ErrTimeout}
}func (r *Runner) run() error {for id, task := range r.tasks {if r.gotInterrupt() {return ErrInterrupt}task(id)}return nil
}func (r *Runner) gotInterrupt() bool {select {case <-r.interrupt:signal.Stop(r.interrupt)return truedefault:return false}
}
main.go
package mainimport ("github.com/test/runner""log""os""time"
)const timeout = 3 * time.Secondfunc main() {log.Println("Starting work.")r := runner.New(timeout)r.Add(createTask(), createTask(), createTask())if err := r.Start(); err != nil {switch err {case runner.ErrTimeout:log.Println("Terminating due to timeout.")os.Exit(1)case runner.ErrInterrupt:log.Println("Terminating due to interrupt.")os.Exit(2)}}log.Println("Process ended.")
}func createTask() func(int) {return func(id int) {log.Printf("Processor - Task #%d.", id)time.Sleep(time.Duration(id) * time.Second)}
}
超时退出
2024/04/21 17:26:30 Starting work.
2024/04/21 17:26:30 Processor - Task #0.
2024/04/21 17:26:30 Processor - Task #1.
2024/04/21 17:26:31 Processor - Task #2.
2024/04/21 17:26:33 Terminating due to timeout.
中断退出
2024/04/21 17:28:18 Starting work.
2024/04/21 17:28:18 Processor - Task #0.
2024/04/21 17:28:18 Processor - Task #1.
2024/04/21 17:28:19 Terminating due to interrupt.
正常退出
2024/04/21 17:30:40 Starting work.
2024/04/21 17:30:40 Processor - Task #0.
2024/04/21 17:30:40 Processor - Task #1.
2024/04/21 17:30:41 Processor - Task #2.
2024/04/21 17:30:43 Process ended.
pool
pool 使用有缓冲通道实现资源池。
pool/pool.go
package poolimport ("errors""io""log""sync"
)type Pool struct {m sync.Mutexresources chan io.Closerfactory func() (io.Closer, error)closed bool
}var ErrPoolClosed = errors.New("pool has been closed")func New(fn func() (io.Closer, error), size uint) (*Pool, error) {if size <= 0 {return nil, errors.New("size value too small")}return &Pool{resources: make(chan io.Closer, size),factory: fn,}, nil
}func (p *Pool) Acquire() (io.Closer, error) {select {case r, ok := <-p.resources:log.Println("Acquire:", "Shared resource")if !ok {return nil, ErrPoolClosed}return r, nildefault:log.Println("Acquire:", "New resource")return p.factory()}
}func (p *Pool) Release(r io.Closer) {p.m.Lock()defer p.m.Unlock()if p.closed {r.Close()return}select {case p.resources <- r:log.Println("Release:", "In queue")default:log.Println("Release:", "Closing")r.Close()}
}func (p *Pool) Close() {p.m.Lock()defer p.m.Unlock()if p.closed {return}p.closed = trueclose(p.resources)for r := range p.resources {r.Close()}
}
main.go
package mainimport ("github.com/test/pool""io""log""math/rand""sync""sync/atomic""time"
)const (maxGoroutines = 5pooledResources = 2
)type dbConnection struct {ID int32
}func (dbConn *dbConnection) Close() error {log.Println("Close: Connection", dbConn.ID)return nil
}var idCounter int32func createConnection() (io.Closer, error) {id := atomic.AddInt32(&idCounter, 1)log.Println("Create: New Connection", id)return &dbConnection{id}, nil
}func main() {var wg sync.WaitGroupwg.Add(maxGoroutines)p, err := pool.New(createConnection, pooledResources)if err != nil {log.Println(err)}for query := 0; query < maxGoroutines; query++ {go func(q int) {defer wg.Done()performQueries(q, p)}(query)}wg.Wait()log.Println("Shutdown program")p.Close()
}func performQueries(query int, p *pool.Pool) {conn, err := p.Acquire()if err != nil {log.Println(err)return}defer p.Release(conn)time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}
运行结果
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 1
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 3
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 4
2024/04/21 20:29:20 Create: New Connection 2
2024/04/21 20:29:20 Acquire: New resource
2024/04/21 20:29:20 Create: New Connection 5
2024/04/21 20:29:20 QID[0] CID[2]
2024/04/21 20:29:20 Release: In queue
2024/04/21 20:29:20 QID[2] CID[5]
2024/04/21 20:29:20 Release: In queue
2024/04/21 20:29:20 QID[4] CID[1]
2024/04/21 20:29:20 Release: Closing
2024/04/21 20:29:20 Close: Connection 1
2024/04/21 20:29:21 QID[1] CID[4]
2024/04/21 20:29:21 Release: Closing
2024/04/21 20:29:21 Close: Connection 4
2024/04/21 20:29:21 QID[3] CID[3]
2024/04/21 20:29:21 Release: Closing
2024/04/21 20:29:21 Close: Connection 3
2024/04/21 20:29:21 Shutdown program
2024/04/21 20:29:21 Close: Connection 2
2024/04/21 20:29:21 Close: Connection 5
work
work 使用无缓冲通道创建资源池。work 新建时就生成指定个数 goroutine 循环等待(无缓冲通道无数据阻塞)消费任务,然后主线程再分发相应任务到通道,消费 goroutine 再继续执行消费任务。
work/work.go
package workimport "sync"type Worker interface {Task()
}type Pool struct {work chan Workerwg sync.WaitGroup
}func New(maxGoroutines int) *Pool {p := Pool{work: make(chan Worker),}p.wg.Add(maxGoroutines)for i := 0; i < maxGoroutines; i++ {go func() {defer p.wg.Done()for w := range p.work { // 如果 work 关闭,for range 循环结束w.Task()}}()}return &p
}func (p *Pool) Run(w Worker) {p.work <- w
}func (p *Pool) Shutdown() {close(p.work) // 关闭通道,避免 New 产生的 goroutine 一直阻塞不退出p.wg.Wait()
}
main.go
package mainimport ("github.com/test/work""log""sync""time"
)var names = []string{"steve","jason","looking",
}type namePrinter struct {name string
}func (m *namePrinter) Task() {log.Println(m.name)time.Sleep(time.Second)
}const times = 3func main() {p := work.New(2)var wg sync.WaitGroupwg.Add(times * len(names))for i := 0; i < times; i++ {for _, name := range names {np := namePrinter{name: name}go func() {defer wg.Done()p.Run(&np)}()}}wg.Wait()p.Shutdown()
}
运行结果
2024/04/21 21:39:28 steve
2024/04/21 21:39:28 looking
2024/04/21 21:39:29 jason
2024/04/21 21:39:29 looking
2024/04/21 21:39:30 steve
2024/04/21 21:39:30 jason
2024/04/21 21:39:31 looking
2024/04/21 21:39:31 steve
2024/04/21 21:39:32 jason