Java线程池源码分析


线程池类图

本篇文章我们先从左边这条线
Executor==>ExcutorService==>AbstractExecutorService==>ThreadPoolExecutor来分析一下。
线程池UML类图

  • 上面url继承类图,线程池的最顶层的接口是Executor,这个接口只有一个方法void execute(Runnable command)
  • ExecutorService继承Executor,新增了submit(Runnable(Callable)),shutDown,shutDownNow等几个主要方法
  • AbstractExecutorService实现了上面的ExecutorService接口的若干个方法。
  • ThreadPoolExecutor继承AbstractExecutorService,实现了线程池的一些主要的方法execute(Runnable)。

AbstractExecutorService

AbstractExecutorService实现了submit方法,代码如下:

  • submit(Callable task)方法
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
  • newTaskFor(Callable callable)方法
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    上面的FutureTask实现了RunnableFuture接口,RunnableFuture继承了
    Runnable和Future接口。Runnable接口只有一个void run方法,Future接口有cancel(boolean),V get(),V get(long timeout, TimeUnit unit),boolean isCancelled(),boolean isDone()方法。

ThreadPoolExecutor

接着上面的AbstractExecutorService.submit方法,会调用到execute(ftask),这个execute方法就是ThreadPoolExecutor中的。我们接下来就以execute方法作为起点来分析。

execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  • 首先检查当前工作线程数是否小于corePoolSize,若小于,则添加一个worker来处理这个任务(commadn),添加任务成功则返回.
  • 如果线程还处于running状态,并且任务成功添加到queue中,重新检查一次线程池的状态,若线程池非running,则从queue中删除任务,成功则调用reject,这里根据拒绝策略来执行;若当前工作的线程数为0,则添加一个worker(addWorker(null, false),这里要注意,这次的addWorker的参数和上面第一次的不一样)
  • 如果添加worker失败,也执行reject方法。

addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

(1)这个判断逻辑比较复杂,我们先来看下

if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

若当前状态大于SHUTDOWN,显然if判断条件为ture,直接returnfalse。(很好理解,线程池处于关闭状态,肯定不让新添加worker了)
若当前状态小于SHUTDOWN,if判断条件为false,接着往下走(线程池为RUNNING状态,很好理解)
若当前状态等于SHUTDOWN:若firstTask等于null并且工作队列有任务,则if判断条件为false,代码不会return,会继续往下运行;若firstTask不等于null或者工作队列为空,则判断条件为true,会return false(这个也好理解,我们知道SHUTDOWN状态,线程池不再接受新的任务,但是已经在工作队列中的任务还是要完成才行。所以若first等于null,并且工作队列有任务,还要继续往下走。若相反,则不会往下走)
(2)判断当前工作线程数

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

当前工作线程数没有超过线程池设置的参数的限制,则利用CAS添加一个worker,并跳出外层的for循环,继续向下运行。否则返回false,添加worker失败。
(3) 完成了上述1 2步骤后,会执行new Worker(firstTask),Thread t = w.thread并再次检查线程池的状态,若合法,则向工作线程池HashSet中添加当前worker,并执行t.start。此时才开启了子线程来执行任务。

子线程run方法

上面步骤3调用了t.start,会开启一个子线程来运行Worker中的run方法。

public void run() {
        runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                }finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

上述worker不断通过getTask()方法,从workQueue中获取任务;若没有获取到任务,则调用processWorkerExit方法。

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask方法是一个无限的for循环方法,它首先判断当前线程池的状态

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

这个判断也很好理解,若rs==SHUTDOWN,workQueue为空,显然应该直接返回null,并提前是工作的worker减一。(getTask返回null,runWorker方法会调用processWorkerExit从HashSet中remove当前worker);若rs>大于SHUTDOWN(这个对应线程池的shutDownNow方法,工作队列中等待的任务不再执行);其他情况,说明线程池处于运行状态,继续往下运行。然后根据当前线程池设置的最大线程数,以及是否允许线coreThread超时间以及workQueue的状态来判断是否通过CAS操作来是线程数减一并return null。最后我们要关注下下面这个从工作队列中取得任务的三目运算。

Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

