聊聊二叉堆、红黑树、时间轮在定时任务中的应用

定时任务作为常用的一种调度方式,在各大系统得到了广泛的应用。
笔者也曾写过两篇关于定时任务框架介绍的文章:

  • 《介绍一下,spring cloud下的另一种定时任务解决方案》
  • 《四叉堆在GO中的应用-定时任务timer》

之前都是以如何使用为主,这次从数据结构与调度机制角度出发,对java中的定时任务再整体回顾一下。

单线程队列-timer

首先回顾下jdk中自带的timer。
以每隔5秒输出当前时间戳为例,代码如下:

        Timer timer = new Timer();timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {System.out.println(System.currentTimeMillis());}}, 0, 5_000);

代码非常简洁,调用timer的scheduleAtFixedRate对TimerTask中的方法进行定时触发。

看一下Timer类的构成,类图如下:
timer

结合代码看一下:

public class Timer {//存放TimerTask的队列private final TaskQueue queue = new TaskQueue();//执行TimerTask的线程private final TimerThread thread = new TimerThread(queue);private final Object threadReaper = new Object() {@SuppressWarnings("deprecation")protected void finalize() throws Throwable {synchronized(queue) {thread.newTasksMayBeScheduled = false;queue.notify(); // In case queue is empty.}}};private static final AtomicInteger nextSerialNumber = new AtomicInteger(0);

nextSerialNumber是static的,以确保在同一个JVM中所有的serialNumber都是自增唯一的。
以定时执行某个任务为例,向Timer提交一个定时任务调用的是scheduleAtFixedRate方法。

    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {if (delay < 0)throw new IllegalArgumentException("Negative delay.");if (period <= 0)throw new IllegalArgumentException("Non-positive period.");//执行时间=当前时间戳+延迟时间sched(task, System.currentTimeMillis()+delay, period);}private void sched(TimerTask task, long time, long period) {if (time < 0)throw new IllegalArgumentException("Illegal execution time.");// Constrain value of period sufficiently to prevent numeric// overflow while still being effectively infinitely large.if (Math.abs(period) > (Long.MAX_VALUE >> 1))period >>= 1;synchronized(queue) {if (!thread.newTasksMayBeScheduled)throw new IllegalStateException("Timer already cancelled.");synchronized(task.lock) {if (task.state != TimerTask.VIRGIN)throw new IllegalStateException("Task already scheduled or cancelled");task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}//将次task添加到队列中queue.add(task);if (queue.getMin() == task)queue.notify();}}

timer除了支持定时周期性任务scheduleAtFixedRate,也支持一次性延迟任务,最终都会调用用sched方法。
sched方法中仅实现入队的操作,且如果提交的Task位于队列头部则立即唤醒queue。

timer中入队的操作为二叉堆算法实现,细节不再复述。
如果向timer提交的TASK不位于队列头部,则由Timer中的TimerThread调度,首次调度时间为Timer初始化时开始。

    public Timer(String name) {thread.setName(name);thread.start();}

调用过程为一个死循环,详细逻辑位于mainLoop方法中。

    public void run() {try {mainLoop();} finally {// Someone killed this Thread, behave as if Timer cancelledsynchronized(queue) {newTasksMayBeScheduled = false;queue.clear();  // Eliminate obsolete references}}}private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// Wait for queue to become non-emptywhile (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();if (queue.isEmpty())break; // Queue is empty and will forever remain; die// Queue nonempty; look at first evt and do the right thinglong currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue;  // No action required, poll queue again}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;if (taskFired = (executionTime<=currentTime)) {if (task.period == 0) { // Non-repeating, removequeue.removeMin();task.state = TimerTask.EXECUTED;} else { // Repeating task, reschedulequeue.rescheduleMin(task.period<0 ? currentTime   - task.period: executionTime + task.period);}}}if (!taskFired) // Task hasn't yet fired; waitqueue.wait(executionTime - currentTime);}if (taskFired)  // Task fired; run it, holding no lockstask.run();} catch(InterruptedException e) {}}}

判断队列头部TASK是否达到执行时间,如满足则调用task.run,也就是运行此定时任务。

采用二叉堆,在一个线程中调用,与GO中自带的定时任务非常类似,整体比较简单。

线程池timer

通过前文了解,咱们知道了通过Timer+TimerTask可实现简单类型的定时任务,但在实际开发过程中如果安装了alibaba的代码规范检测插件(https://github.com/alibaba/p3c),Alibaba Java Coding Guidelines
则会对TimerTask报告警,如:
p3c-waring
它要求使用ScheduledExecutorService来替换Timer。

那么,ScheduledExecutorService是何方神圣?
熟悉JAVA的老司机都知道ScheduledExecutorService它是一个接口,其完整路径为:java.util.concurrent.ScheduledExecutorService ,其类图如下:
ExecutorService-class

它提供了与Timer类似的方法,有:

    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

在JDK中自带的ScheduledExecutorService实现类为ScheduledThreadPoolExecutor,其继承自ThreadPoolExecutor。

快速创建可使用JDK中的Executors生成一个ScheduledThreadPoolExecutor,如:

ScheduledExecutorService schService = Executors.newSingleThreadScheduledExecutor();

也或者手动指定ScheduledThreadPoolExecutor的构造参数创建,常用构造参数为:

    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler)

DelayedWorkQueue

以单参数corePoolSize为例,可以看到ScheduledThreadPoolExecutor的一个重要入参数为DelayedWorkQueue。

    public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());}

与DelayQueue类似的,DelayedWorkQueue也是一个延迟队列,基于堆实现。它主要用于ScheduledThreadPoolExecutor中的任务调度管理。

JDK源码中对于DelayedWorkQueue介绍为:

/** A DelayedWorkQueue is based on a heap-based data structure* like those in DelayQueue and PriorityQueue, except that* every ScheduledFutureTask also records its index into the* heap array. This eliminates the need to find a task upon* cancellation, greatly speeding up removal (down from O(n)* to O(log n)), and reducing garbage retention that would* otherwise occur by waiting for the element to rise to top* before clearing. But because the queue may also hold* RunnableScheduledFutures that are not ScheduledFutureTasks,* we are not guaranteed to have such indices available, in* which case we fall back to linear search. (We expect that* most tasks will not be decorated, and that the faster cases* will be much more common.)** All heap operations must record index changes -- mainly* within siftUp and siftDown. Upon removal, a task's* heapIndex is set to -1. Note that ScheduledFutureTasks can* appear at most once in the queue (this need not be true for* other kinds of tasks or work queues), so are uniquely* identified by heapIndex.*/

关于DelayedWorkQueue中对堆的详细操作这里不再展开,与其他堆的操作类似的,都由siftUp(上推)和siftDown(下沉)构成,与DelayQueue不同的地方是DelayedWorkQueue中存储的每个节点会记录它在队列中的index。这样做的好处是在取消某个任务时可以快速定位到被取消的任务在堆中的位置,

每当有新的任务被提交到ScheduledThreadPoolExecutor时,最终都会被添加到此队列中。

    private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);elseensurePrestart();}}

