以 go1.23.3 linux/amd64 为例。
定时器示例代码:
package main

import (
	"context"
	"fmt"
	"time"
)

// ctx is deliberately package-level so it can be inspected from a debugger
// (for example `p ctx` in dlv) while the program is running.
var ctx context.Context

// main starts a worker goroutine bound to a context with a 600s timeout.
// The worker checks ctx.Done() between 60s units of simulated work, so it
// notices the deadline at its first check after the timeout expires. main
// itself sleeps 10s past the timeout so the worker's exit messages are
// printed before the process exits.
func main() {
	timeout := 600 * time.Second

	// Declare cancel separately and assign with `=` so the package-level ctx
	// is set rather than shadowed by a local variable. Deferring cancel keeps
	// the program clean under `go vet` (lostcancel).
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// WithTimeout always attaches a deadline, so the ok result is ignored.
	deadline, _ := ctx.Deadline()
	fmt.Println("process start", time.Now().Format(time.DateTime))
	fmt.Println("ctx deadline", deadline.Format(time.DateTime))

	go func() {
		defer func() {
			fmt.Println("goroutine exit", time.Now().Format(time.DateTime))
		}()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("ctx.Done", time.Now().Format(time.DateTime))
				return
			default:
				// Non-blocking check: do one unit of work, then re-check ctx.
				fmt.Println("do something start", time.Now().Format(time.DateTime))
				time.Sleep(60 * time.Second)
				fmt.Println("do something end ", time.Now().Format(time.DateTime))
			}
		}
	}()

	time.Sleep(timeout + 10*time.Second)
	fmt.Println("process exit", time.Now().Format(time.DateTime))
}
定时器创建流程:
定时器的类型为:runtime.timer,新建定时器会调用runtime.newTimer函数。
runtime.newTimer函数会经由t.modify调用func (t *timer) maybeAdd(),由该函数将定时器放入当前P的timers堆(代码中的ts)中:
// maybeAdd inserts t into the timers heap of the P that the calling goroutine
// is currently running on, but only if t.needsAdd() reports that t should be
// in a heap. It takes the P's timers lock and t's lock itself (in that order),
// and if t becomes the earliest wake-up it calls wakeNetPoller after all
// locks have been released.
func (t *timer) maybeAdd() {
	// Note: Not holding any locks on entry to t.maybeAdd,
	// so the current g can be rescheduled to a different M and P
	// at any time, including between the ts := assignment and the
	// call to ts.lock. If a reschedule happened then, we would be
	// adding t to some other P's timers, perhaps even a P that the scheduler
	// has marked as idle with no timers, in which case the timer could
	// go unnoticed until long after t.when.
	// Calling acquirem instead of using getg().m makes sure that
	// we end up locking and inserting into the current P's timers.
	mp := acquirem()
	ts := &mp.p.ptr().timers
	// Lock order: the P's timers first, then the individual timer.
	ts.lock()
	// NOTE(review): cleanHead presumably tidies up the top of the heap before
	// inserting — confirm against its definition.
	ts.cleanHead()
	t.lock()
	t.trace("maybeAdd")
	// when/wake are computed under the locks but only acted on after the
	// locks are released below.
	when := int64(0)
	wake := false
	if t.needsAdd() {
		t.state |= timerHeaped
		when = t.when
		wakeTime := ts.wakeTime()
		// Wake the poller if there was no pending wake time (0) or if t
		// fires earlier than the current earliest wake time.
		wake = wakeTime == 0 || when < wakeTime
		ts.addHeap(t)
	}
	t.unlock()
	ts.unlock()
	releasem(mp)
	if wake {
		wakeNetPoller(when)
	}
}
入堆:
// addHeap adds t to the timers heap.
// The caller must hold ts.lock or the world must be stopped.
// The caller must also have checked that t belongs in the heap.
// Callers that are not sure can call t.maybeAdd instead,
// but note that maybeAdd has different locking requirements.
//
// t must not already belong to a timers set (t.ts must be nil), otherwise
// addHeap throws. On return t.ts points at ts and t is placed in the heap
// according to its when value.
func (ts *timers) addHeap(t *timer) {
	assertWorldStoppedOrLockHeld(&ts.mu)
	// Timers rely on the network poller, so make sure the poller
	// has started.
	if netpollInited.Load() == 0 {
		netpollGenericInit()
	}

	// A timer can be in at most one heap; t.ts records which one.
	if t.ts != nil {
		throw("ts set in timer")
	}
	t.ts = ts
	// Append a (timer, when) entry at the end, then sift it up to restore
	// heap order.
	ts.heap = append(ts.heap, timerWhen{t, t.when})
	ts.siftUp(len(ts.heap) - 1)
	// If t ended up at the root it is the new earliest timer, so refresh the
	// cached heap minimum.
	if t == ts.heap[0].timer {
		ts.updateMinWhenHeap()
	}
}
timers堆由每个P各自持有,保存的是在该P上运行的协程所添加的定时器(以添加时协程所在的P为准,见maybeAdd中acquirem相关的注释)。
// A timers is a per-P set of timers.
//
// NOTE(review): len, zombies, minWhenHeap and minWhenModified are atomics,
// presumably so they can be read without holding mu — confirm.
type timers struct {
	// mu protects timers; timers are per-P, but the scheduler can
	// access the timers of another P, so we have to lock.
	mu mutex

	// heap is the set of timers, ordered by heap[i].when.
	// Must hold lock to access.
	heap []timerWhen

	// len is an atomic copy of len(heap).
	len atomic.Uint32

	// zombies is the number of timers in the heap
	// that are marked for removal.
	zombies atomic.Int32

	// raceCtx is the race context used while executing timer functions.
	raceCtx uintptr

	// minWhenHeap is the minimum heap[i].when value (= heap[0].when).
	// The wakeTime method uses minWhenHeap and minWhenModified
	// to determine the next wake time.
	// If minWhenHeap = 0, it means there are no timers in the heap.
	minWhenHeap atomic.Int64

	// minWhenModified is a lower bound on the minimum
	// heap[i].when over timers with the timerModified bit set.
	// If minWhenModified = 0, it means there are no timerModified timers in the heap.
	minWhenModified atomic.Int64
}

