澳门新葡亰娱乐官网Java 并发包之线程池综述

澳门新葡亰娱乐官网 7

前言

多线程编程中,为每个任务分配一个线程是不现实的,线程创建的开销和资源消耗都是很高的。线程池应运而生,成为我们管理线程的利器。Java
通过Executor接口,提供了一种标准的方法将任务的提交过程和执行过程解耦开来,并用Runnable表示任务。

下面,我们来分析一下 Java 线程池框架的实现ThreadPoolExecutor

下面的分析基于JDK1.7

■ 线程池的创建

原文出处http://cmsblogs.com/
chenssy

生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位来表示运行状态,分别是:

  1. RUNNING:接收新任务,并且处理任务队列中的任务
  2. SHUTDOWN:不接收新任务,但是处理任务队列的任务
  3. STOP:不接收新任务,不出来任务队列,同时中断所有进行中的任务
  4. TIDYING:所有任务已经被终止,工作线程数量为
    0,到达该状态会执行terminated()
  5. TERMINATED:terminated()执行完毕

澳门新葡亰娱乐官网 1

状态转换图

ThreadPoolExecutor中用原子类来表示状态位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

   
在Java中,您可以通过调整-Xss参数来调节每个线程栈的大小(64bit系统默认1024KB),当减小该值时意味着可以创建更多的线程数,但问题是JVM资源是有限的,线程不能无限创建!

作为Executor框架中最核心的类,ThreadPoolExecutor代表着鼎鼎大名的线程池,它给了我们足够的理由来弄清楚它。

线程池模型

   
从笔者开发经验来看,线程池应该是并发包中使用频率和运用场景最多的并发框架,几乎所有并发/异步执行任务的需求都需要用到线程池,线程复用,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务。合理的使用线程池可以带来如下三个好处

下面我们就通过源码来一步一步弄清楚它。

核心参数

  • corePoolSize:最小存活的工作线程数量(如果设置allowCoreThreadTimeOut,那么该值为
    0)
  • maximumPoolSize:最大的线程数量,受限于CAPACITY
  • keepAliveTime:对应线程的存活时间,时间单位由TimeUnit指定
  • workQueue:工作队列,存储待执行的任务
  • RejectExecutionHandler:拒绝策略,线程池满后会触发

线程池的最大容量CAPACITY中的前三位用作标志位,也就是说工作线程的最大容量为(2^29)-1

      1.降低资源消耗:通过重用已创建的线程来降低线程创建和销毁的消耗

内部状态

线程有五种状态:新建,就绪,运行,阻塞,死亡,线程池同样有五种状态:Running,
SHUTDOWN, STOP, TIDYING, TERMINATED。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

变量ctl定义为AtomicInteger
,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示”线程池状态”,低29位表示”线程池中的任务数量”。

RUNNING            -- 对应的高3位值是111。
SHUTDOWN       -- 对应的高3位值是000。
STOP                   -- 对应的高3位值是001。
TIDYING              -- 对应的高3位值是010。
TERMINATED     -- 对应的高3位值是011。

RUNNING:处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理。

SHUTDOWN:处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理。

STOP:处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

TERMINATED:线程池彻底终止的状态。

各个状态的转换如下:

澳门新葡亰娱乐官网 2

线程池状态转换

四种模型

  • CachedThreadPool:一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何的限制。
  • FixedThreadPool:一个固定大小的线程池,提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的大小将不再变化。
  • SingleThreadPool:一个单线程的线程池,它只有一个工作线程来执行任务,可以确保按照任务在队列中的顺序来串行执行,如果这个线程异常结束将创建一个新的线程来执行任务。
  • ScheduledThreadPool:一个固定大小的线程池,并且以延迟或者定时的方式来执行任务,类似于Timer。

      2.提高响应速度:任务到达时不需要等待线程创建就可以立即执行

创建线程池

我们可以通过ThreadPoolExecutor构造函数来创建一个线程池:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

共有七个参数,每个参数含义如下:

corePoolSize

线程池中核心线程的数量。当提交一个任务时,线程池会新建一个线程来执行任务,直到当前线程数等于corePoolSize。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

maximumPoolSize

线程池中允许的最大线程数。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了。

keepAliveTime

线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。

unit

keepAliveTime的单位。TimeUnit

workQueue

用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口。我们可以选择如下几种:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。【死磕Java并发】—-J.U.C之阻塞队列:ArrayBlockingQueue
  • LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。【死磕Java并发】—-J.U.C之阻塞队列:SynchronousQueue
  • PriorityBlockingQueue:具有优先界别的阻塞队列。【死磕Java并发】—-J.U.C之阻塞队列:PriorityBlockingQueue

threadFactory

用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory(),如下:

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

返回的是DefaultThreadFactory对象,源码如下:

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

ThreadFactory的左右就是提供创建线程的功能的线程工厂。他是通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

handler

RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。

线程池提供了四种拒绝策略:

  1. AbortPolicy:直接抛出异常,默认策略;
  2. CallerRunsPolicy:用调用者所在的线程来执行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  4. DiscardPolicy:直接丢弃任务;
    当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。

执行任务 execute

核心逻辑:

  1. 当前线程数量
    corePoolSize,直接开启新的核心线程执行任务addWorker(command, true)
  2. 当前线程数量 >= corePoolSize,且任务加入工作队列成功
    1. 检查线程池当前状态是否处于RUNNING
    2. 如果否,则拒绝该任务
    3. 如果是,判断当前线程数量是否为 0,如果为 0,就增加一个工作线程。
  3. 开启普通线程执行任务addWorker(command, false),开启失败就拒绝该任务

从上面的分析可以总结出线程池运行的四个阶段:

  1. poolSize < corePoolSize 且队列为空,此时会新建线程来处理提交的任务
  2. poolSize == corePoolSize,此时提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。
  3. poolSize == corePoolSize,并且队列已满,此时也会新建线程来处理提交的任务,但是poolSize < maxPoolSize
  4. poolSize == maxPoolSize,并且队列已满,此时会触发拒绝策略

      3.提高线程的可管理性:线程池可以统一管理、分配、调优和监控

线程池

Executor框架提供了三种线程池,他们都可以通过工具类Executors来创建。

FixedThreadPool

FixedThreadPool,可重用固定线程数的线程池,其定义如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

corePoolSize 和
maximumPoolSize都设置为创建FixedThreadPool时指定的参数nThreads,意味着当线程池满时且阻塞队列也已经满时,如果继续提交任务,则会直接走拒绝策略,该线程池不会再新建线程来执行任务,而是直接走拒绝策略。FixedThreadPool使用的是默认的拒绝策略,即AbortPolicy,则直接抛出异常。

keepAliveTime设置为0L,表示空闲的线程会立刻终止。

workQueue则是使用LinkedBlockingQueue,但是没有设置范围,那么则是最大值(Integer.MAX_VALUE),这基本就相当于一个无界队列了。使用该“无界队列”则会带来哪些影响呢?当线程池中的线程数量等于corePoolSize
时,如果继续提交任务,该任务会被添加到阻塞队列workQueue中,当阻塞队列也满了之后,则线程池会新建线程执行任务直到maximumPoolSize。由于FixedThreadPool使用的是“无界队列”LinkedBlockingQueue,那么maximumPoolSize参数无效,同时指定的拒绝策略AbortPolicy也将无效。而且该线程池也不会拒绝提交的任务,如果客户端提交任务的速度快于任务的执行,那么keepAliveTime也是一个无效参数。

