目录
在许多业务场景中,我们都会碰到延迟任务,定时任务这种需求。特别的,在网络连接的场景中,常常会出现一些超时控制。由于服务端的连接数量很大,这些超时任务的数量往往也是很庞大的。实现对大量任务的超时管理并不是一个容易的事情。
本章我们将介绍几种用于实现超时任务的数据结构,并且最后分析 Netty 在超时任务上采取的结构和代码。
欢迎加入技术交流群186233599讨论交流,也欢迎关注笔者公众号:风火说。
JDK 在 1.3 的时候引入了Timer
数据结构用于实现定时任务。Timer
的实现思路比较简单,其内部有两个主要属性:
TaskQueue
:定时任务抽象类TimeTask
的列表。TimerThread
:用于执行定时任务的线程。Timer
结构还定义了一个抽象类TimerTask
并且继承了Runnable
接口。业务系统实现了这个抽象类的run
方法用于提供具体的延时任务逻辑。
TaskQueue
内部采用大顶堆的方式,依据任务的触发时间进行排序。而TimerThread
则以死循环的方式从TaskQueue
获取队列头,等待队列头的任务的超时时间到达后触发该任务,并且将任务从队列中移除。
Timer
的数据结构和算法都很容易理解。所有的超时任务都首先进入延时队列。后台超时线程不断的从延迟队列中获取任务并且等待超时时间到达后执行任务。延迟队列采用大顶堆排序,在延迟任务的场景中有三种操作,分别是:添加任务,提取队列头任务,查看队列头任务。
查看队列头任务的事件复杂度是 O(1) 。而添加任务和提取队列头任务的时间复杂度都是 O(Log2n) 。当任务数量较大时,添加和删除的开销也是比较大的。此外,由于Timer
内部只有一个处理线程,如果有一个延迟任务的处理消耗了较多的时间,会对应的延迟后续任务的处理。
由于Timer
只有一个线程用来处理延迟任务,在任务数量很多的时候显然是不足够的。在 JDK1.5 引入线程池接口ExecutorService
后,也对应的提供了一个用于处理延时任务的ScheduledExecutorService
子类接口。该接口内部也一样使用了一个使用小顶堆进行排序的延迟队列存放任务。线程池中的线程会在这个队列上等待直到有任务可以提取。
ScheduledExecutorService
的实现上有一些特殊,只有一个线程能够提取到延迟队列头的任务,并且根据任务的超时时间进行等待。在这个等待期间,其他的线程是无法获取任务的。这样的实现是为了避免多个线程同时获取任务,导致超时时间未到达就任务触发或者在等待任务超时时间时有新的任务被加入而无法响应。
由于ScheduledExecutorService
可以使用多个线程,这样也缓解了因为个别任务执行时间长导致的后续任务被阻塞的情况。不过延迟队列也是一样采用小顶堆的排序方式,因此添加任务和删除任务的时间复杂度都是 O(Log2n) 。在任务数量很大的情况下,性能表现比较差。
虽然Timer
和ScheduledThreadPoolExecutor
都提供了对延迟任务的支撑能力,但是由于新增任务和提取任务的时间复杂度都是 O(Log2n) ,在任务数量很大,比如几万,十几万的时候,性能的开销就变得很巨大。
那么,是否存在新增任务和提取任务比 O(Log2n) 复杂度更低的数据结构呢?答案是存在的。在论文《Hashed and Hierarchical Timing Wheels》中设计了一种名为时间轮( Timing Wheels )的数据结构,这种结构在处理延迟任务时,其新增任务和删除任务的时间复杂度降低到了 O(1) 。
时间轮的数据结构很类似于我们钟表上的数据指针,故而得名时间轮。其数据结构用图示意如下
每一个时间“格子”我们称之为槽位,槽位中存放着延迟任务队列。槽位本身代表着一个时间单位,比如 1 秒。时间轮拥有的槽位个数就是该时间轮能够处理的最大延迟跨度的任务,槽位的时间单位代表着时间轮的精度。这意味着小于时间单位的时间在该时间轮是无法被区分的。
槽位上的延迟任务队列中的任务都有相同的延迟时间。每一个单位时间,指针都会移动到下一个槽位。当指针指向某一个槽位时,该槽位的延迟任务队列中的任务都会被触发。
当有一个延迟任务要插入时间轮时,首先计算其延迟时间与单位时间的余值,从指针指向的当前槽位移动余值的个数槽位,就是该延迟任务需要被放入的槽位。
举个例子,时间轮有8个槽位,编号为 0 ~ 7 。指针当前指向槽位 2 。新增一个延迟时间为 4 秒的延迟任务,4 % 8 = 4,因此该任务会被插入 4 + 2 = 6,也就是槽位6的延迟任务队列。
时间轮的槽位实现可以采用循环数组的方式达成,也就是让指针在越过数组的边界后重新回到起始下标。概括来说,可以将时间轮的算法描述为
用队列来存储延迟任务,同一个队列中的任务,其延迟时间相同。用循环数组的方式来存储元素,数组中的每一个元素都指向一个延迟任务队列。
有一个当前指针指向数组中的某一个槽位,每间隔一个单位时间,指针就移动到下一个槽位。被指针指向的槽位的延迟队列,其中的延迟任务全部被触发。
在时间轮中新增一个延迟任务,将其延迟时间除以单位时间得到的余值,从当前指针开始,移动余值对应个数的槽位,就是延迟任务被放入的槽位。
基于这样的数据结构,插入一个延迟任务的时间复杂度就下降到 O(1) 。而当指针指向到一个槽位时,该槽位连接的延迟任务队列中的延迟任务全部被触发。
延迟任务的触发和执行不应该影响指针向后移动的时间精确性。因此一般情况下,用于移动指针的线程只负责任务的触发,任务的执行交由其他的线程来完成。比如,可以将槽位上的延迟任务队列放入到额外的线程池中执行,然后在槽位上新建一个空白的新的延迟任务队列用于后续任务的添加。
在基本原理中我们分析了时间轮的基础结构。不过当时我们假设需要插入的延迟任务的时间不会超过时间轮的长度,也就是说每一个槽位上的延迟任务队列中的任务的延迟时间都是相同的。
在这种情况下,要支持更大时间跨度的延迟任务,要么增加时间轮的槽位数,要么减少时间轮的精度,也就是每一个槽位代表的单位时间。时间轮的精度显然是一个业务上的硬性要求,那么只能增加槽位数。假设要求精度为 1 秒,要能支持延迟时间为 1 天的延迟任务,时间轮的槽位数需要 60 × 60 × 24 = 86400 。这就需要消耗更多的内存。显然,单纯增加槽位数并不是一个好的解决方案。
在论文中,针对大跨度的延迟任务支持,提供了两种扩展方案。
在该方案中,算法引入了“轮次”的概念,延迟任务的延迟时间除以时间轮长度得到的商值为轮次。延迟任务的延迟时间除以时间轮长度得到的余数为要插入的槽位偏移量。
当插入延迟任务时首先计算轮次和槽位偏移量,通过槽位偏移量确定延迟任务插入的槽位。当指针指向某一个槽位时,对槽位指向的延迟任务队列进行遍历,其中轮次为0的延迟任务全部触发,其余任务则等待下一个周期。
通过引入轮次,就可以在有限的槽位上支持无穷时间范围的延迟任务。但是虽然插入任务的时间复杂度仍然是 O(1) ,但是在延迟任务触发时却需要遍历延迟任务队列来确认其轮次是否为0。任务触发时的时间复杂却上升为了 O(n) 。
对于这个情况,还有一个变化的细节可以采用,就是将延迟任务队列按照轮次进行排序,比方说使用小顶堆对延迟任务队列进行排序。这样,当指针指向一个槽位触发延迟任务时,只需要不断的从队列头取出任务进行轮次检查,一旦任务轮次不等于0就可以停止。任务触发的时间复杂度下降为 O(1) 。对应的,由于队列是排序的了,任务插入的时候除了需要定位插入的槽位,还需要定位在队列中的插入位置。插入的时间复杂度变化为 O(1) 和 O(Log2n) ,n 为该槽位上延迟任务队列的长度。
看看手表的设计,有秒针,分针,时针。像秒针与分针,虽然都有 60 格 ,但是各自的格子代表的时间长度不同。参考这个思路,我们可以声明多个不同层级的时间轮,每一个时间轮的槽位的时间跨度是其次级时间轮的整体时间范围。
当低层级的时间轮的指针完整的走完一圈,其对应的高层级时间轮对应的移动一个槽位。并且高层级时间轮指针指向的槽位中的任务按照延迟时间计算,重新放入到低层级时间轮的不同槽位中。这样的方式,保证了每一个时间轮中的每一个槽位的延迟任务队列中的任务都具备相同时间精度的延迟时间。
以精度为 1 秒,时间范围为 1 天的时间轮为例子,可以设计三级时间轮:秒级时间轮有 60 个槽位,每个槽位的时间为 1 秒;分钟级时间轮有 60 个槽位,每个槽位的时间为 60 秒;小时级时间轮有24个槽位,每个槽位的时间为 60 分钟。当秒级时间轮走完 60 秒后,秒级时间轮的指针再次指向下标为0的槽位,而分钟级时间轮的指针向后移动一个槽位,并且将该槽位上的延迟任务全部取出并且重新计算后放入秒级时间轮。
总共只需要 60 + 60 + 24 = 144 个槽位即可支撑。对比上面提到的单级时间轮需要 86400 个槽位而言,节省了相当的内存。
层级时间轮有两种常见的做法:
时间轮算法的核心思想就是通过循环数组和指针移动的方式,将新增延迟任务的时间复杂度下降到 O(1) ,但是在具体实现上,包括如何处理更大时间跨度的延迟任务上,各家不同的实现都会有一些细节上的变化。下面我们以 Netty 中都时间轮实现为例子来进行代码分析。
Netty 的实现自定义了一个超时器的接口io.netty.util.Timer
,其方法如下
public interface Timer
{
//新增一个延时任务,入参为定时任务TimerTask,和对应的延迟时间
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
//停止时间轮的运行,并且返回所有未被触发的延时任务
Set < Timeout > stop();
}
public interface Timeout
{
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}
Timeout
接口是对延迟任务的一个封装,其接口方法说明其实现内部需要维持该延迟任务的状态。后续我们分析其实现内部代码时可以更容易的看到。
Timer
接口有唯一实现HashedWheelTimer
。首先来看其构造方法,如下
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
{
//省略代码,省略参数非空检查内容。
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
//省略代码,省略槽位时间范围检查,避免溢出以及小于 1 毫秒。
workerThread = threadFactory.newThread(worker);
//省略代码,省略资源泄漏追踪设置以及时间轮实例个数检查
}
首先是方法createWheel
,用于创建时间轮的核心数据结构,循环数组。来看下其方法内容
private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
//省略代码,确认 ticksPerWheel 处于正确的区间
//将 ticksPerWheel 规范化为 2 的次方幂大小。
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for(int i = 0; i < wheel.length; i++)
{
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
数组的长度为 2 的次方幂方便进行求商和取余计算。
HashedWheelBucket
内部存储着由HashedWheelTimeout
节点构成的双向链表,并且存储着链表的头节点和尾结点,方便于任务的提取和插入。
方法HashedWheelTimer#newTimeout
用于新增延迟任务,下面来看下代码
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
{
//省略代码,用于参数检查
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
if(delay > 0 && deadline < 0)
{
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
可以看到,在新增任务的时候,任务并不是直接进入到循环数组中,而是首先被放入到一个队列,也就是属性timeouts
,该队列是一个 MPSC 类型的队列,采用这个模式主要出于提升并发性能考虑,因为这个队列只有线程workerThread
会进行任务提取操作。
该线程是在构造方法中通过调用workerThread = threadFactory.newThread(worker)
被创建。但是创建之后并不是马上执行线程的start
方法,其启动的时机是这个时间轮第一次新增延迟任务的时候,也就是本方法中的start
方法的内容。下面是其代码
public void start()
{
switch(WORKER_STATE_UPDATER.get(this))
{
case WORKER_STATE_INIT:
if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
{
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
while(startTime == 0)
{
try
{
startTimeInitialized.await();
}
catch(InterruptedException ignore)
{
// Ignore - it will be ready very soon.
}
}
}
方法很明显的分为两个部分,第一部分为Switch
方法块,通过对状态变量的 CAS 操作,确保只有一个线程能够执行workerThread.start()
方法来启动工作线程,避免并发异常。第二部分为阻塞等待,通过CountDownLatch
类型变量startTimeInitialized
执行阻塞等待,用于等待工作线程workerThread
真正进入工作状态。
从newTimeout
方法的角度来看,插入延迟任务首先是放入队列中,之前分析数据结构的时候也说过任务的触发是指针指向时间轮中某个槽位时进行,那么必然存在一个需要将队列中的延迟任务放入到时间轮的数组之中的工作。这个动作显然就是就是由workerThread
工作线程来完成。下面就来看下这个线程的具体代码内容。
工作线程是依托于HashedWheelTimer.Worker
这个实现了Runnable
接口的类进行工作的,那下面看下其对run
方法的实现代码,如下
public void run()
{
{//代码块①
startTime = System.nanoTime();
if(startTime == 0)
{
//使用startTime==0 作为线程进入工作状态模式标识,因此这里重新赋值为1
startTime = 1;
}
//通知外部初始化工作线程的线程,工作线程已经启动完毕
startTimeInitialized.countDown();
}
{//代码块②
do {
final long deadline = waitForNextTick();
if(deadline > 0)
{
int idx = (int)(tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
{//代码块③
for(HashedWheelBucket bucket: wheel)
{
bucket.clearTimeouts(unprocessedTimeouts);
}
for(;;)
{
HashedWheelTimeout timeout = timeouts.poll();
if(timeout == null)
{
break;
}
if(!timeout.isCancelled())
{
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
}
为了方便阅读,这边将run
方法的内容分为三个代码块。首先来看代码块①。通过系统调用System.nanoTime
为启动时间startTime
设置初始值,该变量代表了时间轮的基线时间,用于后续相对时间的计算。赋值完毕后,通过startTimeInitialized
变量对外部的等待线程进行通知。
接着来看代码块②。这是主要的工作部分,整体是在一个while
循环中,确保工作线程只在时间轮没有被终止的时候工作。首先来看方法waitForNextTick
,在时间轮中,指针移动一次,称之为一个tick
,这个方法显然内部应该是用于等待指针移动到下一个tick
,来看具体代码,如下
private long waitForNextTick()
{
long deadline = tickDuration * (tick + 1);
for(;;)
{
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if(sleepTimeMs <= 0)
{
if(currentTime == Long.MIN_VALUE)
{
return -Long.MAX_VALUE;
}
else
{
return currentTime;
}
}
if(PlatformDependent.isWindows())
{
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try
{
Thread.sleep(sleepTimeMs);
}
catch(InterruptedException ignored)
{
if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
{
return Long.MIN_VALUE;
}
}
}
}
整个方法的思路很简单,前面说过,时间轮每移动一次指针,意味着一个tick
。这里tick
可以看成是指针移动的次数。由于槽位的时间范围是固定的,因此可以简单的计算出来指针移动到下一个槽位,理论上应该经过的时间,也就是long deadline = tickDuration * (tick + 1)
。之后再计算从时间轮启动到当前,实际经过的时间,也就是long currentTime = System.nanoTime() - startTime
。二者的差值就是线程所需要睡眠的时间。
如果差值小于0,意味着实际经过的时间超过了理论时间,此时已经超出了应该休眠的范围,方法需要立即返回。由于在这个方法的执行过程中,可能会遇到时间轮被停止的情况,因此使用一个特殊值来表达这个事件,也就是Long.MIN_VALUE
,这也是为什么currentTime
要避开这个值的原因。
还有一点需要注意,Thread.sleep
方法的实现是依托于操作系统提供的中断检查,也就是操作系统会在每一个中断的时候去检查是否有线程需要唤醒并且提供CPU资源。默认情况下 Linux 的中断间隔是 1 毫秒,而 Windows 的中断间隔是 10 毫秒或者 15 毫秒,具体取决于硬件识别。
如果是在 Windows 平台下,当方法调用Thread.sleep
传入的参数不是10的整数倍时,其内部会调用系统方法timeBeginPeriod()
和timeEndPeriod()
来修改中断周期为 1 毫秒,并且在休眠结束后再次设置回默认值。这样的目的是为了保证休眠时间的准确性。但是在 Windows 平台下,频繁的调用修改中断周期会导致 Windows 时钟出现异常,大多数时候的表现是导致时钟加快。这将导致比如尝试休眠 10 秒时,实际上只休眠了 9 秒。所以在这里,通过sleepTimeMs = sleepTimeMs / 10 * 10
保证了sleepTimeMs
是 10 的整数倍,从而避免了 Windows 的这个 BUG 。
当方法waitForNextTick
返回后,并且返回的值是正数,意味着当前tick
的休眠等待已经完成,可以进行延迟任务的触发处理了。通过int idx = (int)(tick & mask)
调用,确定下一个被触发延迟任务的槽位在循环数组中的下标。在处理触发任务之前,首先将已经取消的延迟任务从槽位所指向的延迟任务队列中删除。每次调用HashedWheelTimer#newTimeout
新增延迟任务时都会返回一个Timeout
对象,可以通过cancle
方法将这个延迟任务取消。当执行取消动作的时候,并不会直接从延迟队列中删除,而是将这个对象放入到取消队列,也就是HashedWheelTimer.cancelledTimeouts
属性。在准备遍历槽位上延迟任务队列之前,通过方法processCancelledTasks
来遍历这个取消队列,将其中的延迟任务从各自槽位上的延迟任务队列中删除。使用这种方式的好处在于延迟任务的删除只有一个线程会进行,避免了多线程带来的并发干扰,减少了开发难度。
在处理完取消的延迟任务后,调用方法transferTimeoutsToBuckets
来将新增延迟任务队列HashedWheelTimer.timeouts
中的延迟任务分别添加到合适其延迟时间的槽位中。方法的代码很简单,就是循环不断从timeouts
取出任务,并且计算其延迟时间与时间轮范围的商值和余数,结果分别为其轮次与槽位下标。根据槽位下标将该任务添加到槽位对应的延迟任务队列中。
在这里可以看到 Netty 作者对时间轮这一结构的并发设计,新增任务是向 MPSC 队列新增元素实现。而槽位上的延迟任务队列只有时间轮本身的线程能够进行新增和删除,设计为了 SPSC 模式。前者是为了提高无锁并发下的性能,后者则是通过约束,减少了设计难度。
transferTimeoutsToBuckets
方法每次最多只会转移 100000 个延迟任务到合适的槽位中,这是为了避免外部循环添加任务导致的饿死。方法执行完毕后,就到了槽位上延迟任务的触发处理,也就是方法HashedWheelBucket#expireTimeouts
的功能,方法内的逻辑也很简单。遍历队列,如果延迟任务的轮次不为 0,则减 1。否则触发任务执行方法,也就是HashedWheelTimeout#expire
。该方法内部依然通过 CAS 方式对状态进行更新,避免方法的触发和取消之间的竞争冲突。从这个方法的实现可以看到,Netty 采用了轮次的方式来对超出时间轮范围的延迟时间进行支持。多层级时间轮的实现相比轮次概念的实现更为复杂,考虑到在网络IO应用中,超出时间轮范围的场景比较少,使用轮次的方式去支撑更大的时间,是一个相对容易实现的方案。
当需要被触发的延迟任务都被触发后,通过tick
加 1 来表达指针移动到下一个槽位。
外部线程通过调用HashedWheelTimer#stop
方法来停止时间轮,停止的方式很简单,就是通过 CAS 调用来修改时间轮的状态属性。而在代码块②中通过循环的方式在每一次tick
都会检查这个状态位。代码块③的内容很简单,遍历所有的槽位,并且遍历槽位的延迟任务队列,将所有未到达延迟时间并且未取消的任务,都放入到一个集合中,最终将这个集合返回。这个集合内存储的就是所有未能执行的延迟任务。
在处理大量延迟任务的场景中,时间轮是一个很高效的算法与数据结构。Netty 在对时间轮的实现上,在添加任务,过期任务,删除任务等环节进行了一些细节上的调整。实际上,不同中间件中都有对时间轮的一些实现,各自也都有区别,但是核心都是围绕在循环数组与槽位过期这个概念上。不同的细节变化有各自适合的场景和考量。
原文:https://www.cnblogs.com/jfire/p/12243196.html