// timerWhen is one heap entry: a timer paired with the when value the heap
// is ordered by.
type timerWhen struct {
	timer *timer
	when  int64
}
创建定时器堆栈如图:
定时器触发流程:
timers堆的定时器通过func (ts *timers) run(now int64) int64出堆并运行。
而检查是否有定时器到期是通过函数func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool)中的func (ts *timers) wakeTime() int64进行的。
check函数和wakeTime函数在runtime/proc.go中有多处调用点,例如runtime.findRunnable()、runtime.stealWork(now int64),以及间接调用它们的runtime.schedule()等。
这种依赖协程调度、系统调用等时机触发的定时器检查会有一定延迟。如果某个定时器已到期却迟迟没有被处理,sysmon在轮询时会发现并尝试启动M去运行它,因此延迟通常不超过sysmon的轮询间隔。sysmon运行在独立的M上,并非普通协程;其休眠间隔从20μs起,空闲时逐步加倍,最长为10ms。
触发定时器堆栈如图:
另外,在新建定时器时(见上文的maybeAdd),如果新定时器的触发时间早于当前timers的最早唤醒时间(wakeTime),或者此前堆中没有待触发的定时器(wakeTime为0),就会调用runtime.wakeNetPoller(when int64)。这会使阻塞在runtime.netpoll(delay int64)中的线程提前返回,检查是否存在可处理的event,并按新的最早到期时间进行timers堆的定时器check。
定时器精度小结:
Go内置的Timer定时器由runtime在用户态维护,比较轻量。它依赖协程调度、系统调用、网络轮询(netpoll)事件等时机来触发到期检查,延迟通常在10ms以内,精度不高。
定时器的观测:
修改源码创建多个ctx定时器:
package main

import (
	"context"
	"fmt"
	"time"
)

// ctx is deliberately package-level so it can be inspected from a debugger
// (for example `p ctx` / `p ctx.cancelCtx.Context` in dlv) while the program
// is running.
var ctx context.Context