其运行图如下(参考《Java并发编程的艺术》):

澳门新葡亰娱乐官网 3

FixedThreadPool

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor,定义如下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。

澳门新葡亰娱乐官网 4

SingleThreadExecutor

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池 ,他定义如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool的corePool为0,maximumPoolSize为Integer.MAX_VALUE,这就意味着所有的任务一提交就会加入到阻塞队列中。keepAliveTime这是为60L,unit设置为TimeUnit.SECONDS,意味着空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。阻塞队列采用的SynchronousQueue,而我们在【死磕Java并发】—-J.U.C之阻塞队列:SynchronousQueue中了解到SynchronousQueue是一个没有元素的阻塞队列,加上corePool
= 0 ,maximumPoolSize =
Integer.MAX_VALUE,这样就会存在一个问题,如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题

澳门新葡亰娱乐官网 5

CachedThreadPool

拒绝策略

前面我们提到任务无法执行会被拒绝,RejectedExecutionHandler是处理被拒绝任务的接口。下面是四种拒绝策略。

  • AbortPolicy:默认策略,终止任务,抛出RejectedException
  • CallerRunsPolicy澳门新葡亰娱乐官网,:在调用者线程执行当前任务,不抛异常
  • DiscardPolicy: 抛弃策略,直接丢弃任务,不抛异常
  • DiscardOldersPolicy:抛弃最老的任务,执行当前任务,不抛异常

 

任务提交

线程池根据业务不同的需求提供了两种方式提交任务:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以获取该任务执行的Future。
我们以Executor.execute()为例,来看看线程池的任务提交经历了那些过程。

定义:

public interface Executor {

    void execute(Runnable command);
}

ThreadPoolExecutor提供实现:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

执行流程如下:

  1. 如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功返回true,失败执行步骤2。
  2. 如果线程池处于RUNNING状态,则尝试加入阻塞队列,如果加入阻塞队列成功,则尝试进行Double
    Check,如果加入失败,则执行步骤3。
  3. 如果线程池不是RUNNING状态或者加入阻塞队列失败,则尝试创建新线程直到maxPoolSize,如果失败,则调用reject()方法运行相应的拒绝策略。

在步骤2中如果加入阻塞队列成功了,则会进行一个Double Check的过程。Double
Check过程的主要目的是判断加入到阻塞队里中的线程是否可以被执行。如果线程池不是RUNNING状态,则调用remove()方法从阻塞队列中删除该任务,然后调用reject()方法处理任务。否则需要确保还有线程执行。

addWorker
当线程中的当前线程数小于corePoolSize,则调用addWorker()创建新线程执行任务,当前线程数则是根据ctl变量来获取的,调用workerCountOf(ctl)获取低29位即可:

    private static int workerCountOf(int c)  { return c & CAPACITY; }

