Java线程池源码分析二(ScheduledThreadPoolExecutor)


线程池类图

本篇文章我们从右边这条线
Executor==>ExcutorService==>ScheduledExecutorService==>ScheduledThreadPoolExecutor来分析一下。
线程池UML类图

ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnablcommand,
        long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}

前2个方法是在一段时间后(delay,unit决定),开启一个任务,任务只运行一次。后2个方法是每隔一段时间定时触发一个任务。其中方法3是每隔固定的时间启动一个任务(若前一个任务还未完成,则下一个任务可能会晚一点,不会并发的运行)。方法4是前一个任务运行完成后经过指定的时间运行下一个任务。

ScheduledThreadPoolExecutor

(1)schedule和scheduleAtFixedRate

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

我们选取上述2个方法分别代表只运行一次的任务和定时执行的任务。可以看到多次执行的定时任务就多了sft.outerTask = t,其他基本一样。
(2)delayedExecute

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
        !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

如果线程池状态不是RUNNING状态,则执行拒绝策略拒绝任务,否则将任务添加到queue中。接着判断若状态不是RUNNING,并且删除成功,然后取消任务。若正常,则执行ensurePrestart()方法。
(3)ensurePrestart

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

这里就调用到了我们上一篇文章中讲的ThreadPoolExecutor的addWorker方法。这里的command都为null,意思是所有的工作线程都是从queue中获取任务。
上章我们知道若addWorker成功,则会启动一个工作线程调用runWorker方法

final void runWorker(Worker w) {
    。。。。。。
    try {
        while (task != null || (task = getTask())!= null) {
            try {
                beforeExecute(wt, task);
                task.run();
                afterExecute(task, thrown);
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们主要关注和上一章普通的ThreadPoolExecutor的不同点。在我看来,有以下2个地方:

  • task = getTask(),这个方法从queue里面取得一个Task,这里用的queue是DelayedWorkQueue,和普通的LinkedBlockingQueue不同,从名字上可以看出这个是实现延时任务的。
  • task.run();这个task是ScheduledFutureTask,FutureTask的子类。
    下面我们会单独来分析下上面2个方法。

(4)ScheduledFutureTask.run()

public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }

若线程池不是RUNNING状态,则取消;若只运行一次的任务,则调用父类FutureTask的run方法;若周期运行的任务,先设置下次运行的时间,然后调用reExecutePeriodic方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

这个方法首先将任务添加到workQueue中,然后又会调用到上面的ensurePrestart方法。
(4)getTask()方法

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

这个workQueue是ScheduledThreadPoolExecutor中定义的DelayedWorkQueue。

DelayedWorkQueue

public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

这个take方法是个阻塞方法,会一直等到获取到任务为止(或者被其他线程中断)若工作队列为空,则当前线程await();若工作队列不为空,则先获取剩余delay时间,若delay<=0(说明延时时间已经到了),则从队列中取出任务并从队列中删除。若delay>0,最终会运行到available.awaitNanos(delay),等待剩下的时间。然后进行下一次for循环,显然此时queue不为空,并且此时delay<=0,则从队列中取出任务并删除。然后运行finally方法,唤醒其他阻塞在当前lock的当前Condition上的线程。
poll方法就不分析了。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
贵州游记 贵州游记
6月7号中午12点就请了一天半的假,直接从公司出发去浦东机场,6月11号回到上海。总共4天半的时间,这次贵州之行,给我最大的印象就是贵州的山真是多,雨也很多,空气很好。另外本次贵州之行,也是我第一次在外面住青旅,在青旅结识了不少喜欢旅游的朋
2016-06-12
下一篇 
Java线程池源码分析 Java线程池源码分析
线程池类图本篇文章我们先从左边这条线Executor==>ExcutorService==>AbstractExecutorService==>ThreadPoolExecutor来分析一下。
2016-05-07
  目录