Executors是一个工具类,不能被实例化。
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。 创建指定个线程的线程池,而且不能扩容,在刚开始就会初始化指定线程池。 创建线程池本身会比较耗时,而且传入的数值过大时,可能造成OOM。 参数
核心线程数最大线程数非核心空闲线程存活时间非核心空闲线程存活时间单位任务存储队列 调用的是threadPoolExecutor。 调用的是threadPoolExecutor。创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。 创建一个初始为0的可缓存的线程池。非核心空闲线程存活时间为60s.
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
具有抢占式操作的线程池(1.8增加)
这是ThreadPoolExecutor的结构
Executor接口只有一个方法,execute,参数是一个Runnable。 传入一个Runnable任务,调用execute执行,其执行的时间是不确定的。
无法由 ThreadPoolExecutor 执行的任务的处理程序。
看到这里,ThreadPoolExecutor的结构就比较明显了。 首先ThreadPoolExecutor实现了Executor接口,传入任务,在未来时间执行。 ThreadPoolExecutor实现了ExecutorService接口,可以用于控制ThreadPoolExecutor是否接受任务,接受有返回的任务,以及ThreadPoolExecutor的停止等。 ThreadPoolExecutor继承AbstractExecutorServe抽象类。抽象类封装了提交任务,执行任务的通用封装。 ThreadPoolExecutor内部聚合了Worker类,Worker类实现了Runnable接口,可以实现任务的执行。 而且Wroker继承了AbstractQueuedSynchronizer类,用于实现多线程间数据的安全,以及相对应的线程调度。 ThreadPoolExecutor内部还组合了多个Policy的内部类,用于当ThreadPoolExector出现异常时的处理。
在Worker里面主要看下Worker继承AQS,实现AQS要求实现的tryAcquire,tryRelease,tryAcquireShared,tryReleaseShard和isHeldExclusively方法。 其他的请看: Java基础–AQS原理 Java基础–AQS的Condition源码解析
ThreadPoolExecutor内部的Worker是独占锁,而且是二进制锁,其锁状态只有两个状态:0,1 0表示空闲,1表示占用。
释放锁,清空锁持有线程,然后设置锁状态为0. 因为是独占锁,所以Worker没有实现tryAcquireShared和tryReleaseShared方法。
Wroker还实现了Runnable接口,Runnable接口只有一个方法,就是定义任务的run方法。 这是run方法的时序图,我们看下run方法都干了什么: 看起来挺复杂的。
构造方法需要有一个Runnable 的任务来初始化。 创建Worker时,会设置Worker的锁状态是-1,然后将传入的Runnable任务初始化任务。 然后将当前Runnable任务传入,从ThreadFactory获取线程。
一个线程安全的整形变量。它有这几个值: RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED.
RUNNING:Worker可以接收新的任务执行SHUTDOWN:不在接收新的任务,但是会将已经接收的任务执行完STOP:不在接收新的任务,而且已经接收的任务也不会再运行,会给正在运行中的线程发送中断。TIDYING:Worker任务列表清空,准备执行异常方法TERMINATED:Worker的异常方法已经执行。这几个状态的状态转换图:
每次将线程安全的ctl的值减1,直到为0.
worker意外退出
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 判断worker是否发生意外 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 如果发生意外,那么清空线程池worker的数量(相当于线程池发生意外) decrementWorkerCount(); // 获取线程池的锁 final ReentrantLock mainLock = this.mainLock; // 上锁 mainLock.lock(); try { // 线程池执行任务数量同步worker的执行成功数量 completedTaskCount += w.completedTasks; // 将出现意外的worker从worker的set中移除 workers.remove(w); } finally { mainLock.unlock(); } // 线程池进入终止阶段,尝试执行线程池终止的操作 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }执行线程池终止的操作
final void tryTerminate() { // 死循环 for (;;) { // 获取线程池的状态 int c = ctl.get(); // 判断线程池的状态是否是RUNNING if (isRunning(c) || // 判断线程池的状态是不是TIDYING runStateAtLeast(c, TIDYING) || // 判断线程池的状态是不是SHUTDOWN而且任务队列空(需要转换状态为TIDYING) (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 只有TERMINATED状态的线程池才会执行异常方法 return; // 线程池中还有正在运行的任务 if (workerCountOf(c) != 0) { // Eligible to terminate // 终止一个worker interruptIdleWorkers(ONLY_ONE); // 结束 return; } // 线程池中没有任务在执行 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 设置线程池的状态是TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 执行线程池异常操作 terminated(); } finally { // 线程池状态设置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 唤醒等待的线程(等待的线程是什么呢?(存疑)) termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }// 中断worker
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 只中断一个worker if (onlyOne) break; } } finally { mainLock.unlock(); } }提交任务,没有返回值
public void execute(Runnable command) { // 检测任务 if (command == null) throw new NullPointerException(); // 获取线程池worker数量 int c = ctl.get(); // 如果线程池中线程数量小于核心线程数量 if (workerCountOf(c) < corePoolSize) { // 增加worker if (addWorker(command, true)) // 结束 return; // 增加失败,重新获取线程池中线程数量 c = ctl.get(); } // 如果线程池的状态是RUNNING,而且任务队列中加入任务成功 if (isRunning(c) && workQueue.offer(command)) { // 获取线程池线程数量 int recheck = ctl.get(); // 如果线程池的状态不是RUNNING,那么将当前任务从任务队列中移除 if (! isRunning(recheck) && remove(command)) // 执行任务失败操作 reject(command); // 如果线程池的状态还是RUNNING或者任务队列中移除当前任务失败 else if (workerCountOf(recheck) == 0) // 增加一个空的非核心worker addWorker(null, false); } // 以当前任务增加worker,woeker不是核心worker else if (!addWorker(command, false)) // 执行任务失败操作 reject(command); }增加worker
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 检测线程池状态和任务以及任务队列 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 未通过检测 return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || // worker数量超过允许的最大数量 wc >= (core ? corePoolSize : maximumPoolSize)) // worker数量小于核心worker数量 return false; // worker数量++ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 新创建 worker w = new Worker(firstTask); // 获取worker的线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // worker加入到workers队列(set) workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 更新线程池worker数量 largestPoolSize = s; // worker增加成功 workerAdded = true; } } finally { mainLock.unlock(); } // worker增加成功 if (workerAdded) { // 启动worker的线程 t.start(); workerStarted = true; } } } finally { // 如果worker启动失败 if (! workerStarted) // worker启动失败,将worker从workers队列中移除 addWorkerFailed(w); } return workerStarted; }worker增加失败,将worker移除
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); // 线程池worker数量-- decrementWorkerCount(); // 执行线程池异常操作(空) tryTerminate(); } finally { mainLock.unlock(); } }提交任务,有返回值
任务执行失败,抛出异常。
任务被拒绝,那么在当前线程调用(不能保证是多线程,可能是主线程直接调用run方法(串行))
移除任务队列头的任务。 可以理解为,任务队列满了,在加入任务的时候,会将队列头部的挤掉。
任务增加失败,什么也不做。
ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService实现了任务的提交,任务的增加等方法。但是实际上调用的是ThreadPoolExecutor的execute方法。 在ThreadPoolExecutor的execute方法中会判断线程池是否可用,如果可用,就会获取线程池的锁(ReentrantLock),然后将任务加入任务队列。 线程池是如何保证线程安全的? 1.线程池任务调度使用ReentrantLock保证任务不会被重复执行 任务队列必须是BlockQueue类型的,BlockQueue的子类保证队列的出入的线程安全。 2.线程池的worker节点继承了AbstractQueueSynchronizer() 当worker在运行任务前上锁,在任务运行结束后解锁。上锁后,不会响应中断,保证开始运行的任务不会被其他线程中断。只有任务结束,才会被中断。 3.worker的锁是不可重入的锁 防止线程池操作调整大小,获取数量等操作时,中断线程,导致任务没有完整的被执行。 任务加入线程池失败有4中处理方式
AbortPolicy: 抛出异常CallerRunsPolicy:主线程调用run运行DiscardOldestPolicy:抛弃任务队列头的任务,将失败的任务加入任务队列DiscardPolicy:抛弃失败的任务