addWorker(Runnable firstTask, boolean
core)方法用于创建线程执行任务,源码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();

            // 获取当前线程状态
            int rs = runStateOf(c);


            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            // 内层循环,worker + 1
            for (;;) {
                // 线程数量
                int wc = workerCountOf(c);
                // 如果当前线程数大于线程最大上限CAPACITY  return false
                // 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // worker + 1,成功跳出retry循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;

                // CAS add worker 失败,再次读取ctl
                c = ctl.get();

                // 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {

            // 新建线程:Worker
            w = new Worker(firstTask);
            // 当前线程
            final Thread t = w.thread;
            if (t != null) {
                // 获取主锁:mainLock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    // 线程状态
                    int rs = runStateOf(ctl.get());

                    // rs < SHUTDOWN ==> 线程处于RUNNING状态
                    // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {

                        // 当前线程已经启动,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        // workers是一个HashSet<Worker>
                        workers.add(w);

                        // 设置最大的池大小largestPoolSize,workerAdded设置为true
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {

            // 线程启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  1. 判断当前线程是否可以添加任务,如果可以则进行下一步,否则return
    false;
  2. rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN
    ,STOP、TIDYING、TERMINATED状态
  3. rs == SHUTDOWN , firstTask !=
    null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务
  4. rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() ==
    true,不允许添加线程,因为firstTask ==
    null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue
    == null,则说明添加的任务没有任何意义。
  5. 内嵌循环,通过CAS worker + 1
  6. 获取主锁mailLock,如果线程池处于RUNNING状态获取处于SHUTDOWN状态且
    firstTask == null,则将任务添加到workers
    Queue中,然后释放主锁mainLock,然后启动线程,然后return
    true,如果中途失败导致workerStarted=
    false,则调用addWorkerFailed()方法进行处理。

在这里需要好好理论addWorker中的参数,在execute()方法中,有三处调用了该方法:
第一次:workerCountOf(c) < corePoolSize ==> addWorker(command,
true),这个很好理解,当然线程池的线程数量小于 corePoolSize
,则新建线程执行任务即可,在执行过程core ==
true,内部与corePoolSize比较即可。
第二次:加入阻塞队列进行Double Check时,else if (workerCountOf(recheck)
== 0) ==>addWorker(null,
false)。如果线程池中的线程==0,按照道理应该该任务应该新建线程执行任务,但是由于已经该任务已经添加到了阻塞队列,那么就在线程池中新建一个空线程,然后从阻塞队列中取线程即可。
第三次:线程池不是RUNNING状态或者加入阻塞队列失败:else if
(!addWorker(command, false)),这里core ==
fase,则意味着是与maximumPoolSize比较。

在新建线程执行任务时,将讲Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类

Woker内部类

Woker的源码如下:

    private final class Worker extends AbstractQueuedSynchronizer
            implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        // task 的thread
        final Thread thread;

        // 运行的任务task
        Runnable firstTask;

        volatile long completedTasks;

        Worker(Runnable firstTask) {

            //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
            setState(-1);
            this.firstTask = firstTask;

            // 利用ThreadFactory和 Worker这个Runnable创建的线程对象
            this.thread = getThreadFactory().newThread(this);
        }

        // 任务执行
        public void run() {
            runWorker(this);
        }

    }

从Worker的源码中我们可以看到Woker继承AQS,实现Runnable接口,所以可以认为Worker既是一个可以执行的任务,也可以达到获取锁释放锁的效果。这里继承AQS主要是为了方便线程的中断处理。这里注意两个地方:构造函数、run()。构造函数主要是做三件事:1.设置同步状态state为-1,同步状态大于0表示就已经获取了锁,2.设置将当前任务task设置为firstTask,3.利用Worker本身对象this和ThreadFactory创建线程对象。

当线程thread启动(调用start()方法)时,其实就是执行Worker的run()方法,内部调用runWorker()。

runWorker

    final void runWorker(Worker w) {

        // 当前线程
        Thread wt = Thread.currentThread();

        // 要执行的任务
        Runnable task = w.firstTask;

        w.firstTask = null;

        // 释放锁,运行中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // worker 获取锁
                w.lock();

                // 确保只有当线程是stoping时,才会被设置为中断,否则清楚中断标示
                // 如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()
                // 如果线程池状态 < STOP,但是线程已经中断了,再次判断线程池是否 >= STOP,如果是 wt.interrupt()
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 自定义方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 完成任务数 + 1
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

运行流程

  1. 根据worker获取要执行的任务task,然后调用unlock()方法释放锁,这里释放锁的主要目的在于中断,因为在new
    Worker时,设置的state为-1,调用unlock()方法可以将state设置为0,这里主要原因就在于interruptWorkers()方法只有在state >=
    0时才会执行;
  2. 通过getTask()获取执行的任务,调用task.run()执行,当然在执行之前会调用worker.lock()上锁,执行之后调用worker.unlock()放锁;
  3. 在任务执行前后,可以根据业务场景自定义beforeExecute() 和
    afterExecute()方法,则两个方法在ThreadPoolExecutor中是空实现;
  4. 如果线程执行完成,则会调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空,则根据是否超时来判断是否需要阻塞;
  5. task ==
    null或者抛出异常(beforeExecute()、task.run()、afterExecute()均有可能)导致worker线程终止,则调用processWorkerExit()方法处理worker退出流程。

getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {

            // 线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则worker - 1,return null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 判断是否需要超时控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {

                // 从阻塞队列中获取task
                // 如果需要超时控制,则调用poll(),否则调用take()
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

timed ==

线程池中的 Worker

Worker继承了AbstractQueuedSynchronizerRunnable,前者给Worker提供锁的功能,后者执行工作线程的主要方法runWorker(Worker w)(从任务队列捞任务执行)。Worker
引用存在workers集合里面,用mainLock守护。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();

■ ThreadPoolExecutor —— 线程池最核心的类

true,调用poll()方法,如果在keepAliveTime时间内还没有获取task的话,则返回null,继续循环。timed

false,则调用take()方法,该方法为一个阻塞方法,没有任务时会一直阻塞挂起,直到有任务加入时对该线程唤醒,返回任务。

在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。

processWorkerExit()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {

        // true:用户线程运行异常,需要扣减
        // false:getTask方法中扣减线程数量
        if (completedAbruptly)
            decrementWorkerCount();

        // 获取主锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 从HashSet中移出worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
        tryTerminate();

        int c = ctl.get();
        // 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
        if (runStateLessThan(c, STOP)) {
            // 正常退出,计算min:需要维护的最小线程数量
            if (!completedAbruptly) {
                // allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min ==0 或者workerQueue为空,min = 1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;

                // 如果线程数量大于最少数量min,直接返回,不需要新增线程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 添加一个没有firstTask的worker
            addWorker(null, false);
        }
    }

核心函数 runWorker

下面是简化的逻辑,注意:每个工作线程的run都执行下面的函数

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}
  1. getTask()中获取任务
  2. 锁住 worker
  3. 执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
  4. 运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
  5. 执行afterExecute(task, thrown);
  6. 解锁 worker
  7. 如果获取到的任务为 null,关闭 worker

 - 类定义: 实现了 AbstractExecutorService
类,ExecutorService,Executor 接口

首先completedAbruptly的值来判断是否需要对线程数-1处理,如果completedAbruptly

true,说明在任务运行过程中出现了异常,那么需要进行减1处理,否则不需要,因为减1处理在getTask()方法中处理了。然后从HashSet中移出该worker,过程需要获取mainlock。然后调用tryTerminate()方法处理,该方法是对最后一个线程退出做终止线程池动作。如果线程池没有终止,那么线程池需要保持一定数量的线程,则通过addWorker(null,false)新增一个空的线程。

addWorkerFailed()

在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted
== false,那么在最后会调用addWorkerFailed()方法:

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 从HashSet中移除该worker
            if (w != null)
                workers.remove(w);

            // 线程数 - 1
            decrementWorkerCount();
            // 尝试终止线程
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

整个逻辑显得比较简单。

tryTerminate()

当线程池涉及到要移除worker时候都会调用tryTerminate(),该方法主要用于判断线程池中的线程是否已经全部移除了,如果是的话则关闭线程池。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 线程池处于Running状态
            // 线程池已经终止了
            // 线程池处于ShutDown状态,但是阻塞队列不为空
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            // 执行到这里,就意味着线程池要么处于STOP状态,要么处于SHUTDOWN且阻塞队列为空
            // 这时如果线程池中还存在线程,则会尝试中断线程
            if (workerCountOf(c) != 0) {
                // /线程池还有线程,但是队列没有任务了,需要中断唤醒等待任务的线程
                // (runwoker的时候首先就通过w.unlock设置线程可中断,getTask最后面的catch处理中断)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 尝试终止线程池
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        // 线程池状态转为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

在关闭线程池的过程中,如果线程池处于STOP状态或者处于SHUDOWN状态且阻塞队列为null,则线程池会调用interruptIdleWorkers()方法中断所有线程,注意ONLY_ONE==
true,表示仅中断一个线程。

interruptIdleWorkers

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

onlyOne==true仅终止一个线程,否则终止所有线程。

获取任务 getTask

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

private final BlockingQueue<Runnable> workQueue;

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

  1. 现有的线程数量超过最大线程数量
  2. 线程池处于STOP状态
  3. 线程池处于SHUTDOWN状态且工作队列为空
  4. 线程等待任务超时,且线程数量超过保留线程数量

核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

在以下两种情况下等待任务会超时:

  1. 允许核心线程等待超时,即allowCoreThreadTimeOut(true)
  2. 当前线程是普通线程,此时wc > corePoolSize

工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。

public class ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService,Executor {

线程终止

线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池。

shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。

shutdownNow()
:尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。

shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 推进线程状态
            advanceRunState(SHUTDOWN);
            // 中断空闲的线程
            interruptIdleWorkers();
            // 交给子类实现
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 返回等待执行的任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

与shutdown不同,shutdownNow会调用interruptWorkers()方法中断所有线程。

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

同时会调用drainQueue()方法返回等待执行到任务列表。

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

欢迎扫一扫我的公众号关注 — 及时得到博客订阅哦!

澳门新葡亰娱乐官网 6

个人微信公众号

总结

  • ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。
  • Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecuteafterExecute方法来定制线程池任务执行过程。
  • 使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。
  • 拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。

 -
构造器:通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作

/**
 * 线程工厂默认为DefaultThreadFactory
 * 饱和策略默认为AbortPolicy
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
}
/**
 * 线程工厂可配置
 * 饱和策略默认为AbortPolicy
 */    
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        threadFactory, defaultHandler);
}   
/**
 * 线程工厂默认为DefaultThreadFactory
 * 饱和策略可配置
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), handler);
}
/**
 * 线程工厂可配置
 * 饱和策略可配置
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
                null :  AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

   – 重要变量

//线程池控制器
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//任务队列
private final BlockingQueue<Runnable> workQueue;
//全局锁
private final ReentrantLock mainLock = new ReentrantLock();
//工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//终止条件 - 用于等待任务完成后才终止线程池
private final Condition termination = mainLock.newCondition();
//曾创建过的最大线程数
private int largestPoolSize;
//线程池已完成总任务数
private long completedTaskCount;
//工作线程创建工厂
private volatile ThreadFactory threadFactory;
//饱和拒绝策略执行器
private volatile RejectedExecutionHandler handler;
//工作线程活动保持时间(超时后会被回收) - 纳秒
private volatile long keepAliveTime;
/**
 * 允许核心工作线程响应超时回收
 * false:核心工作线程即使空闲超时依旧存活
 * true:核心工作线程一旦超过keepAliveTime仍然空闲就被回收
 */
private volatile boolean allowCoreThreadTimeOut;
//核心工作线程数
private volatile int corePoolSize;
//最大工作线程数
private volatile int maximumPoolSize;
//默认饱和策略执行器 - AbortPolicy -> 直接抛出异常
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

 

■ ThreadPoolExecutor 的使用

    –
创建线城池实际上就是实例化一个线程池对象,这里我们使用最完整的构造器来描述最完整的创建过程:

     
1. corePoolSize(核心工作线程数):无任务时,线程池允许(维护)的最小空闲线程池数;当一个任务被提交到线程池就新建一个工作线程来执行任务(即使此时有空闲的核心工作线程)直到(实际工作线程数
>= 核心工作线程数)为止;调用
prestartAllCoreThreads()方法会提前创建并启动所有核心工作线程

     
2. maximumPoolSize(最大工作线程数):线程池允许创建的最大工作线程数;当(队列已满
&& 实际工作线程数 <
最大工作线程数)时,线程池会创建新的工作线程(即使此时仍有空闲的工作线程)执行任务直到最大工作线程数为止;设置无界队列时该参数其实无效

     
3. keepAliveTime(工作线程最大空闲时间):单位纳秒,满足超时条件且空闲的工作线程会被回收;超时的非核心工作线程会被回收,核心工作线程不会被回收;当allowCoreThreadTimeOut=true
时,则超时的核心工作线程也会被回收;若该值没有设置则线程会永远存活;建议当场景为任务短而多时,可以调高时间以提高线程利用率

      4. unit(线程活动保持时间单位):
线程活动保持时间单位,可选的包括NANOSECONDS纳秒、MICROSECONDS微秒、MILLISECONDS毫秒、SECONDS秒、MINUTES分、HOURS时、DAYS天

      5. workQueue(任务队列): 用来保存等待执行的任务的阻塞队列;当
(实际工作线程数 >= 核心工作线程数) && (任务数 <
任务队列长度)时,任务会offer()入队等待;关于任务队列详见下文的任务队列与排队策略

      6. threadFactory(线程创建工厂):
顾名思义,就是用于创建线程的工厂,允许自定义创建工厂,可以线程进行初始化配置,比如名字、守护线程、异常处理等等

      7. handler(饱和策略执行器):
当线程池和队列都已满,此时说明线程已无力再接收更多的任务,即任务数饱和,没法接单了;此时需要使用一种饱和策略处理新提交的任务,默认是Abort(直抛Reject异常),还包括Discard(LIFO规则丢弃)、DiscardOldest(LRU规则丢弃)
以及 CallerRuns(调用者线程执行),允许自定义执行器

 

     从上面给出的 ThreadPoolExecutor
类的代码可以知道,ThreadPoolExecutor 继承了
AbstractExecutorService,我们来看一下 AbstractExecutorService
的实现:

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };

    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

     AbstractExecutorService 是一个抽象类,它实现了ExecutorService
接口:

public interface ExecutorService extends Executor {
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

    而ExecutorService 又是继承了Executor 顶层接口:

public interface Executor {
    void execute(Runnable command);
}

 

    – 提交、执行和关闭任务 (重要方法)

  1. execute():  适用于提交无须返回值的任务
      – 该方法是无法判断任务是否被线程池执行成功

  2. submit():  适用于提交需要返回值的任务
      -可以通过返回的Future对象得知任务是否已经执行成功

      –get()
方法会阻塞当前线程直到任务完成,但要注意防范无限阻塞!!!

      -使用 get(long timeout,TimeUnit unit)
方法会阻塞当前线程直到任务完成或超时,不会有无限阻塞的发生但需要注意超时后任务可能还没完成!!!

       3.  shutdown()
有序地关闭线程池,已提交的任务会被执行(包含正在执行和任务队列中的),但会拒绝新任务

            shutdownNow():
立即(尝试)停止执行所有任务(包含正在执行和任务队列中的),并返回待执行任务列表

 

 ■ ThreadPoolExecutor 实现原理

    – 流程图

澳门新葡亰娱乐官网 7

 

    – 线程池的状态 

  线程状态的流转遵循如下顺序,即由小到大顺序排列:
  RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED

    * 补充:数值的变迁感觉就好比我们的年龄,越大离上帝就越近 = =

//线程池状态控制器,用于保证线程池状态和工作线程数 ps:低29位为工作线程数量,高3位为线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//设定偏移量 Integer.SIZE = 32 -> 即COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//确定最大的容量2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//获取线程池状态,取高3位
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取工作线程数量,取低29位
private static int workerCountOf(int c)  { return c & CAPACITY; }
/** 
 * 获取线程池状态控制器
 * @param rs 表示runState 线程池状态
 * @param wc 表示workerCount 工作线程数量
 */
private static int ctlOf(int rs, int wc) { return rs | wc; }

  这里补充一点二进制运算符基础知识方便忘却的读者理解一下:
    &:与运算符,同位都为1才为1,否则为0

     |:或运算符,同位有一个为1即为1,否则为0

    ~:非运算符,0和1互换,即若是0变成1,1则变成0

    ^:异或运算符,同位相同则为0,不同则为1

 

     – 工人生产(生产者与消费者模式)

    之前每个变量的作用都已经标明出来了,这里通过实例展示其作用:

/** 
  假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
  因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

  当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
  如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
  然后就将任务也分配给这4个临时工人做;

  如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
  当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
**/

      那么我们知道其实线程就相当于工人,所以我们来看下线程池的内部类
Worker:

  1. 继承AQS类:
    实现简单的不可重入互斥锁,以提供便捷的锁操作,目的用于处理中断情况
  2. 实现Runnable接口:
    “投机取巧”的设计,主要是借用Runnable接口的统一写法,好处是不用重新写一个同功能接口
  3. 工作线程:
    Worker会通过thread变量绑定一个真正执行任务的工作线程(一对一),初始化时就由线程工厂分配好,它会反复地获取和执行任务
  4. 任务:
    Worker每次都会将新任务赋值给firstTask变量,工作线程每次通过该变量处理新获取到的任务(初始化时该值允许为null,有特殊作用,下文会详述)

    /*
      Worker类封装了 ( 锁 + 线程 + 任务 ) 这三个部分,从而成为了一个多面手的存在
    /

    private final class Worker

        extends AbstractQueuedSynchronizer
        implements Runnable{
    /** 实际上真正的工作线程 - 幕后大佬,但可能因线程工厂创建失败而为null */
    final Thread thread;
    /** 待执行任务,可能为null */
    Runnable firstTask;
    /** 该工作线程已完成的任务数 -- 论KPI的重要性 */
    volatile long completedTasks;        
    Worker(Runnable firstTask) {
        //设置锁状态为-1,目的是为了阻止在runWorker()之前被中断
        setState(-1); 
        /**
         * 新任务,任务来源有两个:
         *   1.调用addWorker()方法新建线程时传入的第一个任务
         *   2.调用runWorker()方法时内部循环调用getTask() -- 这就是线程复用的具现
         */
        this.firstTask = firstTask;
        /**
         * 创建一个新的线程 -> 这个是真正的工作线程
         * 注意Worker本身就是个Runnable对象
         * 因此newThread(this)中的this也是个Runnable对象
         */
        this.thread = getThreadFactory().newThread(this);
    }
    

    }

     – 执行任务

/**
 * 工作线程运行
 * runWorker方法内部会通过轮询的方式
 * 不停地获取任务和执行任务直到线程被回收
 */
public void run() {
    runWorker(this);
}

     (重点)
这里简单介绍一下线程在线程池执行任务的工作流程

  1.工作线程开始执行前,需先对worker加锁,任务完成解锁

  2.任务执行前后分别执行beforeExecute()和afterExecute()方法

  3.执行中遇到异常会向外抛出,线程是否死亡取决于您对于异常的处理

  4.每个任务执行完后,当前工作线程任务完成数自增,同时会循环调用getTask()从任务队列中反复获取任务并执行,无任务可执行时线程会阻塞在该方法上

  5.当工作线程因各种理由退出时,会执行processWorkerExit()回收线程(核心是将该worker从workers集合中移除,注意之前worker已经退出任务循环,因此已经不再做工了,从集合移除后就方便gc了)

     – 锁方法

// Lock methods
// The value 0 represents the unlocked state. 0表示未锁定
// The value 1 represents the locked state. 1表示已锁定
protected boolean isHeldExclusively() {
    return getState() != 0;
}
protected boolean tryAcquire(int unused) {
    //锁状态非0即1,即不可重入
    //特殊情况:只有初始化时才为-1,目的是防止线程初始化阶段被中断
    if (compareAndSetState(0, 1)) {
        //当前线程占有锁
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
protected boolean tryRelease(int unused) {
    //释放锁
    setExclusiveOwnerThread(null);
    //状态恢复成未锁定状态
    setState(0);
    return true;
}
public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null 
                && !t.isInterrupted()){
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

      – 动态控制

/**
 * 设置核心工作线程数
 * 1.若新值<当前值时,将调用interruptIdleWorkers()处理超出部分线程
 * 2.若新值>当前值时,新创建的线程(若有必要)直接会处理队列中的任务
 */
public void setCorePoolSize(int corePoolSize)
/**
 * 设置是否响应核心工作线程超时处理
 * 1.设置false时,核心工作线程不会因为任务数不足(空闲)而被终止
 * 2.设置true时,核心工作线程和非核心工作线程待遇一样,会因为超时而终止
 * 注意:为了禁止出现持续性的线程替换,当设置true时,超时时间必须>0
 * 注意:该方法通常应在线程池被使用之前调用
 */
public void allowCoreThreadTimeOut(boolean value)
/**
 * 设置最大工作线程数
 * 1.若新值<当前值时,将调用interruptIdleWorkers()处理超出部分线程
 * 注意:当新值>当前值时是无需做任何处理的,跟设置核心工作线程数不一样
 */
public void setMaximumPoolSize(int maximumPoolSize)
/** 
 * 设置超时时间,超时后工作线程将被终止
 * 注意:若实际工作线程数只剩一个,除非线程池被终止,否则无须响应超时
 */
public void setKeepAliveTime(long time, TimeUnit unit) 

 

■ 任务提交与执行

     – execute()  –  提交任务

/**
 * 在未来的某个时刻执行给定的任务
 * 这个任务由一个新线程执行,或者用一个线程池中已经存在的线程执行
 * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭
 * 要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
 */
public void execute(Runnable command) {
        //新任务不允许为空,空则抛出NPE
        if (command == null)
            throw new NullPointerException();
        /**
         * 1.若实际工作线程数 < 核心工作线程数,会尝试创建一个工作线程去执行该
         * 任务,即该command会作为该线程的第一个任务,即第一个firstTask
         * 
         * 2.若任务入队成功,仍需要执行双重校验,原因有两点:
         *      - 第一个是去确认是否需要新建一个工作线程,因为可能存在
         *        在上次检查后已经死亡died的工作线程
         *      - 第二个是可能在进入该方法后线程池被关闭了,
         *        比如执行shutdown()
         *   因此需要再次检查state状态,并分别处理以上两种情况:
         *      - 若线程池中已无可用工作线程了,则需要新建一个工作线程 
         *      - 若线程池已被关闭,则需要回滚入队列(若有必要)
         * 
         * 3.若任务入队失败(比如队列已满),则需要新建一个工作线程; 
         *   若新建线程失败,说明线程池已停止或者已饱和,必须执行拒绝策略
         */
        int c = ctl.get();
        /**
         * 情况一:当实际工作线程数 < 核心工作线程数时
         * 执行方案:会创建一个新的工作线程去执行该任务
         * 注意:此时即使有其他空闲的工作线程也还是会新增工作线程,
         *      直到达到核心工作线程数为止
         */
        if (workerCountOf(c) < corePoolSize) {
            /**
             * 新增工作线程,true表示要对比的是核心工作线程数
             * 一旦新增成功就开始执行当前任务
             * 期间也会通过自旋获取队列任务进行执行
             */
            if (addWorker(command, true))
                return;
            /**
             * 需要重新获取控制器状态,说明新增线程失败
             * 线程失败的原因可能有两种:
             *  - 1.线程池已被关闭,非RUNNING状态的线程池是不允许接收新任务的
             *  - 2.并发时,假如都通过了workerCountOf(c) < corePoolSize校验,但其他线程
             *      可能会在addWorker先创建出线程,导致workerCountOf(c) >= corePoolSize,
             *      即实际工作线程数 >= 核心工作线程数,此时需要进入情况二
             */
            c = ctl.get();
        }
        /**
         * 情况二:当实际工作线程数>=核心线程数时,新提交任务需要入队
         * 执行方案:一旦入队成功,仍需要处理线程池状态突变和工作线程死亡的情况
         */
        if (isRunning(c) && workQueue.offer(command)) {
            //双重校验
            int recheck = ctl.get();
            /**
             * recheck的目的是为了防止线程池状态的突变 - 即被关闭
             * 一旦线程池非RUNNING状态时,除了从队列中移除该任务(回滚)外
             * 还需要执行任务拒绝策略处理新提交的任务
             */
            if (!isRunning(recheck) && remove(command))
                //执行任务拒绝策略
                reject(command);
            /**
             * 若线程池还是RUNNING状态 或 队列移除失败(可能正好被一个工作线程拿到处理了)
             * 此时需要确保至少有一个工作线程还可以干活
             * 补充一句:之所有无须与核心工作线程数或最大线程数相比,而只是比较0的原因是
             *          只要保证有一个工作线程可以干活就行,它会自动去获取任务
             */
            else if (workerCountOf(recheck) == 0)
                /**
                 * 若工作线程都已死亡,需要新增一个工作线程去干活
                 * 死亡原因可能是线程超时或者异常等等复杂情况
                 *
                 * 第一个参数为null指的是传入一个空任务,
                 * 目的是创建一个新工作线程去处理队列中的剩余任务
                 * 第二个参数为false目的是提示可以扩容到最大工作线程数
                 */
                addWorker(null, false);
        }
        /**
         * 情况三:一旦线程池被关闭 或者 新任务入队失败(队列已满)
         * 执行方案:会尝试创建一个新的工作线程,并允许扩容到最大工作线程数
         * 注意:一旦创建失败,比如超过最大工作线程数,需要执行任务拒绝策略
         */
        else if (!addWorker(command, false))
            //执行任务拒绝策略
            reject(command);
    }

     – addWorker() – 新增工作线程

/**
 * 新增工作线程需要遵守线程池控制状态规定和边界限制
 *
 * @param core core为true时允许扩容到核心工作线程数,否则为最大工作线程数
 * @return 新增成功返回true,失败返回false
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    //重试标签
    retry:
    /***
     * 外部自旋 -> 目的是确认是否能够新增工作线程
     * 允许新增线程的条件有两个:
     *   1.满足线程池状态条件 -> 条件一
     *   2.实际工作线程满足数量边界条件 -> 条件二
     * 不满足条件时会直接返回false,表示新增工作线程失败
     */
    for (;;) {
        //读取原子控制量 - 包含workerCount(实际工作线程数)和runState(线程池状态)
        int c = ctl.get();
        //读取线程池状态
        int rs = runStateOf(c);
        /**
         * 条件一.判断是否满足线程池状态条件
         *  1.只有两种情况允许新增线程:
         *    1.1 线程池状态==RUNNING
         *    1.2 线程池状态==SHUTDOWN且firstTask为null同时队列非空
         *
         *  2.线程池状态>=SHUTDOWN时不允许接收新任务,具体如下:
         *    2.1 线程池状态>SHUTDOWN,即为STOP、TIDYING、TERMINATED
         *    2.2 线程池状态==SHUTDOWN,但firstTask非空
         *    2.3 线程池状态==SHUTDOWN且firstTask为空,但队列为空
         *  补充:针对1.2、2.2、2.3的情况具体请参加后面的"小问答"环节
         */
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;
        /***
         * 内部自旋 -> 条件二.判断实际工作线程数是否满足数量边界条件
         *   -数量边界条件满足会对尝试workerCount实现CAS自增,否则新增失败
         *   -当CAS失败时会再次重新判断是否满足新增条件:
         *       1.若此期间线程池状态突变(被关闭),重新判断线程池状态条件和数量边界条件
        *        2.若此期间线程池状态一致,则只需重新判断数量边界条件
        */
        for (;;) {
            //读取实际工作线程数
            int wc = workerCountOf(c);
            /**
             * 新增工作线程会因两种实际工作线程数超标情况而失败:
             *  1.实际工作线程数 >= 最大容量
             *  2.实际工作线程数 > 工作线程比较边界数(当前最大扩容数)
             *   -若core = true,比较边界数 = 核心工作线程数
             *   -若core = false,比较边界数 = 最大工作线程数
             */
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /**
             * 实际工作线程计数CAS自增:
             *   1.一旦成功直接退出整个retry循环,表明新增条件都满足
             *   2.因并发竞争导致CAS更新失败的原因有三种: 
             *      2.1 线程池刚好已新增一个工作线程
             *        -> 计数增加,只需重新判断数量边界条件
             *      2.2 刚好其他工作线程运行期发生错误或因超时被回收
             *        -> 计数减少,只需重新判断数量边界条件
             *      2.3 刚好线程池被关闭 
             *        -> 计数减少,工作线程被回收,
             *           需重新判断线程池状态条件和数量边界条件
             */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //重新读取原子控制量 -> 原因是在此期间可能线程池被关闭了
            c = ctl.get();
            /**
             * 快速检测是否发生线程池状态突变
             *  1.若状态突变,重新判断线程池状态条件和数量边界条件
             *  2.若状态一致,则只需重新判断数量边界条件
             */
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    /**
     * 这里是addWorker方法的一个分割线
     * 前面的代码的作用是决定了线程池接受还是拒绝新增工作线程
     * 后面的代码的作用是真正开始新增工作线程并封装成Worker接着执行后续操作
     * PS:虽然笔者觉得这个方法其实可以拆分成两个方法的(在break retry的位置)
     */
    //记录新增的工作线程是否开始工作
    boolean workerStarted = false;
    //记录新增的worker是否成功添加到workers集合中
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将新提交的任务和当前线程封装成一个Worker
        w = new Worker(firstTask);
        //获取新创建的实际工作线程
        final Thread t = w.thread;
        /**
         * 检测是否有可执行任务的线程,即是否成功创建了新的工作线程
         *   1.若存在,则选择执行任务
         *   2.若不存在,则需要执行addWorkerFailed()方法
         */
        if (t != null) {
            /**
             * 新增工作线程需要加全局锁
             * 目的是为了确保安全更新workers集合和largestPoolSize
             */
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                /**
                 * 获得全局锁后,需再次检测当前线程池状态
                 * 原因在于预防两种非法情况:
                 *  1.线程工厂创建线程失败
                 *  2.在锁被获取之前,线程池就被关闭了
                 */
                int rs = runStateOf(ctl.get());
                /**
                 * 只有两种情况是允许添加work进入works集合的
                 * 也只有进入workers集合后才是真正的工作线程,并开始执行任务
                 *  1.线程池状态为RUNNING(即rs<SHUTDOWN)
                 *  2.线程池状态为SHUTDOWN且传入一个空任务
                 *  (理由参见:小问答之快速检测线程池状态?) 
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    /**
                     * 若线程处于活动状态时,说明线程已启动,需要立即抛出"线程状态非法异常"
                     * 原因是线程是在后面才被start的,已被start的不允许再被添加到workers集合中
                     * 换句话说该方法新增线程时,而线程是新的,本身应该是初始状态(new)
                     * 可能出现的场景:自定义线程工厂newThread有可能会提前启动线程
                     */
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //由于加锁,所以可以放心的加入集合
                    workers.add(w);
                    int s = workers.size();
                    //更新最大工作线程数,由于持有锁,所以无需CAS
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //确认新建的worker已被添加到workers集合中  
                    workerAdded = true;
                }
            } finally {
                //千万不要忘记主动解锁
                mainLock.unlock();
            }
            /**
             * 一旦新建工作线程被加入工作线程集合中,就意味着其可以开始干活了
             * 有心的您肯定发现在线程start之前已经释放锁了
             * 原因在于一旦workerAdded为true时,说明锁的目的已经达到
             * 根据最小化锁作用域的原则,线程执行任务无须加锁,这是种优化
             * 也希望您在使用锁时尽量保证锁的作用域最小化
             */
            if (workerAdded) {
                /**
                 * 启动线程,开始干活啦
                 * 若您看过笔者的"并发番@Thread一文通"肯定知道start()后,
                 * 一旦线程初始化完成便会立即调用run()方法
                 */
                t.start();
                //确认该工作线程开始干活了
                workerStarted = true;
            }
        }
    } finally {
        //若新建工作线程失败或新建工作线程后没有成功执行,需要做新增失败处理
        if (!workerStarted)
            addWorkerFailed(w);
    }
    //返回结果表明新建的工作线程是否已启动执行
    return workerStarted;
}

  结论之启动调用会经历一下过程

*   (1) worker = new Worker(Runnable) –> (2) thread =
newThread(worker) –> (3) thread.start() –> (4)
thread.run()[JVM自动调用] –> (5) worker.run() –> (6)
threadPoolExecuter.runWorker(worker)*

*   *

*    – runWorker() * – 执行任务

final void runWorker(Worker w) {
    //读取当前线程 -即调用execute()方法的线程(一般是主线程)
    Thread wt = Thread.currentThread();
    //读取待执行任务
    Runnable task = w.firstTask;
    //清空任务 -> 目的是用来接收下一个任务
    w.firstTask = null;
    /**
     * 注意Worker本身也是一把不可重入的互斥锁!
     * 由于Worker初始化时state=-1,因此此处的解锁的目的是:
     * 将state-1变成0,因为只有state>=0时才允许中断;
     * 同时也侧面说明在worker调用runWorker()之前是不允许被中断的,
     * 即运行前不允许被中断
     */
    w.unlock();
    //记录是否因异常/错误突然完成,默认有异常/错误发生
    boolean completedAbruptly = true;
    try {
        /**
         * 获取任务并执行任务,取任务分两种情况:
         *   1.初始任务:Worker被初始化时赋予的第一个任务(firstTask)
         *   2.队列任务:当firstTask任务执行好后,线程不会被回收,而是之后自动自旋从任务队列中取任务(getTask)
         *     此时即体现了线程的复用
         */
        while (task != null || (task = getTask()) != null) {
            /**
             * Worker加锁的目的是为了在shutdown()时不要立即终止正在运行的worker,
             * 因为需要先持有锁才能终止,而不是为了处理并发情况(注意不是全局锁)
             * 在shutdownNow()时会立即终止worker,因为其无须持有锁就能终止
             * 关于关闭线程池下文会再具体详述
             */
            w.lock();
            /**
             * 当线程池被关闭且主线程非中断状态时,需要重新中断它
             * 由于调用线程一般是主线程,因此这里是主线程代指调用线程
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                wt.interrupt();
            try {
                /**
                 * 每个任务执行前都会调用"前置方法",
                 * 在"前置方法"可能会抛出异常,
                 * 结果是退出循环且completedAbruptly=true,
                 * 从而线程死亡,任务未执行(并被丢弃)
                 */
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    /**
                     * 任务执行结束后,会调用"后置方法"
                     * 该方法也可能抛异常从而导致线程死亡
                     * 但值得注意的是任务已经执行完毕
                     */
                    afterExecute(task, thrown);
                }
            } finally {
                //清空任务 help gc
                task = null;
                //无论成功失败任务数都要+1,由于持有锁所以无须CAS
                w.completedTasks++;
                //必须要主动释放锁
                w.unlock();
            }
        }
        //无异常时需要清除异常状态
        completedAbruptly = false;
    } finally {
        /**
         * 工作线程退出循环的原因有两个:
         *  1.因意外的错误/异常退出
         *  2.getTask()返回空 -> 原因有四种,下文会详述
         * 工作线程退出循环后,需要执行相对应的回收处理
         */
        processWorkerExit(w, completedAbruptly);
    }
}

    – getTask() – 获取任务

      造成getTask()方法返回null的原因有5种:
  1.线程池被关闭,状态为(STOP || TIDYING || TERMINATED)

  2.线程池被关闭,状态为SHUTDOWN且任务队列为空

  3.实际工作线程数超过最大工作线程数

  4.工作线程满足超时条件后,同时符合下述的任意一种情况:
      4.1 线程池中还存在至少一个其他可用的工作线程

      4.2 线程池中已没有其他可用的工作线程但任务队列为空

private Runnable getTask() {
    // 记录任务队列的poll()是否超时,默认未超时
    boolean timedOut = false; 
    //自旋获取任务
    for (;;) {
        /**
         * 线程池会依次判断五种情况,满足任意一种就返回null:
         *    1.线程池被关闭,状态为(STOP || TIDYING || TERMINATED)
         *    2.线程池被关闭,状态为SHUTDOWN且任务队列为空
         *    3.实际工作线程数超过最大工作线程数
         *    4.工作线程满足超时条件后,同时符合下述的任意一种情况:
         *      4.1 线程池中还存在至少一个其他可用的工作线程
         *      4.2 线程池中已没有其他可用的工作线程但任务队列为空
         */
        int c = ctl.get();
        int rs = runStateOf(c);
        /**
         * 判断线程池状态条件,有两种情况直接返回null
         *  1.线程池状态大于SHUTDOWN(STOP||TIDYING||TERMINATED),说明不允许再执行任务
         *    - 因为>=STOP以上状态时不允许接收新任务同时会中断正在执行中的任务,任务队列的任务也不执行了         
         *  
         *  2.线程池状态为SHUTDOWN且任务队列为空,说明已经无任务可执行
         *    - 因为SHUTDOWN时还需要执行任务队列的剩余任务,只有当无任务才可退出
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            /**
             * 减少一个工作线程数
             * 值得注意的是工作线程的回收是放在processWorkerExit()中进行的
             * decrementWorkerCount()方法是内部不断循环执行CAS的,保证最终一定会成功
             * 补充:因线程池被关闭而计数减少可能与addWorker()的
             *      计数CAS自增发生并发竞争
             */
            decrementWorkerCount();
            return null;
        }
        //读取实际工作线程数
        int wc = workerCountOf(c);
        /**
         * 判断是否需要处理超时:
         *   1.allowCoreThreadTimeOut = true 表示需要回收空闲超时的核心工作线程
         *   2.wc > corePoolSize 表示存在空闲超时的非核心工作线程需要回收
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
         /**
          * 有三种情况会实际工作线程计数-1且直接返回null
          *
          *    1.实际工作线程数超过最大线程数
          *    2.该工作线程满足空闲超时条件需要被回收:
          *       2.1 当线程池中还存在至少一个其他可用的工作线程
          *       2.2 线程池中已没有其他可用的工作线程但任务队列为空
          *  
          * 结合2.1和2.2我们可以推导出:
          *
          *   1.当任务队列非空时,线程池至少需要维护一个可用的工作线程,
          *     因此此时即使该工作线程超时也不会被回收掉而是继续获取任务
          *
          *   2.当实际工作线程数超标或获取任务超时时,线程池会因为
          *     一直没有新任务可执行,而逐渐减少线程直到核心线程数为止;
          *     若设置allowCoreThreadTimeOut为true,则减少到1为止;
          *
          * 提示:由于wc > maximumPoolSize时必定wc > 1,因此无须比较
          * (wc > maximumPoolSize && workQueue.isEmpty()) 这种情况
          */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            /**
             * CAS失败的原因还是出现并发竞争,具体参考上文
             * 当CAS失败后,说明实际工作线程数已经发生变化,
             * 必须重新判断实际工作线程数和超时情况
             * 因此需要countinue
             */
            if (compareAndDecrementWorkerCount(c))
                return null;
           /**        
            */                
            continue;
        }
        //若满足获取任务条件,根据是否需要超时获取会调用不同方法
        try {
           /**
            * 从任务队列中取任务分两种:
            *  1.timed=true 表明需要处理超时情况
            *   -> 调用poll(),超过keepAliveTime返回null
            *  2.timed=fasle 表明无须处理超时情况
            *   -> 调用take(),无任务则挂起等待
            */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            //一旦获取到任务就返回该任务并退出循环
            if (r != null)
                return r;
            //当任务为空时说明poll超时
            timedOut = true;
            /**
             * 关于中断异常获取简单讲一些超出本章范畴的内容
             * take()和poll(long timeout, TimeUnit unit)都会throws InterruptedException
             * 原因在LockSupport.park(this)不会抛出异常但会响应中断;
             * 但ConditionObject的await()会通过reportInterruptAfterWait()响应中断
             * 具体内容笔者会在阻塞队列相关番中进一步介绍
             */
        } catch (InterruptedException retry) {
            /**
             * 一旦该工作线程被中断,需要清除超时标记
             * 这表明当工作线程在获取队列任务时被中断,
             * 若您不对中断异常做任务处理,线程池就默认
             * 您希望线程继续执行,这样就会重置之前的超时标记
             */
            timedOut = false;
        }
    }
}

 

