Java基础--ThreadPoolExecutor--线程池和锁

    技术2022-07-11  86

    Java基础--ThreadPoolExecutor--线程池和锁

    1. Executors1.1 Executors构造1.2 newFixedThreadPool(int nThreads)1.3 newCachedThreadPool1.4 newSingleThreadExecutor1.5 newScheduledThreadPool1.6 newSingleThreadScheduledExecutor1.7 newWorkStealingPool1.8 总结 2. ThreadPoolExecutor3 Executor4 ExecutorService5 RejectedExecutionHandler6 ThreadPoolExecutor7 WorkertryAcquiretryReleaserun构造runWorkergetTaskdecrementWorkerCountprocessWorkerExittryTerminateinterruptIdleWorkers 8. executeaddWorkeraddWorkerFailed 9. submit(AbstractExecutorService)10. AbortPolicy11. CallerRunsPolicy12. DiscardOldestPolicy13. DiscardPolicy14. 总结 使用多线程的时候,经常会使用Executors创建线程池,然后使用线程池。从而达到复用线程,减少线程切换,从而增加性能。

    1. Executors

    1.1 Executors构造

    Executors是一个工具类,不能被实例化。

    1.2 newFixedThreadPool(int nThreads)

    创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。 创建指定个线程的线程池,而且不能扩容,在刚开始就会初始化指定线程池。 创建线程池本身会比较耗时,而且传入的数值过大时,可能造成OOM。 参数

    核心线程数最大线程数非核心空闲线程存活时间非核心空闲线程存活时间单位任务存储队列 调用的是threadPoolExecutor。 调用的是threadPoolExecutor。

    1.3 newCachedThreadPool

    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。 创建一个初始为0的可缓存的线程池。非核心空闲线程存活时间为60s.

    1.4 newSingleThreadExecutor

    创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

    1.5 newScheduledThreadPool

    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

    1.6 newSingleThreadScheduledExecutor

    创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

    1.7 newWorkStealingPool

    具有抢占式操作的线程池(1.8增加)

    1.8 总结

    2. ThreadPoolExecutor

    这是ThreadPoolExecutor的结构

    3 Executor

    Executor接口只有一个方法,execute,参数是一个Runnable。 传入一个Runnable任务,调用execute执行,其执行的时间是不确定的。

    4 ExecutorService

    5 RejectedExecutionHandler

    无法由 ThreadPoolExecutor 执行的任务的处理程序。

    6 ThreadPoolExecutor

    看到这里,ThreadPoolExecutor的结构就比较明显了。 首先ThreadPoolExecutor实现了Executor接口,传入任务,在未来时间执行。 ThreadPoolExecutor实现了ExecutorService接口,可以用于控制ThreadPoolExecutor是否接受任务,接受有返回的任务,以及ThreadPoolExecutor的停止等。 ThreadPoolExecutor继承AbstractExecutorServe抽象类。抽象类封装了提交任务,执行任务的通用封装。 ThreadPoolExecutor内部聚合了Worker类,Worker类实现了Runnable接口,可以实现任务的执行。 而且Wroker继承了AbstractQueuedSynchronizer类,用于实现多线程间数据的安全,以及相对应的线程调度。 ThreadPoolExecutor内部还组合了多个Policy的内部类,用于当ThreadPoolExector出现异常时的处理。

    7 Worker

    在Worker里面主要看下Worker继承AQS,实现AQS要求实现的tryAcquire,tryRelease,tryAcquireShared,tryReleaseShard和isHeldExclusively方法。 其他的请看: Java基础–AQS原理 Java基础–AQS的Condition源码解析

    tryAcquire

    ThreadPoolExecutor内部的Worker是独占锁,而且是二进制锁,其锁状态只有两个状态:0,1 0表示空闲,1表示占用。

    tryRelease

    释放锁,清空锁持有线程,然后设置锁状态为0. 因为是独占锁,所以Worker没有实现tryAcquireShared和tryReleaseShared方法。

    run

    Wroker还实现了Runnable接口,Runnable接口只有一个方法,就是定义任务的run方法。 这是run方法的时序图,我们看下run方法都干了什么: 看起来挺复杂的。

    构造

    构造方法需要有一个Runnable 的任务来初始化。 创建Worker时,会设置Worker的锁状态是-1,然后将传入的Runnable任务初始化任务。 然后将当前Runnable任务传入,从ThreadFactory获取线程。

    runWorker

    final void runWorker(Worker w) { // 传入是Worker对象(this) // 获取当前线程 Thread wt = Thread.currentThread(); // 获取任务 Runnable task = w.firstTask; // 将Worker的任务设置为空 w.firstTask = null; // 将Worker的锁释放,允许Worker接收新的任务 w.unlock(); // allow interrupts // boolean completedAbruptly = true; // 尝试执行任务 try { // 任务不为空,或者调用getTask还能得到任务 while (task != null || (task = getTask()) != null) { // Worker上锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 判断线程池的状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && // 线程被中年高端 !wt.isInterrupted()) // 当前线程中断 wt.interrupt(); try { // 线程执行前准备,在ThreadPoolExecutor中是空 beforeExecute(wt, task); // 定义一个异常超类,Exception和Error都是Thowable的子类 Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 线程执行后操作,在ThreadPoolExecutor中也是空 afterExecute(task, thrown); } } finally { // 将任务置空,准备下一次执行任务 task = null; // 将线程执行的任务数量++ w.completedTasks++; // 释放锁 w.unlock(); } } // 设置worker意外的标志为false completedAbruptly = false; } finally { // 调用processWorkerExit进行worker退出 processWorkerExit(w, completedAbruptly); } }

    getTask

    // 获取任务 private Runnable getTask() { // 超时标志设置为false boolean timedOut = false; // Did the last poll() time out? // 死循环 for (;;) { // 获取线程池状态 int c = ctl.get(); // 获取状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 检查线程池状态和线程池任务队列 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 如果线程池状态不是RUNNING或者线程池状态是SHUTDOWN但是线程池任务队列空 // 那么执行decrementWorkerCount decrementWorkerCount(); // 将线程池状态设置为0 // 不返回任务 return null; } // 获取线程池现在运行的线程数量(Worker数量) int wc = workerCountOf(c); // Are workers subject to culling? // 检查线程池是否允许非核心线程超时,以及线程数量是否超过核心现场数量 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果线程池不允许非核心线程超时,而且线程数量超过核心线程允许数量,那么当前线程不允许在执行任务 // 假设允许10个核心线程,但是现在线程池已经有11个线程了,而且不允许非核心线程超时 // 那么就需要终止一个线程,保证核心线程有10个。但是应该终止哪个线程呢? // 既然当前线程的任务已经执行完毕了,现在在调用getTask获取下一个执行的任务, // 那么终止的线程就是你了 if ((wc > maximumPoolSize || (timed && timedOut)) // 如果线程池线程数量小于允许的核心线程数量,但是任务队列已经空了,那么返回null && (wc > 1 || workQueue.isEmpty())) { // 因为当前线程已经不再执行任务了,所以执行任务的线程数量需要减1 if (compareAndDecrementWorkerCount(c)) // 线程池线程数量减1,返回null return null; // 如果线程池数量减1失败,那么,重试,直到线程池数量减1成功 continue; } // 尝试从任务队列中获取任务 try { // 如果允许超时 Runnable r = timed ? // 那么就等待指定时间,从任务队列中获取一个任务 // 难道多个线程池的线程同时从任务队列中获取一个任务,这里不会发生并发问题吗?(存疑) // 非核心线程 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 如果超时获取null // 否则阻塞获取任务 // 核心线程 workQueue.take(); // 如果任务不是null if (r != null) // 返回任务 return r; // 否则设置超时 timedOut = true; } catch (InterruptedException retry) { // 出现中断异常,重试 timedOut = false; } } }

    一个线程安全的整形变量。它有这几个值: RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED.

    RUNNING:Worker可以接收新的任务执行SHUTDOWN:不在接收新的任务,但是会将已经接收的任务执行完STOP:不在接收新的任务,而且已经接收的任务也不会再运行,会给正在运行中的线程发送中断。TIDYING:Worker任务列表清空,准备执行异常方法TERMINATED:Worker的异常方法已经执行。

    这几个状态的状态转换图:

    decrementWorkerCount

    每次将线程安全的ctl的值减1,直到为0.

    processWorkerExit

    worker意外退出

    private void processWorkerExit(Worker w, boolean completedAbruptly) { // 判断worker是否发生意外 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 如果发生意外,那么清空线程池worker的数量(相当于线程池发生意外) decrementWorkerCount(); // 获取线程池的锁 final ReentrantLock mainLock = this.mainLock; // 上锁 mainLock.lock(); try { // 线程池执行任务数量同步worker的执行成功数量 completedTaskCount += w.completedTasks; // 将出现意外的worker从worker的set中移除 workers.remove(w); } finally { mainLock.unlock(); } // 线程池进入终止阶段,尝试执行线程池终止的操作 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }

    tryTerminate

    执行线程池终止的操作

    final void tryTerminate() { // 死循环 for (;;) { // 获取线程池的状态 int c = ctl.get(); // 判断线程池的状态是否是RUNNING if (isRunning(c) || // 判断线程池的状态是不是TIDYING runStateAtLeast(c, TIDYING) || // 判断线程池的状态是不是SHUTDOWN而且任务队列空(需要转换状态为TIDYING) (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 只有TERMINATED状态的线程池才会执行异常方法 return; // 线程池中还有正在运行的任务 if (workerCountOf(c) != 0) { // Eligible to terminate // 终止一个worker interruptIdleWorkers(ONLY_ONE); // 结束 return; } // 线程池中没有任务在执行 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 设置线程池的状态是TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 执行线程池异常操作 terminated(); } finally { // 线程池状态设置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 唤醒等待的线程(等待的线程是什么呢?(存疑)) termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

    interruptIdleWorkers

    // 中断worker

    private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 只中断一个worker if (onlyOne) break; } } finally { mainLock.unlock(); } }

    8. execute

    提交任务,没有返回值

    public void execute(Runnable command) { // 检测任务 if (command == null) throw new NullPointerException(); // 获取线程池worker数量 int c = ctl.get(); // 如果线程池中线程数量小于核心线程数量 if (workerCountOf(c) < corePoolSize) { // 增加worker if (addWorker(command, true)) // 结束 return; // 增加失败,重新获取线程池中线程数量 c = ctl.get(); } // 如果线程池的状态是RUNNING,而且任务队列中加入任务成功 if (isRunning(c) && workQueue.offer(command)) { // 获取线程池线程数量 int recheck = ctl.get(); // 如果线程池的状态不是RUNNING,那么将当前任务从任务队列中移除 if (! isRunning(recheck) && remove(command)) // 执行任务失败操作 reject(command); // 如果线程池的状态还是RUNNING或者任务队列中移除当前任务失败 else if (workerCountOf(recheck) == 0) // 增加一个空的非核心worker addWorker(null, false); } // 以当前任务增加worker,woeker不是核心worker else if (!addWorker(command, false)) // 执行任务失败操作 reject(command); }

    addWorker

    增加worker

    private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 检测线程池状态和任务以及任务队列 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 未通过检测 return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || // worker数量超过允许的最大数量 wc >= (core ? corePoolSize : maximumPoolSize)) // worker数量小于核心worker数量 return false; // worker数量++ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 新创建 worker w = new Worker(firstTask); // 获取worker的线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // worker加入到workers队列(set) workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 更新线程池worker数量 largestPoolSize = s; // worker增加成功 workerAdded = true; } } finally { mainLock.unlock(); } // worker增加成功 if (workerAdded) { // 启动worker的线程 t.start(); workerStarted = true; } } } finally { // 如果worker启动失败 if (! workerStarted) // worker启动失败,将worker从workers队列中移除 addWorkerFailed(w); } return workerStarted; }

    addWorkerFailed

    worker增加失败,将worker移除

    private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); // 线程池worker数量-- decrementWorkerCount(); // 执行线程池异常操作(空) tryTerminate(); } finally { mainLock.unlock(); } }

    9. submit(AbstractExecutorService)

    提交任务,有返回值

    10. AbortPolicy

    任务执行失败,抛出异常。

    11. CallerRunsPolicy

    任务被拒绝,那么在当前线程调用(不能保证是多线程,可能是主线程直接调用run方法(串行))

    12. DiscardOldestPolicy

    移除任务队列头的任务。 可以理解为,任务队列满了,在加入任务的时候,会将队列头部的挤掉。

    13. DiscardPolicy

    任务增加失败,什么也不做。

    14. 总结

    ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService实现了任务的提交,任务的增加等方法。但是实际上调用的是ThreadPoolExecutor的execute方法。 在ThreadPoolExecutor的execute方法中会判断线程池是否可用,如果可用,就会获取线程池的锁(ReentrantLock),然后将任务加入任务队列。 线程池是如何保证线程安全的? 1.线程池任务调度使用ReentrantLock保证任务不会被重复执行 任务队列必须是BlockQueue类型的,BlockQueue的子类保证队列的出入的线程安全。 2.线程池的worker节点继承了AbstractQueueSynchronizer() 当worker在运行任务前上锁,在任务运行结束后解锁。上锁后,不会响应中断,保证开始运行的任务不会被其他线程中断。只有任务结束,才会被中断。 3.worker的锁是不可重入的锁 防止线程池操作调整大小,获取数量等操作时,中断线程,导致任务没有完整的被执行。 任务加入线程池失败有4中处理方式

    AbortPolicy: 抛出异常CallerRunsPolicy:主线程调用run运行DiscardOldestPolicy:抛弃任务队列头的任务,将失败的任务加入任务队列DiscardPolicy:抛弃失败的任务
    Processed: 0.018, SQL: 9