任务的调度由父类ThreadPoolExecutor中的Worker进行触发,每个Worker是一个单独的线程,在它的RunWorker方法中会一直尝试从workQueue中获取队列头部的Task进行执行。

   final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

getTask方法为:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// Check if queue empty only if necessary.if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

在ScheduledThreadPoolExecutor中的workQueue则为DelayedWorkQueue。

ScheduledThreadPoolExecutor与Timer相比,在性能和成熟度方面都对Timer进行了加强,如在单jvm场景中使用ScheduledThreadPoolExecutor来实现定时任务是一个不错的选择。

quartz调度机制

ScheduledThreadPoolExecutor基于线程池实现了许多Timer所没有的特性。
Timer和ScheduledThreadPoolExecutor自带的类,在很多方面它们仍然具有很多共同点,如:

  • 任务均使用内存存储
  • 不支持集群
  • 任务的数据存储底层使用二叉堆结构

为了更适应复杂的业务场景,业界也先后诞生出了众多的定时任务框架,其中最为突出的是:至今仍被广泛应用的非quartz莫属。
quartz

其源码地址为:https://github.com/quartz-scheduler/quartz

quartz内容众多,本文仅对quartz中的trigger调度部分进行简单分析。
quartz中对于任务的存储默认也采用内存存储,实现类为RAMJobStore,除此之外也支持JDBCJobStore以将任务数据写入到数据库中。

在quartz中定义一个任务需要由Scheduler(调度器)、Job(任务)、Trigger(触发器)这3部分组成。
quartz-level

  • Job为具体需要被执行的任务
  • Trigger为任务所被期往执行的时间
  • Scheduler为任务被执行的容器组