■ 关闭线程池

   – 使用shutdown()关闭线程池最主要执行5个操作:
  1.获取全局锁

  2.CAS自旋变更线程池状态为SHUTDOWN

  3.中断所有空闲工作线程(设置中断标记) -> 注意是空闲

  4.释放全局锁

  5.尝试终止线程池

/**
 * 有序关闭线程池
 * 在关闭过程中,之前已提交的任务将被执行(包括正在和队列中的),
 * 但新提交的任务会被拒绝
 * 如果线程池已经被关闭,调用该方法不会有任何附加效果
 */
public void shutdown() {
    //1.获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //2.CAS自旋变更线程池状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        //3.中断所有空闲工作线程
        interruptIdleWorkers();
        //专门提供给ScheduledThreadPoolExecutor的钩子方法
        onShutdown();
    } finally {
        //4.释放全局锁
        mainLock.unlock();
    }
    /**
     * 5.尝试终止线程池,此时线程池满足两个条件:
     *   1.线程池状态为SHUTDOWN
     *   2.所有空闲工作线程已被中断
     */
    tryTerminate();
}

  – 使用shutdownNow()关闭线程池最主要执行六个操作:
  1.获取全局锁

  2.CAS自旋变更线程池状态为SHUTDOWN

  3.中断所有工作线程(设置中断标记)

  4.将剩余任务重新放入一个list中并清空任务队列

  5.释放全局锁

  6.尝试终止线程池