若timed为ture(设置allowCoreThreadTimeOut为true),则超过了等待的时间还没有从workQueue中取得任务则r = null,此时就有可能造成即使workerCount小于corePoolSize,当前的worker也可能被回收。
若timed为false,则调用阻塞方法从workQueue中获取任务,newFixedThreadPool就会一直调用这个阻塞方法,从而达到不显示关闭线程池的情况下,即使workQueue为空,也能维持固定的工作线程的个数。

shutDown(shutDownNow)方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //shutDwonNow为STOP,shutDown为SHUTDOWN
        advanceRunState(STOP);(advanceRunState(SHUTDOWN);)
        interruptWorkers();(interruptIdleWorkers)
        //shutDownNow专用
        tasks = drainQueue();
        //shutDown专用 ScheduledThreadPoolExecutor回调
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutDown和shutDownnNow方法区别(代码层面):

  • shutDownNow:advanceRunState(STOP),interruptWorkers
    shutDown:advanceRunState(shutDown),interruptIdleWorkers
  • shutDown多了个onShutdown();ScheduledThreadPoolExecutor复写了onShutDown方法。
  • shutDownNow方法工作队列中还未完成的任务。
  • interruptIdleWorkers

interruptIdleWorkers与interruptWorkers

(1)shutDownNow

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

显然这个是中断所有的线程
(2)shutDown

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

注意onlyOne参数,这个只有在调用tryTerminate()方法里面,会调用interruptIdleWorkers(true),其他情况都是interruptIdleWorkers(false),所以对于shutDown方法,也是尝试中断所有还没有被中断的线程。
3)tryTerminate
上面(2)中提到了tryTerminate方法,接下来就来看下这个方法

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

从上述代码可以看出,若线程池状态为SHUTDOWN,workQueue为空,工作线程数为0或者线程池状态为STOP,工作线程数为0,都最终会把线程池状态设置为TERMINATED,并且唤醒所有因为调用awaitTermination()方法阻塞在termination.awaitNanos(nanos)还未醒过来的线程。

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

上述tryTerminate方法,在addWorkerFailed(),processWorkerExit(),shutDown(),shutDownNow(),remove(Runnable task)方法中都会调用到。

线程池5种状态解释

上面经常提到线程池的运行状态,这里稍作解释一下。

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

种状态的定义

  • RUNNING: 接受新的任务,处理workQueue中的任务。
  • SHUTDOWN: 不接受新的任务,但是会继续完成workQueue中的任务
  • STOP: 不接受新的任务,也不处理workQueue中未完成的任务,尝试中断所有运行中的任务
  • TIDYING: 所有任务已经完成, 工作线程数为0,线程池状态变成TIDYING随之将会调用terminated()方法。
  • TERMINATED: terminated()方法已经完成

5种状态相互转换

  • RUNNING -> SHUTDOWN: 调用shutdown()方法,也许隐式在finalize()方法
  • (RUNNING or SHUTDOWN) -> STOP: 调用shutdownNow()方法
  • SHUTDOWN -> TIDYING: workQueue和pool都为空
  • STOP -> TIDYING: pool为空
  • TIDYING -> TERMINATED: terminated()方法完成

文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
Java线程池源码分析二(ScheduledThreadPoolExecutor) Java线程池源码分析二(ScheduledThreadPoolExecutor)
线程池类图本篇文章我们从右边这条线Executor==>ExcutorService==>ScheduledExecutorService==>ScheduledThreadPoolExecutor来分析一下。
2016-05-13
下一篇 
成都游记 成都游记
Day1-宽窄巷子到了成都,酒店安排好后,就直接去了宽窄巷子,找了一个茶馆,边喝茶边看戏。值得一提的是,李天炜同学上去当了一次演员,冒着生命危险给我们赢得了一个果盘,只有他不知道😂。
2016-04-28
  目录