电子说
ScheduledThreadPoolExecutor
可以用来很方便实现我们的调度任务,具体使用可以参考调度线程池ScheduledThreadPoolExecutor的正确使用姿势这篇文章,那大家知道它是怎么实现的吗,本文就带大家来揭晓谜底。
我们先思考下,如果让大家去实现ScheduledThreadPoolExecutor
可以周期性执行任务的功能,需要考虑哪些方面呢?
ScheduledThreadPoolExecutor
的整体实现思路是什么呢?答:我们是不是可以继承线程池类,按照线程池的思路,将任务先丢到阻塞队列中,等到时间到了,工作线程就从阻塞队列获取任务执行。
答:我们可以根据参数获取这个任务还要多少时间执行,那么我们是不是可以从阻塞队列中获取任务的时候,通过条件队列的的awaitNanos(delay)
方法,阻塞一定时间。
答:这就更加简单了,任务执行完成后,把它再次加入到队列不就行了吗。
ScheduledThreadPoolExecutor
的类结构图如上图所示,很明显它是在我们的线程池ThreadPoolExecutor
框架基础上扩展的。
ScheduledExecutorService
:实现了该接口,封装了调度相关的APIThreadPoolExecutor
:继承了该类,保留了线程池的能力和整个实现的框架DelayedWorkQueue
:内部类,延迟阻塞队列。ScheduledFutureTask
:延迟任务对象,包含了任务、任务状态、剩余的时间、结果等信息。通过ScheduledThreadPoolExecutor
类的成员属性,我们可以了解它的数据结构。
shutdown
后是否继续执行周期任务(重复执行)private volatile boolean continueExistingPeriodicTasksAfterShutdown;
shutdown
后是否继续执行延迟任务(只执行一次)private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
cancel()
方法后,是否将该任务从队列中移除,默认falseprivate volatile boolean removeOnCancel = false;
private static final AtomicLong sequencer = new AtomicLong()
ScheduledFutureTask
延迟任务类ScheduledFutureTask
继承 FutureTask
,实现 RunnableScheduledFuture
接口,无论是 runnable
还是 callable
,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask
。FutureTask
的 run
方法来实现对延时执行、周期执行的支持。FutureTask#run
,而对于周期性任务则调用FutureTask#runAndReset
并且在成功之后根据 fixed-delay/fixed-rate
模式来设置下次执行时间并重新将任务塞到工作队列。// 任务序列号
private final long sequenceNumber;
// 任务可以被执行的时间,交付时间,以纳秒表示
private long time;
// 0 表示非周期任务
// 正数表示 fixed-rate(两次开始启动的间隔)模式的周期,
// 负数表示 fixed-delay(一次执行结束到下一次开始启动) 模式
private final long period;
// 执行的任务对象
RunnableScheduledFuture
DelayedWorkQueue
延迟队列DelayedWorkQueue
是支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素。// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 节点数量
private int size = 0;
// 存放任务的数组
private RunnableScheduledFuture?[] queue =
new RunnableScheduledFuture?[INITIAL_CAPACITY];
// 控制并发用的锁
private final ReentrantLock lock = new ReentrantLock();
// 条件队列
private final Condition available = lock.newCondition();
//指定用于等待队列头节点任务的线程
private Thread leader = null;
schedule()
原理延迟执行方法,并指定延迟执行的时间,只会执行一次。
schedule()
方法是延迟任务方法的入口。public ScheduledFuture? schedule(Runnable command,
long delay,
TimeUnit unit) {
// 判空处理
if (command == null || unit == null)
throw new NullPointerException();
// 将外部传入的任务封装成延迟任务对象ScheduledFutureTask
RunnableScheduledFuture? t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 执行延迟任务
delayedExecute(t);
return t;
}
decorateTask(...)
该方法是封装延迟任务triggerTime(delay, unit)
方法计算延迟的时间。// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间
private long triggerTime(long delay, TimeUnit unit) {
// 设置触发的时间
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay
// 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 下面这种情况很少,大家看不懂可以不用强行理解
// 如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。
// 阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。
// 那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出
// 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱
// 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
RunnableScheduledFuture
的构造方法封装为延迟任务ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// 任务的触发时间
this.time = ns;
// 任务的周期, 延迟任务的为0,因为不需要重复执行
this.period = 0;
// 任务的序号 + 1
this.sequenceNumber = sequencer.getAndIncrement();
}
decorateTask()
方法装饰延迟任务// 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展
protected
scheduleAtFixedRate()
原理按照固定的频率周期性的执行任务,捕手renwu,一次任务的启动到下一次任务的启动的间隔
public ScheduledFuture? scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任务封装,【指定初始的延迟时间和周期时间】
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(period));
// 默认返回本身
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 开始执行这个任务
delayedExecute(t);
return t;
}
scheduleWithFixedDelay()
原理按照指定的延时周期性执行任务,上一个任务执行完毕后,延时一定时间,再次执行任务。
public ScheduledFuture? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 开始执行这个任务
delayedExecute(t);
return t;
}
delayedExecute(t)
原理上面多种提交任务的方式,殊途同归,最终都会调用delayedExecute()
方法执行延迟或者周期任务。
delayedExecute()
方法是执行延迟任务的入口private void delayedExecute(RunnableScheduledFuture? task) {
// 线程池是 SHUTDOWN 状态,执行拒绝策略
if (isShutdown())
// 调用拒绝策略的方法
reject(task);
else {
// 把当前任务放入阻塞队列
super.getQueue().add(task);
// 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态
// 非主流程,可以跳过,不重点看了
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
// 开始执行了哈
ensurePrestart();
}
}
ensurePrestart()
方法开启线程执行// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker数目小于corePoolSize,则添加一个worker。
if (wc < corePoolSize)
// 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize
addWorker(null, true);
// corePoolSize = 0的情况,至少开启一个线程,【担保机制】
else if (wc == 0)
addWorker(null, false);
}
addWorker()
方法实际上父类ThreadPoolExecutor
的方法,这个方法在该文章 Java线程池源码深度解析中详细介绍过,这边做个总结:
目前工作线程已经创建好了,工作线程开始工作了,它会从阻塞队列中获取延迟任务执行,这部分也是线程池里面的原理,不做展开,那我们看下它是如何实现延迟执行的? 主要关注如何从阻塞队列中获取任务。
DelayedWorkQueue#take()
方法获取延迟任务addWoker()
方法创建工作线程后,工作线程中循环持续调用workQueue.take()
方法获取延迟任务。awaitNanos(delay)
阻塞方法等待一段时间,等时间到了,延迟时间自然小于等于0了。// DelayedWorkQueue#take()
public RunnableScheduledFuture? take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加可中断锁
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 获取阻塞队列中的头结点
RunnableScheduledFuture? first = queue[0];
// 如果阻塞队列没有数据,为空
if (first == null)
// 等待队列不空,直至有任务通过 offer 入队并唤醒
available.await();
else {
// 获取头节点的的任务还剩余多少时间才执行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
return finishPoll(first);
// 逻辑到这说明头节点的延迟时间还没到
first = null;
// 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
if (leader != null)
// 当前线程阻塞
available.await();
else {
// 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当前线程通过awaitNanos方法等待delay时间后,会自动唤醒,往后面继续执行
available.awaitNanos(delay);
// 到达阻塞时间时,当前线程会从这里醒来,进入下一轮循环,就有可能执行了
} finally {
// t堆顶更新,leader 置为 null,offer 方法释放锁后,
// 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
if (leader == thisThread)
// leader 置为 null 用以接下来判断是否需要唤醒后继线程
leader = null;
}
}
}
}
} finally {
// 没有 leader 线程并且头结点不为 null,唤醒阻塞获取头节点的线程,
// 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
if (leader == null && queue[0] != null)
available.signal();
// 解锁
lock.unlock();
}
}
finishPoll()
方法获取到任务后执行该方法主要做两个事情, 获取头节点并调整堆,重新选择延迟时间最小的节点放入头部。
private RunnableScheduledFuture?</span> finishPoll(RunnableScheduledFuture?span> f) {
// 获取尾索引
int s = --size;
// 获取尾节点
RunnableScheduledFuture? x = queue[s];
// 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
queue[s] = null;
// s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
if (s != 0)
// 从索引处 0 开始向下调整
siftDown(0, x);
// 出队的元素索引设置为 -1
setIndex(f, -1);
return f;
}
从延迟队列中获取任务后,工作线程会调用延迟任务的run()方法执行任务。
ScheduledFutureTask#run()
方法运行任务isPeriodic()
方法判断任务是否是周期性任务还是非周期性任务FutureTask#run()
执行一次FutureTask#runAndReset()
, 返回true会设置下一次的执行时间,重新放入线程池的阻塞队列中,等待下次获取执行public void run() {
// 是否周期性,就是判断 period 是否为 0
boolean periodic = isPeriodic();
// 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任务,直接调用 FutureTask#run 执行一次
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任务的执行,返回 true 表示执行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置周期任务的下一次执行时间
setNextRunTime();
// 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
reExecutePeriodic(outerTask);
}
}
FutureTask#runAndReset()
执行周期性任务protected boolean runAndReset() {
// 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable
ScheduledFutureTask#setNextRunTime()
设置下次执行时间fixed-rate
模式,直接加上period时间即可。fixed-delay
模式, 调用triggerTime重新计算下次时间。// 任务下一次的触发时间
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
time += p;
else
// fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
time = triggerTime(-p);
}
ScheduledFutureTask#reExecutePeriodic()
,重新放入阻塞任务队列,等待获取,进行下一轮执行// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture? task) {
if (canRunInCurrentRunState(true)) {
// 【放入任务队列】
super.getQueue().add(task);
// 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
// 如果不能执行且任务还在队列中未被取走,则取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
ensurePrestart();
}
}
全部0条评论
快来发表一下你的评论吧 !