/**
 * 尝试中断所有工作线程,并返回待处理任务列表集合(从任务队列中移除)
 *
 * 1.若想等待执行中的线程完成任务,可使用awaitTermination()
 * 2.由于取消任务操作是通过Thread#interrupt实现,因此
 *   响应中断失败的任务可能永远都不会被终止(谨慎使用!!!)
 *   响应中断失败指的是您选择捕获但不处理该中断异常
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    //1.获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //2.CAS自旋更新线程池状态为STOP
        advanceRunState(STOP);
        //3.中断所有工作线程
        interruptWorkers();
        //4.将剩余任务重新放入一个list中并清空任务队列
        tasks = drainQueue();
    } finally {
        //5.释放全局锁
        mainLock.unlock();
    }
    /**
     * 6.尝试终止线程池,此时线程池满足两个条件:
     *   1.线程池状态为STOP
     *   2.任务队列为空
     * 注意:此时不一定所有工作线程都被中断回收,详述见
     *       7.3 tryTerminate
     */
    tryTerminate();
    //5.返回待处理任务列表集合
    return tasks;
}

 

 ■ 饱和拒绝策略

   
线程池的饱和拒绝策略主要用于拒绝任务(但这并不意味着该任务不会被执行),线程池原生提供了四种饱和拒绝策略,基本涵盖常见的饱和处理场景:
  AbortPolicy:默认策略,直接抛出异常

  CallerRunsPolicy:只用调用线程执行该任务

  DiscardPolicy:直接丢弃任务

  DiscardOldestPolicy:丢弃队尾任务并用线程池重新尝试执行该任务

     所有的拒绝策略都需要实现该拒绝处理器接口,以统一口径:

