channel
// cancelFn 数据通道关闭通知退出
func cancelFn(dataChan chan int) {for {select {case val, ok := <-dataChan:// 关闭data通道时,通知退出// 一个可选是判断data=指定值时退出if !ok {fmt.Printf("Channel closed !!!")return}fmt.Printf("Receive data from dataChan %d\n", val)}}
}func main() {channels := make([]chan int, 10)for i := 0; i < 10; i++ {channels[i] = make(chan int)go cancelFn(channels[i])channels[i] <- 1 // 向管道写数据fmt.Println(i, "quit")}
}
watitGroup
var wg sync.WaitGroupfunc main() {ch := make(chan int)wg.Add(1) //设置计数器 表示goroutine个数加1go func() {v, ok := <-chif ok {fmt.Println("value", v)}wg.Done() //执行结束之后 , goroutine个数减1}()wg.Add(1)go func() {ch <- 4wg.Done()}()wg.Wait() //主goroutine阻塞,等待计数器变为0
}
WaitGroup原理
type WaitGroup struct {statel [3]uint32/*长度为3的数组包含两个计数器和一个信号量counter : 当前还未执行的结束的goroutine计数器waiter count : 等待goroutine-group结束的goroutine数量semaphore: 信号量*/
}
WaitGroup对外提供了三个接口
- Add(delta int) : 将delta值加到counter中
- Wait(): waiter递增加1 , 并阻塞等待信号量semaphore
- Done(): counter递减1 , 按照waiter数值释放相应次数的信号量
Add(delta int)
Add() 做了两件事 , 一是把delta值累加到counter中,因为delta可以为负值.所以说当counter变为0时,根据waiter数值释放等量的信号量 , 把等待的goroutine全部唤醒,如果couner变为负值,则触发panic.
Wait()
Wait()方法一个是要累加waiter , 二是阻塞等待信号量.
Done()
Done 只做一件事,把counter减少1,其实Done里面调用的就是Add(-1)
context原理
Context实际上只定义了接口,凡是实现该接口的类都能称为Context.
type Context interface {Deadline() (deadline time.Time , ok bool)Done() <-chan struct{}Err() errorvalue(key interface{}) interface{}
}
Deadline()
该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline , 则ok为false,此时deadline为一个初始值的time.Time值.
Done()
该方法返回一个用于探测context是否取消的channel,当context取消时,会自动将该channel关闭. 对于不支持取消的context(如:context.Backgroud) , 该方法可能会返回nil.
Err()
该方法描述context关闭的原因.关闭原因由context实现控制.
value()
有一种context,它不是用于控制呈树状分布的goroutine , 而是用于在树状分布的goroutine之间传递信息.Value()方法就是此种类型的context,根据key查询map集合中的value.
空context
context包中定义了一个公用的emptyCtx全局变量 , 名为backgroud,可以使用context.Backgroud()获取它.context包中提供了四个方法创建不同类型的context , 使用这四个方法如果没有父context,则都需要传入background , 即将background作为父节点:
- WithCancel();
- WithDeadline();
- WithTimeout();
- WithValue();
context包中实现Context接口的struct,除了emptyCtx , 还有cancelCtx , timerCtx 和 valueCtx三种.
cancelCtx
type cancelCtx struct {Contextmu sync.Mutexdone chan struct{}children map[canceler]struct{}err error
}
children 中记录了由此context 派生的所有child , 此context被"cancel"时,会把其中所有的child都cancel掉.cancelCtx与deadline和value无关 , 所以只需要实现Done() 和 Err() 外露接口即可.
Cancel()接口的实现
cancel()内部方法时理解cancelCtx的关键cancelCtx.children的map中,其中key值即后代对象,value值并没有意义.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {c.mu.Lock()c.err = err //设置一个error,说明关闭原因close(c.done) //将channel关闭,以此通知派生的contextfor child := range c.children { //遍历所有children,逐个调用cancel方法child.cancel(false, err)}c.children = nilc.mu.Unlock()if removeFromParent { //正常情况下,需要将自己从parent删除removeChild(c.Context, c)}
}
WithCancel()方法的使用案例
func HandelRequest(ctx context.Context) {go WriteRedis(ctx)go WriteDatabase(ctx)for {select {case <-ctx.Done():fmt.Println("HandelRequest Done.")returndefault:fmt.Println("HandelRequest running")time.Sleep(2 * time.Second)}}
}
func WriteRedis(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteRedis Done.")returndefault:fmt.Println("WriteRedis running")time.Sleep(2 * time.Second)}}
}
func WriteDatabase(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteDatabase Done.")returndefault:fmt.Println("WriteDatabase running")time.Sleep(2 * time.Second)}}
}
func main() {ctx, cancel := context.WithCancel(context.Background())go HandelRequest(ctx)time.Sleep(5 * time.Second)fmt.Println("It's time to stop all sub goroutines!")cancel()//Just for test whether sub goroutines exit or nottime.Sleep(5 * time.Second)
}
HandelRequest()用于处理某个请求 , 其又会创建两个协程 , main协程可以在适当时机cancel掉所有自子协程
timeCtx
type timerCtx struct {cancelCtxtimer *time.Timer deadline time.Time
}
timerCtx 在cancelCtx的基础上,增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器.由此衍生出了WithDeadline()和WithTimeout().
- deadline:指定最后期限.
- timeout: 指定最长存活时间.
package mainimport ("fmt""time""context"
)func HandelRequest(ctx context.Context) {go WriteRedis(ctx)go WriteDatabase(ctx)for {select {case <-ctx.Done():fmt.Println("HandelRequest Done.")returndefault:fmt.Println("HandelRequest running")time.Sleep(2 * time.Second)}}
}func WriteRedis(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteRedis Done.")returndefault:fmt.Println("WriteRedis running")time.Sleep(2 * time.Second)}}
}func WriteDatabase(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("WriteDatabase Done.")returndefault:fmt.Println("WriteDatabase running")time.Sleep(2 * time.Second)}}
}func main() {ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)go HandelRequest(ctx)time.Sleep(10 * time.Second)
}
valueCtx
type valueCtx struct {Contextkey, val interface{}
}
valueCtx 只是在Context基础上增加了一个key-value对,用于在各级协程之间传递数据.因此只需要实现Value()接口.
func HandelRequest(ctx context.Context) {for {select {case <-ctx.Done():fmt.Println("HandelRequest Done.")returndefault:fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))time.Sleep(2 * time.Second)}}
}func main() {ctx := context.WithValue(context.Background(), "parameter", "1")go HandelRequest(ctx)time.Sleep(10 * time.Second)
}
子协程可以读到context的key-value