线程池类图
本篇文章我们从右边这条线
Executor==>ExcutorService==>ScheduledExecutorService==>ScheduledThreadPoolExecutor来分析一下。
ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnablcommand,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}
前2个方法是在一段时间后(delay,unit决定),开启一个任务,任务只运行一次。后2个方法是每隔一段时间定时触发一个任务。其中方法3是每隔固定的时间启动一个任务(若前一个任务还未完成,则下一个任务可能会晚一点,不会并发的运行)。方法4是前一个任务运行完成后经过指定的时间运行下一个任务。
ScheduledThreadPoolExecutor
(1)schedule和scheduleAtFixedRate
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
我们选取上述2个方法分别代表只运行一次的任务和定时执行的任务。可以看到多次执行的定时任务就多了sft.outerTask = t,其他基本一样。
(2)delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
如果线程池状态不是RUNNING状态,则执行拒绝策略拒绝任务,否则将任务添加到queue中。接着判断若状态不是RUNNING,并且删除成功,然后取消任务。若正常,则执行ensurePrestart()方法。
(3)ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
这里就调用到了我们上一篇文章中讲的ThreadPoolExecutor的addWorker方法。这里的command都为null,意思是所有的工作线程都是从queue中获取任务。
上章我们知道若addWorker成功,则会启动一个工作线程调用runWorker方法
final void runWorker(Worker w) {
。。。。。。
try {
while (task != null || (task = getTask())!= null) {
try {
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我们主要关注和上一章普通的ThreadPoolExecutor的不同点。在我看来,有以下2个地方:
- task = getTask(),这个方法从queue里面取得一个Task,这里用的queue是DelayedWorkQueue,和普通的LinkedBlockingQueue不同,从名字上可以看出这个是实现延时任务的。
- task.run();这个task是ScheduledFutureTask,FutureTask的子类。
下面我们会单独来分析下上面2个方法。
(4)ScheduledFutureTask.run()
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
若线程池不是RUNNING状态,则取消;若只运行一次的任务,则调用父类FutureTask的run方法;若周期运行的任务,先设置下次运行的时间,然后调用reExecutePeriodic方法。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
这个方法首先将任务添加到workQueue中,然后又会调用到上面的ensurePrestart方法。
(4)getTask()方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这个workQueue是ScheduledThreadPoolExecutor中定义的DelayedWorkQueue。
DelayedWorkQueue
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
这个take方法是个阻塞方法,会一直等到获取到任务为止(或者被其他线程中断)若工作队列为空,则当前线程await();若工作队列不为空,则先获取剩余delay时间,若delay<=0(说明延时时间已经到了),则从队列中取出任务并从队列中删除。若delay>0,最终会运行到available.awaitNanos(delay),等待剩下的时间。然后进行下一次for循环,显然此时queue不为空,并且此时delay<=0,则从队列中取出任务并删除。然后运行finally方法,唤醒其他阻塞在当前lock的当前Condition上的线程。
poll方法就不分析了。