/**
 * 用于拒绝线程池任务的处理器
 */
public interface RejectedExecutionHandler {
    /**
     * 该方法用于拒绝接受线程池任务
     * 
     * 有三种情况可能调用该方法:
     *   1.没有更多的工作线程可用
     *   2.任务队列已满
     *   3.关闭线程池
     *
     * 当没有其他处理选择时,该方法会选择抛出RejectedExecutionException异常
     * 该异常会向上抛出直到execute()的调用者
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

    – CallerRunsPolicy

 处理规则:新提交任务由调用者线程直接执行

 推荐:拒绝策略推荐使用CallerRunsPolicy,理由是该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行

/**
 * 不会直接丢弃,而是直接用调用execute()方法的线程执行该方法
 * 当然一旦线程池已经被关闭,还是要丢弃的
 *
 * 补充:值得注意的是所有策略类都是public的静态内部类,
 *      其目的应该是告知使用者 -> 该类与线程池相关但无需线程池实例便可直接使用
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    /**
     * 直接使用调用该方法的线程执行任务
     * 除非线程池被关闭时才会丢弃该任务
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //一旦线程池被关闭,丢弃该任务
        if (!e.isShutdown()) {
            //注意此时不是线程池执行该任务
            r.run();
        }
    }
}

    – AbortPolicy

    处理规则:直接抛出RejectedExecutionException异常

/**
 * 简单、粗暴的直接抛出RejectedExecutionException异常
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    /**
     * 直接抛出异常,但r.toString()方法会告诉你哪个任务失败了
     * 更人性化的一点是 e.toString()方法还会告诉你:
     * 线程池的状态、工作线程数、队列长度、已完成任务数
     * 建议若是不处理异常起码也要在日志里面打印一下,留个案底
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException(
            "Task " + r.toString() + " rejected from " + e.toString());
    }
}

    – DiscardPolicy

     处理规则:根据LIFO(后进先出)规则直接丢弃最新提交的任务

/**
 * 直接丢弃任务
 * 这个太狠了,连个案底都没有,慎用啊
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    /**
     * 无作为即为丢弃
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

    – DiscardOldestPolicy

  
处理规则:根据LRU(最近最少使用)规则丢弃最后一个任务,然后尝试执行新提交的任务

/**
 * 比起直接丢弃,该类会丢弃队列里最后一个但仍未被处理的任务,
 * 然后会重新调用execute()方法处理当前任务
 * 除非线程池被关闭时才会丢弃该任务
 * 此类充分证明了"来得早不如来的巧"
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    /**
     * 丢弃队列里最近的一个任务,并执行当前任务
     * 除非线程池被关闭时才会丢弃该任务
     * 原因是队列是遵循先进先出FIFO原则,poll()会弹出队尾元素
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //一旦线程池被关闭,直接丢弃
        if (!e.isShutdown()) {
            //弹出队尾元素
            e.getQueue().poll();
            //直接用线程池执行当前任务
            e.execute(r);
        }
    }
}

 

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图