Trigger分两种:CronTrigger与SimpleTrigger,区别为CronTrigger支持以cron表达式定义任务的执行时间。

以quartz中的SimpleTrigger和RAMJobStore为例,当提交了一个job到quartz中,它最终会被存储到对应的store中。

被执行的方法为:org.quartz.core.QuartzScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)

 public Date scheduleJob(JobDetail jobDetail,Trigger trigger) throws SchedulerException {validateState();……// 调用对应的jobStore保存此job和triggerresources.getJobStore().storeJobAndTrigger(jobDetail, trig);notifySchedulerListenersJobAdded(jobDetail);notifySchedulerThread(trigger.getNextFireTime().getTime());notifySchedulerListenersSchduled(trigger);return ft;

RAMJobStore被执行的方法为:org.quartz.simpl.RAMJobStore#storeTrigger


protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator());public void storeTrigger(OperableTrigger newTrigger,boolean replaceExisting) throws JobPersistenceException {TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());synchronized (lock) {if (triggersByKey.get(tw.key) != null) {if (!replaceExisting) {throw new ObjectAlreadyExistsException(newTrigger);}//删除已有的重复triggerremoveTrigger(newTrigger.getKey(), false);}if (retrieveJob(newTrigger.getJobKey()) == null) {throw new JobPersistenceException("The job ("+ newTrigger.getJobKey()+ ") referenced by the trigger does not exist.");}// add to triggers by jobList<TriggerWrapper> jobList = triggersByJob.get(tw.jobKey);if(jobList == null) {jobList = new ArrayList<TriggerWrapper>(1);triggersByJob.put(tw.jobKey, jobList);}jobList.add(tw);// add to triggers by groupHashMap<TriggerKey, TriggerWrapper> grpMap = triggersByGroup.get(newTrigger.getKey().getGroup());if (grpMap == null) {grpMap = new HashMap<TriggerKey, TriggerWrapper>(100);triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);}grpMap.put(newTrigger.getKey(), tw);// add to triggers by FQN maptriggersByKey.put(tw.key, tw);if (pausedTriggerGroups.contains(newTrigger.getKey().getGroup())|| pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) {tw.state = TriggerWrapper.STATE_PAUSED;if (blockedJobs.contains(tw.jobKey)) {tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;}} else if (blockedJobs.contains(tw.jobKey)) {tw.state = TriggerWrapper.STATE_BLOCKED;} else {// 将此TriggerWrapper添加到timerTriggers中timeTriggers.add(tw);}}
}

从源码中可以看出trigger最终会被添加到一个被TriggerWrapper修饰的TreeSet中,其比较器为TriggerWrapperComparator:

    class TriggerTimeComparator implements Comparator<Trigger>, Serializable {private static final long serialVersionUID = -3904243490805975570L;// This static method exists for comparator in TC clustered quartzpublic static int compare(Date nextFireTime1, int priority1, TriggerKey key1, Date nextFireTime2, int priority2, TriggerKey key2) {//先比较下次执行时间if (nextFireTime1 != null || nextFireTime2 != null) {if (nextFireTime1 == null) {return 1;}if (nextFireTime2 == null) {return -1;}if(nextFireTime1.before(nextFireTime2)) {return -1;}if(nextFireTime1.after(nextFireTime2)) {return 1;}}// 执行时间相同时比较优先级int comp = priority2 - priority1;if (comp != 0) {return comp;}return key1.compareTo(key2);}public int compare(Trigger t1, Trigger t2) {return compare(t1.getNextFireTime(), t1.getPriority(), t1.getKey(), t2.getNextFireTime(), t2.getPriority(), t2.getKey());}}

当完成了Job的存储后,其触发代码位于QuartzSchedulerThread中run中。这个方法中代码较长,简单看一下:

public void run() {int acquiresFailed = 0;while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {// ……// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..// ……int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();synchronized (sigLock) {if (halted.get()) {break;}}if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {// 调用jobStore返回一批最先被执行的任务triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());acquiresFailed = 0;if (log.isDebugEnabled())log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");} catch (JobPersistenceException jpe) {// ……}if (triggers != null && !triggers.isEmpty()) {now = System.currentTimeMillis();long triggerTime = triggers.get(0).getNextFireTime().getTime();long timeUntilTrigger = triggerTime - now;// ……boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {// 标记为正在执行List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);if(res != null)bndles = res;} catch (SchedulerException se) {// ……}}for (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result =  bndles.get(i);TriggerFiredBundle bndle =  result.getTriggerFiredBundle();Exception exception = result.getException();……JobRunShell shell = null;try {// 创建job执行的RunShellshell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {// 运行出错,标记为已完成,并标记为运行异常        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}// 运行此JOBif (qsRsrcs.getThreadPool().runInThread(shell) == false) {// this case should never happen, as it is indicative of the// scheduler being shutdown or a bug in the thread pool or// a thread pool being used concurrently - which the docs// say not to do...getLog().error("ThreadPool.runInThread() return false!");qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);}}continue; // while (!halted)}} else { // if(availThreadCount > 0)// should never happen, if threadPool.blockForAvailableThreads() follows contractcontinue; // while (!halted)}// ……} catch(RuntimeException re) {getLog().error("Runtime error occurred in main trigger firing loop.", re);}} // while (!halted)// drop references to scheduler stuff to aid garbage collection...qs = null;qsRsrcs = null;
}

runInThread最终调用的类为org.quartz.simpl.SimpleThreadPool.WorkerThread中的run方法。

public void run() {boolean ran = false;while (run.get()) {try {synchronized(lock) {while (runnable == null && run.get()) {lock.wait(500);}if (runnable != null) {ran = true;// 调用JOB的runrunnable.run();}}} catch (InterruptedException unblock) {// ……}}
}

以上便是quartz中关于一个job存储和调度的具体代码。quartz细节过多且非常庞大这里仅看了下核心片段部分,总结一下:

  • 在RAMJobStore中,任务在被添加时会被放入一个红黑树中,放入的顺序为先以最先执行时间判断,再以优先级判断。
  • quartz中的任务调度会由schedule中的QuartzSchedulerThread持续从JobStore中取出job放入到worker线程中执行。

时间轮算法

通过前文的了解,从timer、ScheduledExecutorService到quartz,如仅从底层存储的数据结构进行划分,存放定时任务的数据结构有二叉堆、红黑树。
在二叉堆与红黑树中,新增一个节点时间复杂度均为:O(logn),当需要处理的定时任务较多时,则性能也会随之降低。

那么,是否存在一种算法即便面对数量众多的定时任务,调度的复杂度也能很低?

Timing Wheel Algorithm–时间轮算法,这便是接下来要回顾的内容。
timerWheel

关于时间轮的介绍文章也非常多,简单理解:它是一个由时间槽和链表构成的数据结构。
每个时间槽中有一个数值,时钟每拨动一次,当前时间的指针也随之转动,时间槽中数值的单位决定了这个时间轮的精度。在定时任务场景中,每个时间槽内由一个存放了Task的链表组成,时钟指向某个时间槽时,则代表该槽内满足运行条件的task可以被运行。

在新的Task需要被新增时,根据当前时间槽定计算出新Task应该被放置的位置,并添加到该槽的链表中。这点与HashMap很类似,新增节点的时间复杂度近似O(1)。

多级时间轮

前面描述的时间轮是单轮时间轮。以上图单轮12格、每秒移动一格为例,能实现的最长周期为12秒,如想要实现一分钟内的倒计时周期则需要将时间槽调整为60格,更高精度则需要将轮子的周期继续扩充,以此类推。

尽管通过增加槽数可以实现更多粒度的控制,但它并不是一种好的解决方式,毕竟槽数的增加也会让空间占用同比上升,较长延迟的任务也无法实现。

为此,一种类似水表时间轮便诞生了——多级时间轮
mulLevelWheel

在多级时间轮中,用多个单时间轮构成一个整体上的逻辑时间轮,每个时间轮具有不同的刻度,刻度小的满一卷后更高一级刻度的轮子进一格,以此类推。

多层级时间轮

除了多级时间轮外,还有另一种类似的时间轮——多层时间轮
mulCircleWheel

工作机制与手表类似,最外层指针跑满一卷后内层指针前进一格,以此类推。

多层多级时间轮对比

与多级时间轮相比,多层时间轮实现所需的数据结构上仅需要一个大的单轮即可,可以节约更多的存储空间。

一般来讲,多层时间轮侧重于在单一时间轮内通过多层次结构(如链表)管理任务,提高时间槽内的任务调度效率,比较适合任务间隔较小且频繁的场景。

如果需要处理大跨度的任务,则更适合使用多级时间轮。

netty时间轮

上面对时间轮的理论知识进行了介绍,接下来看一下使用“多级时间轮”在netty框架中的实际应用。

HashedWheelTimer用法

HashedWheelTimer maintains a data structure called ‘wheel’. To put simply, a wheel is a hash table of TimerTasks whose hash function is ‘dead line of the task’.

HashedWheelTimer是netty中实现的时间轮,使用一个哈希表来存储每个task的信息。

在编程和计算机科学中,哈希函数是一种将任意长度的数据(如字符串或数字)映射到固定长度(如较小的整数)的算法。

以实现1秒后延迟输出信息为例,其代码为:

  final HashedWheelTimer timer = new HashedWheelTimer();//延迟1秒执行任务timer.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("Task executed after 1 second delay");}}, 1, TimeUnit.SECONDS);

以实现每隔3秒输出信息为例,其代码为:

//每3秒输出当前时间
timer.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("now=" + System.currentTimeMillis());//再次提交,实现循环定时执行timer.newTimeout(this, 3, TimeUnit.SECONDS);}
}, 3, TimeUnit.SECONDS);

运行结果为:nettyTimeWheelDemo

代码非常简短也很有效。
new一个HashedWheelTimer,并使用newTimeout传入所需要执行的TimerTask和延迟时间即可。

HashedWheelTimer类图总览

netty-timerWheel-sum

以上为HashedWheelTimer类结构总览图。需要关注的关键信息有:

HashedWheelTimer,时间轮:

  • wheel,时间轮数组,由多个时间槽(HashedWheelBucket)构成,即一个wheel内有多个HashedWheelBucket
  • taskExecutor,执行时间槽内任务的线程池
  • workerThread,时间轮调度线程
  • worker,在workerThread中执行的具体类,负责对时间轮和里面的任务进行调度
  • timeouts,存放新提交任务的队列,实际入槽由worker执行时触发
  • startTime,时间轮首次转动的时间,单位为纳秒

TimerTask,被提交到时间轮中的任务,有且仅有一个run方法,用于执行具体业务

HashedWheelTimeout,包装TimerTask的类:

  • task,即具体执行任务的TimerTask
  • next,邻居1号,同一个时间槽队列中的后一个HashedWheelTimeout
  • prev,邻居2号,同一个时间槽队列中的前一个HashedWheelTimeout
  • remainingRounds,剩余层数,0层时且时间槽匹配就会被执行
  • deadline,task应该被执行的相对时间
  • bucket,此HashedWheelTimeout所处的时间槽,位于哪个HashedWheelBucket内
  • expire方法,提交本task任务到线程池

HashedWheelBucket,时间槽,管理HashedWheelTimeout的容器:

  • head,HashedWheelTimeout队列的第一个
  • tail,HashedWheelTimeout队列的最后一个
  • expireTimeouts方法,时间指针指向该时间槽时,对该槽内的HashedWheelTimeout任务提交到线程池或层数减一

这里对HashedWheelTimer整体进行重点总览,在下文中将对HashedWheelTimer的详细实现进行介绍。

HashedWheelTimer构造方法

HashedWheelTimer提供了多个构造方法,一般用最简单的无参构造函数就行,所涉及到的源码如下:

    public HashedWheelTimer() {this(Executors.defaultThreadFactory());}public HashedWheelTimer(ThreadFactory threadFactory) {// 精度为100毫秒,即0.1秒this(threadFactory, 100, TimeUnit.MILLISECONDS);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {// 指定512个时间槽,一圈51.2秒this(threadFactory, tickDuration, unit, 512);}public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);}public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts) {this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,maxPendingTimeouts, ImmediateExecutor.INSTANCE);}public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts, Executor taskExecutor) {checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");// Normalize ticksPerWheel to power of two and initialize the wheel.// 创建时间轮wheel = createWheel(ticksPerWheel);mask = wheel.length - 1;// Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);// Prevent overflow.if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;this.maxPendingTimeouts = maxPendingTimeouts;if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}private static HashedWheelBucket[] createWheel(int ticksPerWheel) {ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);// 给时间轮的槽赋值HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}return wheel;}

关键信息有:

  • HashedWheelTimer默认的构造方法创建了1个包含有512个槽位的时间轮,每个槽位的时间间隔为0.1秒,即一个时间轮的最长周期为51.2秒
  • 指定了运行提交任务的线程池为ImmediateExecutor.INSTANCE,即在当前调用的线程中执行任务
  • 创建了一个worker线程,用于管理此时间轮中的所有任务

HashedWheelTimer调度原理

需要注意的是,HashedWheelTimer仅对时间轮进行了创建,并未对任务进行实际的调度。

一个HashedWheelTimer的实际调度,由首次调用newTimeout方法时触发,源码如下:

	// 创建一个多【生产者】单【消费者】的队列,用来存放具体的Timeout任务private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {// ……// 确保work线程已启动,如未启动则启动start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.// 计算出此任务的deadline;此任务运行需等待时长=当前时间+延迟时间-轮子首次转动时间long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}// 将当前task封装为HashedWheelTimeout,并添加到timeouts队列中HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}public void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {// 如work线程未启动则进行启动,让轮子转起来workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// Wait until the startTime is initialized by the worker.while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}

workerThread.start()则执行的是io.netty.util.HashedWheelTimer.Worker中的run方法。

它负责时间轮的持续转动及对任务的调度执行,源码如下:

public void run() {// 对startTime进行初始化,设置为轮子首次转动的时间戳startTime = System.nanoTime();// ……// Notify the other threads waiting for the initialization at start().startTimeInitialized.countDown();do {// 嘀嗒,sleep间隔时间并得到当前deadline,deadline=System.nanoTime()-startTimefinal long deadline = waitForNextTick();if (deadline > 0) {// 使用位运算得到当前idx,mask=wheel.length-1,wheel.length是2的N次幂,mask是全1的二进制数int idx = (int) (tick & mask);// 处理已被取消的任务processCancelledTasks();// 拿到当前指针指向的时间槽HashedWheelBucket bucket =wheel[idx];// 将刚提交的任务分配到时间槽上transferTimeoutsToBuckets();// 执行当前时间槽中满足条件的任务;槽数+层数均匹配就执行此taskbucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}processCancelledTasks();}

上面代码中会进行一个死循环让时间指针滴答滴答转动起来,每到达一个时间槽时会让新提交的task进行入槽。入槽流程代码如下:

private void transferTimeoutsToBuckets() {// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just// adds new timeouts in a loop.for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}// 需要等待的槽数=任务运行需等待时长/每个槽的间隔时长long calculated = timeout.deadline / tickDuration;// 需要等待的层数=(需要等待的槽数-已走过的槽数)/总槽数timeout.remainingRounds = (calculated - tick) / wheel.length;final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.// 此任务应运行的槽值int stopIndex = (int) (ticks & mask);// 拿到时间槽,并放到该槽的末尾HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}
}

waitForNextTick为时间指针等待的间隔方法,代码如下:

private long waitForNextTick() {// 计算出指向下一个时间槽的相对时间long deadline = tickDuration * (tick + 1);for (;;) {// 得到此时间轮的当前时间final long currentTime = System.nanoTime() - startTime;// 计算出还应该等待的时长,理论时间-时间时间则为应等待的时间。此处+999999/1000000的目的是为了向上取整long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;if (sleepTimeMs <= 0) {// 不需要等待了,则直接返回当前时间if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// Check if we run on windows, as if thats the case we will need// to round the sleepTime as workaround for a bug that only affect// the JVM if it runs on windows.//// See https://github.com/netty/netty/issues/356if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;if (sleepTimeMs == 0) {sleepTimeMs = 1;}}try {// 等待一下,时间到了再指向下一个时间槽Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}
}

在worker中指向某个时间槽时,会将该槽内的所有任务过一便,该执行的就执行,该取消的取消,该减层数的减层数。expireTimeouts是HashedWheelBucket的方法,代码如下:

public void expireTimeouts(long deadline) {// 从该时间槽内的链表头部开始HashedWheelTimeout timeout = head;// process all timeoutswhile (timeout != null) {// 迭代链表中的每个task节点HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {// 此task位于最外层,则将其从队列中移除next = remove(timeout);if (timeout.deadline <= deadline) {// task应该执行时间位于当前时间前,调用expire方法运行此tasktimeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {//此task已被取消,从链表中移除next = remove(timeout);} else {//让内层的task向外移动一层,距离触发又近了一圈timeout.remainingRounds --;}// 链表迭代timeout = next;}
}public void expire() {// expire代表此task已经可以被执行了if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {// 将此task提交线程池中执行timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()+ " for execution.", t);}}
}

最后想说
“程序=算法+数据结构。”
不同的算法与数据结构有着它独特的美,在实际业务运用时也需要从具体的业务出发进行多维度分析,选择一个底层实现最适合的框架,以让您的业务场景运行起来速度又快占用空间又少,岂不美哉。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/24250.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue项目安装axios报错npm error code ERESOLVE npm error ERESOLVE could not resolve解决方法

在Vue项目中安装axios时报错 解决方法&#xff1a;在npm命令后面加--legacy-peer-deps 例如&#xff1a;npm install axios --save --legacy-peer-deps 因为别的需求我把node版本重装到了最新版&#xff08;不知道是不是这个原因&#xff09;&#xff0c;后来在项目中安装axi…

在推荐四款软件卸载工具,让流氓软件无处遁形

Revo Uninstaller Revo Uninstaller是一款电脑软件、浏览器插件卸载软件&#xff0c;目前已经有了17年的历史了。可以扫描所有window用户卸载软件后的残留物&#xff0c;并及时清理&#xff0c;避免占用电脑空间。 Revo Uninstaller可以通过命令行卸载软件&#xff0c;可以快速…

前端生成海报图技术选型与问题解决

作者&#xff1a;vivo 互联网大前端团队 - Tian Yuhan 本篇文章主要聚焦海报图分享这个形式&#xff0c;探讨纯前端在H5&小程序内&#xff0c;合成海报到下载到本地、分享至社交平台整个流程中可能遇到的问题&#xff0c;以及如何解决。 一、引言 绝大多数的电商平台都会…

30、matlab现代滤波:维纳滤波/LMS算法滤波/小波变换滤波

1、信号1和信号2的维纳滤波 实现代码 N 2000; %采样点数 Fs 2000; %采样频率 t 0:1 / Fs:1 - 1 / Fs; %时间序列 Signal1 sin(2*pi*20* t) sin(2*pi*40* t) sin(2*pi*60* t); Signal2[2*ones(1,50),zeros(1,50),-1*ones(1,100),zeros(1,50),-2*ones(1,50),zeros(1,50),1…

【C语言】05.数组

一、数组的概念 本文来介绍数组&#xff0c;首先我们需要了解数组是什么&#xff1f; 数组是⼀组相同类型元素的集合。 • 数组中存放的是1个或者多个数据&#xff0c;但是数组元素个数不能为0。 • 数组中存放的多个数据&#xff0c;类型是相同的。 数组分为⼀维数组和多维数组…

Go源码--sync库(2)

简介 这边文章主要讲解 Sync.Cond和Sync.Rwmutex Sync.Cond 简介 sync.Cond 经常用来处理 多个协程等待 一个协程通知 这种场景&#xff0c; 主要 是阻塞在某一协程中 等待被另一个协程唤醒 继续执行 这个协程后续的功能。cond经常被用来协调协程对某一资源的访问 ants协程池…

Win10 Edge提示兼容性问题打不开|解决浏览器兼容性问题

Edge有时候会与某些安全软件不兼容&#xff0c;导致报错 报错代码&#xff1a;STATUS_INVALID_IMAGE_HASH 解决Edge浏览器兼容性问题方法/步骤&#xff1a; 1、按 Win R 组合键&#xff0c;打开运行&#xff0c;并输入 regedit 命令&#xff0c;确定或回车&#xff0c;可以…

SAP ERP系统主要模块简介

SAP系统通过提供一系列高度灵活的模块&#xff0c;满足企业在不同业务领域的需求。这些模块不仅功能齐全且相对独立&#xff0c;但它们之间又能紧密协作&#xff0c;共同构筑一个协同高效的工作环境。 财务会计&#xff08;FI&#xff09;模块 它涵盖了总账、应收账款、应付账…

DexCap——斯坦福李飞飞团队泡茶机器人:更好数据收集系统的原理解析、源码剖析

前言 2023年7月&#xff0c;我司组建大模型项目开发团队&#xff0c;从最开始的论文审稿&#xff0c;演变成目前的两大赋能方向 大模型应用方面&#xff0c;以微调和RAG为代表 除了论文审稿微调之外&#xff0c;目前我司内部正在逐一开发论文翻译、论文对话、论文idea提炼、论…

k8s:优雅关闭pod的简单例子

先通过Dockerfile创建一个image vim Dockerfie <<<< 内容如下&#xff1a; FROM centosRUN sed -i -e "s|mirrorlist|#mirrorlist|g" /etc/yum.repos.d/CentOS-* RUN sed -i -e "s|#baseurlhttp://mirror.centos.org|baseurlhttp://vault.centos.o…

Qsemaphore

Qsemaphore 实现 给while循环阻塞延时 基本思路就是&#xff1a; whlie循环里面 通过m&#xff3f;bthreadFlag&m_bStatus这两个标志位&#xff0c;判断是否进入while循环&#xff0c;再根据40行的acquire&#xff08;&#xff09;来阻塞循环&#xff0c;因为定时器的槽函数…

SQL Server数据库xp_cmdshell提权笔记

文章目录 一、简介二、搭建环境三、利用条件1、查询 xp_cmdshell 是否开启&#xff0c;返回为1则证明存在2、判断权限是不是sa&#xff0c;回是1说明是sa3、开启xp_cmdshell4、关闭xp_cmdshell 四、获取数据库权限1、成功获取sqlserver&#xff0c;进行登陆2、开启xp_cmdshell权…

代码随想录算法训练营第31天(py)| 贪心 | 455.分发饼干、376. 摆动序列、53. 最大子序和

455.分发饼干 力扣链接 假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#…

Docker|了解容器镜像层(1)

引言 容器非常神奇。它们允许简单的进程表现得像虚拟机。在这种优雅的底层是一组模式和实践&#xff0c;最终使一切运作起来。在设计的根本是层。层是存储和分发容器化文件系统内容的基本方式。这种设计既出人意料地简单&#xff0c;同时又非常强大。在今天的帖子[1]中&#xf…

29网课交单平台 epay.php SQL注入漏洞复现

0x01 产品简介 29网课交单平台是一个专注于在线教育和知识付费领域的交单平台。该平台基于PHP开发,通过全开源修复和优化,为用户提供了高效、稳定、安全的在线学习和交易环境。作为知识付费系统的重要组成部分,充分利用了互联网的优势,为用户提供了便捷的支付方式、高效的…

继承-进阶

父子类成员共享 普通成员对象/父子间不共享&#xff0c; 成员独立 函数成员共享&#xff08;函数不存储在对象中&#xff09; 子类由两部分构成&#xff1a;父类中继承的成员和子类中新定义成员 继承方式 子类中存在父类private成员但不可直接访问&#xff08;及时在类中&am…

微信如何防止被对方拉黑删除?一招教你解决!文末附软件!

你一定不知道&#xff0c;微信可以防止被对方拉黑删除&#xff0c;秒变无敌。只需一招就能解决&#xff01;赶快来学&#xff01;文末有惊喜&#xff01; 惹到某些重要人物&#xff08;比如女朋友&#xff09;&#xff0c;被删除拉黑一条龙&#xff0c;那真的是太令人沮丧了&a…

加密经济浪潮:探索Web3对金融体系的颠覆

随着区块链技术的快速发展&#xff0c;加密经济正在成为全球金融领域的一股新的浪潮。而Web3作为下一代互联网的代表&#xff0c;以其去中心化、可编程的特性&#xff0c;正深刻影响着传统金融体系的格局和运作方式。本文将深入探讨加密经济对金融体系的颠覆&#xff0c;探索We…

机器学习实验----支持向量机(SVM)实现二分类

目录 一、介绍 (1)解释算法 (2)数据集解释 二、算法实现和代码介绍 1.超平面 2.分类判别模型 3.点到超平面的距离 4.margin 间隔 5.拉格朗日乘数法KKT不等式 (1)介绍 (2)对偶问题 (3)惩罚参数 (4)求解 6.核函数解决非线性问题 7.SMO (1)更新w (2)更新b 三、代…

此表单不安全,因此系统已关闭自动填充功能

问题截图&#xff1a; 截图就不放了&#xff0c;公司的系统不方便&#xff0c;就是form表单会有个提示“此表单不安全&#xff0c;因此系统已关闭自动填充功能” 解决思路&#xff1a; 1、问题原因 使用https访问&#xff0c;但表单提交地址是http的 2、查看表单配置 表单…