// main derives a child context (180s timeout) from a parent context (300s
// timeout) and runs a worker goroutine on the child. Because the child's
// deadline is earlier, the worker observes ctx.Done() within one 5s work unit
// after the 180s mark. main keeps the process alive until 10s past the
// parent's 300s timeout.
func main() {
	timeout := 300 * time.Second

	// Declare the cancel funcs separately and assign with `=` so the
	// package-level ctx is set rather than shadowed. Deferring both cancels
	// keeps the program clean under `go vet` (lostcancel).
	var cancelParent, cancelChild context.CancelFunc
	ctx, cancelParent = context.WithTimeout(context.Background(), timeout)
	defer cancelParent()
	// From here on ctx refers to the child; the parent stays reachable
	// through the child's embedded Context field.
	ctx, cancelChild = context.WithTimeout(ctx, 180*time.Second)
	defer cancelChild()

	// WithTimeout always attaches a deadline, so the ok result is ignored.
	deadline, _ := ctx.Deadline()
	fmt.Println("process start", time.Now().Format(time.DateTime))
	fmt.Println("ctx deadline", deadline.Format(time.DateTime))

	go func() {
		defer func() {
			fmt.Println("goroutine exit", time.Now().Format(time.DateTime))
		}()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("ctx.Done", time.Now().Format(time.DateTime))
				return
			default:
				// Non-blocking check: do one unit of work, then re-check ctx.
				fmt.Println("do something start", time.Now().Format(time.DateTime))
				time.Sleep(5 * time.Second)
				fmt.Println("do something end ", time.Now().Format(time.DateTime))
			}
		}
	}()

	time.Sleep(timeout + 10*time.Second)
	fmt.Println("process exit", time.Now().Format(time.DateTime))
}
dlv调试:
1、查看当前的定时器数量(本例中定时器位于allp[1],实际的P编号以调试时为准):
p runtime.allp[1].timers.heap
2、查看每个定时器的超时时间:
p (runtime.allp[1].timers.heap[0].when - time.startNano)/int64(time.Second)
p (runtime.allp[1].timers.heap[1].when - time.startNano)/int64(time.Second)
p (runtime.allp[1].timers.heap[2].when - time.startNano)/int64(time.Second)
3、调用其中一个定时器的回调函数:
call runtime.allp[1].timers.heap[0].timer.arg.(func())()
4、查看控制台输出:
共有3个定时器,分别是ctx的2个和主协程的time.Sleep,其中timers堆顶是180s的定时器。
在手工调用timers堆顶定时器的回调函数后,工作协程提前收到ctx.Done通知并退出。主协程仍在time.Sleep中,进程要等睡眠结束后才会退出。
如图:
cancelCtx的父子关系:
继续上面的例子:
1、查看ctx两个定时器的回调函数是否一致:
p runtime.allp[1].timers.heap[0].timer.arg
p runtime.allp[1].timers.heap[1].timer.arg
2、查看父子cancelCtx变量内容:
#子ctx
p ctx
#父ctx
p ctx.cancelCtx.Context
3、观测结果说明:
父子ctx两个定时器的回调函数都是闭包,它们引用的外部变量*context.timerCtx并不相同,分别指向各自的父、子timerCtx。
父子cancelCtx之间是嵌套(组合)关系,而不是继承:子ctx通过其cancelCtx中嵌入的Context字段引用父ctx(即propagateCancel中的c.Context = parent)。
最终使用的ctx为子ctx;子ctx的任意一层祖先ctx超时(或被取消),都会导致子ctx被取消。
如图:
父子cancelCtx的嵌套关系通过函数func (c *cancelCtx) propagateCancel(parent Context, child canceler)完成:
// propagateCancel arranges for child to be canceled when parent is.
// It sets the parent context of cancelCtx.
//
// After two early exits (parent can never be canceled, or is already
// canceled), it tries three strategies in order:
//  1. parent is, or derives from, a *cancelCtx: register child in its
//     children map under p.mu.
//  2. parent implements afterFuncer: use its AfterFunc to cancel child and
//     wrap c.Context in a stopCtx holding the returned stop func.
//  3. otherwise: start a goroutine that waits for parent.Done() or
//     child.Done(), whichever comes first.
func (c *cancelCtx) propagateCancel(parent Context, child canceler) {
	// Link c to its parent through the embedded Context field.
	c.Context = parent

	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	// Non-blocking check for an already-canceled parent.
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err(), Cause(parent))
		return
	default:
	}

	if p, ok := parentCancelCtx(parent); ok {
		// parent is a *cancelCtx, or derives from one.
		p.mu.Lock()
		// Re-check p.err under the lock: the parent may have been canceled
		// after the select above.
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err, p.cause)
		} else {
			// Lazily allocate the children set, then register child.
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
		return
	}

	if a, ok := parent.(afterFuncer); ok {
		// parent implements an AfterFunc method.
		c.mu.Lock()
		stop := a.AfterFunc(func() {
			child.cancel(false, parent.Err(), Cause(parent))
		})
		// NOTE(review): keeping stop in a stopCtx presumably lets the
		// AfterFunc registration be undone later — confirm against stopCtx's
		// users.
		c.Context = stopCtx{
			Context: parent,
			stop:    stop,
		}
		c.mu.Unlock()
		return
	}

	// NOTE(review): goroutines is a package-level counter not shown here;
	// presumably it counts these fallback goroutines for tests — confirm.
	goroutines.Add(1)
	go func() {
		select {
		case <-parent.Done():
			child.cancel(false, parent.Err(), Cause(parent))
		case <-child.Done():
		}
	}()
}
--end--