本篇主要总结和分析一下线程池的源码。说是线程池,实际上是ThreadPoolExecutor,这主要还是因为《阿里巴巴Java开发手册》中强制要求线程池不允许Executors去创建,而是通过ThreadPoolExecutor,这是由于Executors允许创建大量线程,也允许堆积大量请求,可能会导致OOM。
还是依照惯例,先查看一下ThreadPoolExecutor继承和实现了什么接口。 可以看到ThreadPoolExecutor结构比较简单,首先继承了AbstractExecutorService抽象类,而这个抽象类实现了ExecutorService接口,这个接口又继承了Executor接口。
public class ThreadPoolExecutor extends AbstractExecutorService public abstract class AbstractExecutorService implements ExecutorService public interface ExecutorService extends Executor public interface ExecutorExecutor接口非常简单,只有一个execute方法,这个方法在后面的线程池中主要用于提交任务。
public interface Executor { void execute(Runnable command); }ExecutorService在之前的基础上又实现了不少功能,除了一些用于终止线程池的方法和成员变量之外,还增加了对于Callable接口和Future类的支持,使用submit方法来提交一些需要返回的线程请求。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }AbstractExecutorService抽象类主要就是对submit方法、invokeAny方法之类的实现,这里就不再展开了。
ThreadPoolExecutor拥有多达24个成员变量,这里对一些比较重要的参数进行说明。
我们先来看线程池的核心参数:
//线程工厂,通过线程工厂来创建线程 private volatile ThreadFactory threadFactory; //拒绝策略 private volatile RejectedExecutionHandler handler; //线程没有任务的存活时间 private volatile long keepAliveTime; //如果为false(默认),核心线程空闲时间也保持活动, //否则超过keepAliveTime后关闭 private volatile boolean allowCoreThreadTimeOut; //核心线程池的大小 private volatile int corePoolSize; //最大的线程数量 private volatile int maximumPoolSize; //任务队列 private final BlockingQueue<Runnable> workQueue; //用来存储worker的集合,worker是线程池中任务真正的执行者 private final HashSet<Worker> workers = new HashSet<Worker>();通过这些参数我们可以顺便回忆一下线程池的原理:
如果核心线程池未满则直接创建线程执行任务。如果核心线程池已满则加入任务队列。如果任务队列已满但还未达到线程池上限则直接创建线程执行任务。如果任务队列已满且达到线程池上限,则执行拒绝策略。线程完成任务的时候如果核心线程池已满,则等待一段时间后关闭线程完成任务的时候如果核心线程池未满,则保持活跃(默认)除了上面一些参数,线程池中还有一些参数用于表示线程池和线程的状态。 ctl是一个原子类型的Integer,这表示对于这个值的更新和读取是线程安全的。 通过代码上面的注释可以得知ctl是一个用于表示线程池状态的变量,它的高3位用于表示线程的运行状态,后29位表示线程的数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //正常的运行时状态 private static final int RUNNING = -1 << COUNT_BITS; //关闭状态,不接受新任务,这个时候还在处理队列中的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; //暂停状态,不接受新任务,不处理队列中的任务,中断运行中的任务 private static final int STOP = 1 << COUNT_BITS; //整理状态,所有任务都已经结束 private static final int TIDYING = 2 << COUNT_BITS; //结束状态,已经执行了terminated方法 private static final int TERMINATED = 3 << COUNT_BITS;线程池存在一个继承于AbstractQueuedSynchronizer(后面会专门写一篇源码分析)的内部类worker用于执行提交的任务,这里省略了一部分有关于锁的方法。 可以看到worker主要有三个成员变量(这里省略了一个序列化ID,线程本身是不可序列化的,这里主要用于抛出异常),分别为线程Thread、执行的任务firstTask以及一个任务计数器。 当调用构造函数的时候首先会把线程的状态设置为-1,这是用来防止线程被中断。 然后获取一个提交的任务,如果是核心线程的话后续会消耗任务队列中的任务。 最后通过线程工厂获取一个线程。 然后worker的run方法是通过调用外部类的runworker方法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } }介绍完了成员变量和内部类,接下来看一下构造方法,线程池的构造方法中,核心池大小、最大线程数、存活时间、时间单位、阻塞队列是必须的参数,可选的参数有线程工厂、拒绝策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { //可以看到如果没有指定线程工厂和拒绝策略的话会使用默认的 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }我们来看一下默认的线程工厂,默认的线程工厂的实现在Executor中,这里主要看一下它是如何新建线程的。 可以看到比较普通,设定了分组,使用传入的Runnable接口(在之前worker传入了worker类),然后采用了"pool-x-thread-y"作为线程的名字,并且没有指定栈的大小。 最后将守护线程设置为false,设定优先级为默认优先级。
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }可以看到默认的拒绝策略是指定内部类AbortPolicy,这个内部类实现了RejectedExecutionHandler接口,在AbortPolicy定义的策略是直接抛出异常。 当然线程池中还有另外3个内部类也同样实现了这个接口:
CallerRunsPolicy:使用调用者的线程来处理DiscardPolicy:直接丢弃DiscardOldsetPolicy:丢弃最老的任务如果想自定义一个拒绝策略,那么传入一个实现了RejectedExecutionHandler接口的类也可以。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }终于轮到了提交任务的环节。
excute传入的参数主要是实现了Runnable接口的类,首先会获取线程的状态,如果线程数少于核心线程数则直接添加worker。之后会根据上面讲的原理执行不同的操作,可以看到这里主要的一个方法就是addWorker。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }在ThreadPoolExecute中并没有实现submit,只是继承了AbstractExecutorService抽象类中的submit方法。 主要就是通过包装类Future转换runable和callable,最后使用execute方法提交。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }可以看到在addworker方法中最开始是使用CAS的方式来更新ctl,如果在这个过程中线程池关闭(如果处于SHUTDOWN状态并且队列中还有任务则继续)则直接返回false。 然后会通过加ReenrantLock的方式锁定这个线程池,然后新建worker并添加,如果添加成功的话则调用其thread成员的start方法。
private boolean addWorker(Runnable firstTask, boolean core) { //通过cas更新,如果更新失败则自旋 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //锁定整个线程池进行添加 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //添加成功则执行start方法 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }在开始之前,我想提出一个小问题,为什么调用了worker对象中的thread成员的start方法就能让worker开始执行呢? 回想一下昨天学过的线程start方法,这其实是个本地方法,实质上就是调用传入进来的runnable接口的run方法。 我们再来回看一下worker中的thread变量是如何生成的:
this.thread = getThreadFactory().newThread(this);也就说传入的Runnable对象并非是我们提交的那个firstTask,而是worker自身,所以这里会调用worker的run方法:
public void run() { runWorker(this); }通过这种方式worker才得以调度task,我还能说什么?精彩!
终于轮到了线程池最关键的部分了。 可以看到方法中一上来就获取了firstTask对象,然后判断这个对象是否为空,如果是的再去查看一下任务队列中的任务是否为空。 如果都为空则调用线程退出方法,如果任一不为空则开始执行这个task。 这里我们可以看到线程池还有两个task开始前和结束后执行的方法:beforeExecute和afterExecute,这两个方法是protected修饰且没有任何内容,所以如果想在任务开始前或者开始后干点什么就需要继承ThreadPoolExecute然后覆盖这两个方法。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }前面说到,当一个worker调用getTask返回的是null的时候会调用退出方法,那什么时候会返回null呢? 通过源码我们可以总结出如下情况:
线程池大于STOP或者是线程池处于SHUTDOWN并且任务队列为空当线程池中的worker数量大于最大线程或者该worker等待超时并且这个时候worker数量大于1并且任务队列为空。其他的情况都比较好理解,那什么时候worker等待会超时呢? 根据最后面的代码可以得出结论: 当线程池允许关闭核心线程或者该worker不是核心线程会调用任务队列的poll(time)方法,如果一段时间内没有从任务队列中获取任务就把这个worker关闭。 如果线程池不允许关闭核心线程并且该worker是核心线程的时候会调用任务队列的take方法,如果没有任务就会一直阻塞在这里。 所以核心线程的活跃是通过阻塞队列的阻塞来实现的。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }这个方法是worker执行完所有自身的任务以及任务队列中的任务之后执行的方法。 当执行完所有任务之后,worker会尝试清理并且关闭线程池。 如果线程池还不能关系,则判断一下当前线程数是否小于核心线程数,如果是的话则添加一个线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }两种,通过构造函数实现和通过Executes工具类实现。 通过Executes工具类可以实现3种类型的线程池:
FixedThreadPool:固定线程数量的线程池SingleThreadExecutor:只有一个线程的线程池CachedThreadPool:该方法返回一个可根据实际情况调整线程数量的线程池。因为这道题是需要在面试的十几二十分钟内写完,所以体现一下自己了解线程池的基本原理就行了,不需要真的能够无BUG运行。毕竟实际代码加注释有2000多行,还不算抽象类和接口,怎么写的完。 回顾一下刚才线程池源码分析的一些重点:
任务类:继承Runnable接口,传入Runnable,设置thread和firstTask,,通过run方法调用外部的runTask。提交任务:线程数量小于核心池的时候直接新建线程,否则加入队伍,如果队伍满了就直接新建线程。执行任务:通过一个while循环不断获取任务,获取任务的时候如果线程数小于等于核心数则使用take,否则使用poll(time),当任务为null的时候退出,线程消亡。经杰哥指出,run循环的时候记得要把firstTask置为null,不然就一直只执行原来那个任务了。 import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ThreadPool { BlockingQueue<Runnable> tasksQueue; int maxThreadNum; int coreThreadNum; boolean shutdown = false; int keepAliveTime; AtomicInteger size; public ThreadPool(int coreThreadNum,int maxThreadNum,int keepAliveTime){ this.coreThreadNum = coreThreadNum; this.maxThreadNum = maxThreadNum; this.keepAliveTime = keepAliveTime; tasksQueue = new ArrayBlockingQueue<>(10); } //内部类,实现了Runnable接口 private class Task implements Runnable{ Thread thread; Runnable firstTask; Task(Runnable r){ this.firstTask = r; //传入自身使得线程start的时候调用自己的run方法 this.thread = new Thread(this); } public void run(){ try { runTask(this); } catch (InterruptedException e) { e.printStackTrace(); } } public void setFirstTask(Runnable runnable){ this.firstTask = runnable; } } private void runTask(Task task) throws InterruptedException { Runnable r = task.firstTask; //记得把firstTask置为null //不然就一直执行这个runnable了 task.firstTask = null; //使用while循环获取任务,任务为null的时候退出 while(task.firstTask != null || (r = getTask())!=null){ r.run(); } } public Runnable getTask() throws InterruptedException { //根据不同情况选择使用take还是poll if(size.get() <= coreThreadNum){ return tasksQueue.take(); }else { return tasksQueue.poll(keepAliveTime, TimeUnit.SECONDS); } } //提交任务 public void execute(Runnable runnable){ if(shutdown) return; int num = size.get(); if(num == maxThreadNum) return; if(num >= coreThreadNum && num < maxThreadNum && tasksQueue.size() < 10) tasksQueue.add(runnable); else if(num < coreThreadNum) { Task task = new Task(runnable); size.getAndAdd(1); task.thread.start(); }else if (tasksQueue.size() == 10){ Task task = new Task(runnable); size.getAndAdd(1); task.thread.start(); } } public void setShutdown(){ this.shutdown = true; } }今天总结了一下线程池的相关知识点,相比较Thread还是难了不少。 剩下的时间比较紧张,我发现字节好像特别喜欢问redis相关内容,所以一定花一点时间把redis的东西好好再复习一遍。 所以明天花一点时间看一下锁部分的源码,然后就对之前的知识点做一个回顾。