如果存在大量的创建线程、销毁线程操作,效率是很低的。因为创建线程是比较耗时、耗资源的。如果非常频繁地创建、销毁线程会极大地降低效率。为避免这种情况的发生,产生了threadpoolexecutor线程池。
重要成员:
//AtomicInteger提供原子操作来进行Integer的使用。ctl是AtomicInteger的,所以是线程安全的。 //用ctl来记录线程的个数。 //ctl维护两个概念上的参数:workCount和runState。workCount表示有效的线程数量,runState表示线程池的运行状态。 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //为了将状态和数量放在一起,所以高3位用于表示表示状态,低29位表示数量。此时的COUNT_BITS 值为 32-3 = 29. private static final int COUNT_BITS = Integer.SIZE - 3; //容纳的线程最大的个数 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //五种运行状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;为了从clt中获取各部分的值,提供了如下方法:
// Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }五种运行状态: RUNNING:接受新任务并且处理已经进入队列的任务。 SHUTDOWN:不接受新任务,但是处理已经进入队列的任务。 STOP:不接受新任务,不处理已经进入队列的任务,并且中断正在执行的任务。 TIDYING:所有任务执行完成,workerCount为0。线程转到了状态TIDYING会执行terminated()钩子方法。 TERMINATED:terminated()已经执行完成。 状态间的转换: RUNNING -> SHUTDOWN: 调用了shutdown()方法。 (RUNNING 或 SHUTDOWN) -> STOP: 调用了shutdownNow()。 SHUTDOWN -> TIDYING: 当队列和线程池为空。 STOP -> TIDYING:当线程池为空。 TIDYING -> TERMINATED:当terminated()钩子方法执行完成。
构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }ThreadPoolExecuto的构造方法中有corePoolSize、maximunPoolSize、keepAliveTime、unit、workQueue、threadFactory和handler参数: corePoolSize: 核心线程的数量。默认是没有超时的,也就是说就算线程闲置,也不会被处理。但是如果设置了allowCoreTimeOut为true,那么当核心线程闲置时,会被回收。 maximumPoolSize:最大线程池尺寸,被CAPACITY限制(2^29-1); keepAliveTime:闲置线程被回收的时间限制; unit: keepAliveTime的单位; workQueue: 用于存放任务的队列 ; threadFactory: 创建线程的工厂类 ; handler: 当任务执行失败时,使用handler通知调用者;
当创建好一个ThreadPoolExecutor对象后,调用execute(Runnable r)方法执行任务。下面是execute方法的实现:
public void execute(Runnable command) { //检查command不能为null if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果当前线程小于corePoolSize if (workerCountOf(c) < corePoolSize) { //如果添加Worker线程成功,则返回 if (addWorker(command, true)) return; c = ctl.get(); } //检测当前的线程池是否处在Running状态 if (isRunning(c) && workQueue.offer(command)) { //再次检查ctl状态 int recheck = ctl.get(); //如果不在运行状态了,那么就从队列中移除任务 if (! isRunning(recheck) && remove(command)) reject(command); //如果在运行阶段,但是Worker数量为0,调用addWorker方法 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果不能入队,且不能创建Worker,那么reject else if (!addWorker(command, false)) reject(command); }execute方法中主要使用到addWorker方法,addWorker方法用于创建线程,并且通过core参数表示该线程是否是核心线程,如果返回true则表示创建成功,否则失败。addWorker的代码如下所示:
// 申请一个新的线程 private boolean addWorker(Runnable firstTask, boolean core) { // retry相当于是一个标记(), // 后面执行continue retry时,会跳出当前循环;进入最外层for循环; retry: for (;;) { int c = ctl.get(); // 获取当前运行状态 int rs = runStateOf(c); //rs>=SHUTDOWN为false,即线程池处于RUNNING状态; //rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()这个条件为true,也就意味着三个条件同时满足,即线程池状态为SHUTDOWN且firstTask为null且队列不为空,这种情况为处理队列中剩余任务。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获得当前线程池中线程的数量 int wc = workerCountOf(c); //如果worker数量超过了容量或者超过了corePoolSize或者maximumPoolSize,直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 这里都会创建新线程失败 return false; // 执行到这,表明可以创建新线程,给新线程数增加一 if (compareAndIncrementWorkerCount(c)) // 跳出循环 break retry; // 执行到这表明cas失败了, c = ctl.get(); // Re-read ctl // 检测状态是否改变 if (runStateOf(c) != rs) // 状态已经改变,则回到外层循环重新获取状态 continue retry; // 否则,继续内层循环 // else CAS failed due to workerCount change; retry inner loop } } //创建新线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //以firstTask作为Worker的第一个任务创建Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 为了解决多线程安全的问题, // 会涉及到很多的对于线程的增加或删除, // 采用mainLock.lock(); 保证在添加线程和删除线程的同步。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取当前线程池的状态 int rs = runStateOf(ctl.get()); // 如果线程池处于运行态,直接进入到if中 // 如果线程池关闭了并且新任务是null,那直接取消增加该新线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果新线程已经启动,直接抛出异常,因为这里还没有判断完毕,不能启动 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 给线程的所有结合增加一个新线程 workers.add(w); int s = workers.size(); // 保证largestPoolSize 的值一定大于等于所有的线程个数 if (s > largestPoolSize) largestPoolSize = s; // 执行到这,表明新增线程的统计工作已经基本完成了 workerAdded = true; } } finally { // 释放锁的过程, // 让其他新线程得以继续增加或删除 mainLock.unlock(); } // 检测是否增家了新线程 if (workerAdded) { // 如果增加了新线程,就启动它 // 这里也不是立马就执行任务,而是等待cpu调度线程执行任务 // 后面会进行分析 t.start(); workerStarted = true; } } } finally { // 判断新线程是否已经启动 if (! workerStarted) // 没有启动,那就说明出现了问题,先把这个新线程销毁了 addWorkerFailed(w); } // 返回新线程的最终运行状态来代表创建的失败与否 return workerStarted; } private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 和上面一样,为了保证多线程安全 mainLock.lock(); try { // 将线程从线程集合中删掉 if (w != null) workers.remove(w); // 减少线程数量 decrementWorkerCount(); // 尝试终止状态 tryTerminate(); } finally { // 释放锁 mainLock.unlock(); } }Worker内部类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Worker继承自AbstractQueuedSynchronizer并实现Runnbale接口。Woerker的构造方法中会使用threadFactory构造线程变量并持有,run方法调用了runWorker方法,将线程委托给主循环线程。runWorker方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //当任务不为null时 while (task != null || (task = getTask()) != null) { //对Worker加锁 w.lock(); //如果线程池停止了,那么中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任务执行前干一些事情,这些方法由用户去实现 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }getTask()方法:
// runWorker()里面的wihle循环中,获取一个新的任务 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // 轮到执行该任务的时候,获取当前线程池的状态 int rs = runStateOf(c); // Check if queue empty only if necessary. //进行线程池的状态的检测 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 将Worker中的数量减少一个 decrementWorkerCount(); // 返回null,意味着while()循环结束 return null; } int wc = workerCountOf(c); // 是否允许线程超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 当前线程数量已经超过最大允许的线程数量,这个线程的任务必定不被执行 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 进行线程数量减一 if (compareAndDecrementWorkerCount(c)) // runWorker()的while循环结束 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }