以排队买票来理解线程池ThreadPoolExecutor

    技术2022-07-12  92

    首先我们需要了解线程池的主要成员变量:

    成员变量: 首先是最重要的3个: corePoolSize:核心池大小 maximumPoolSize :最大线程池大小 keepAliveTime :非核心线程存活时间 这三个是构造线程池对象必须要输入的三个参数 poolsize :线程池当前的线程数 unit :用于指定keepAliveTime的时间单位,常用的有TimeUnit.MILLISECONDS,TimeUnit.SECONDS(秒), TimeUnit.MINUTES(分钟) allowCoreTheadTimeOut :是否允许为核心线程设置存活时间 首先我们来理解线程池最重要的4个参数:corePoolSize、maximumPoolSize 、keepAliveTime 、poolSize 理解了这4个参数,其他的部分就不难理解了。

           我们先假设一个场景:火车站卖票,假设火车站有10个窗口,线程池初始化以后这10个窗口都处于工作状态,那么核心池大小corePoolSize就是10,这个时候,有8个人来买票,那么这8个人在窗口有空闲的情况下会分别在8个窗口买票,应该不会有人在有窗口空闲的时候还去排在别人后面吧!那么这8个人正在买票的这个过程就可以理解为8个线程,此时poolSize 等于8,这个时候又来了更多的人买票,当所有窗口都被占满的时候,第11个人就只能排队了,第11个人来了,而其他人还没买完,那他只能等着,来都来了,那就排队买票呗!那么他排队的这个位置就叫做队列,这个时候,人越来越多,火车站每个窗口都排成了长长的一条龙。        火车站站长定睛一看,顿感大事不妙:咱们这个火车站等待区只能容纳100个人(队列的上限),再来就要挤爆了,平时按照取票的速度怎么也不会累积这么多的人,可到了春运时期,人流量增多,这可咋整呢?于是站长立即启动应急预案,再增加5个额外的售票窗口,以缓解人流量,那么此时maximumPoolSize就等于15,而poolSize此时等于15,但是这个时候人还是越来越多,火车站大厅里面站满了整整115个人,都等着买票,这时候又来了一位老乡跟站长说我要买票,站长实在没辙了,跟他说火车站没位置了,你等会再来吧!并且抛出RejectedExecuteException异常。 以上就是线程池调度的基本策略,当任务到达时,如果线程数小于corePoolSize,那么就会创建新的线程来执行该任务,如果线程数等于corePoolSize,那么新来的任务就只能在队列中等待,如果这个时候队列也满了,再来任务的话,就会“启动应急预案”开启新的线程,但总所有的线程数加起来不能超过maximumPoolSize 。

           我们用一个例子来感受一下任务增多时,线程池调度的基本策略:

    public class ThreadTest{ private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 4000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), new ThreadPoolExecutor.AbortPolicy()); static class MyTimerTask extends TimerTask { @Override public void run() { System.out.println("正在执行的任务有:"+threadPool.getPoolSize()+", 队列中的任务有:"+threadPool.getQueue().size() +", 执行完毕的任务有:"+threadPool.getCompletedTaskCount()); new Timer().schedule(new MyTimerTask(),500); } } public static void main(String[] args) { MyTimerTask myTimerTask = new MyTimerTask(); myTimerTask.run(); for (int i = 0; i < 15; i++) { final int j = i; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep(16000); } catch (Exception e) { } } }); } } }

           这里我们用了Timer类来构造一个定时器,实时监控线程的运行状态,便于理解,可以看到,新加的任务首先会将核心池填满,然后是队列,最后是最大线程池。        更改for循环,将其改为16次,如果最大线程池和队列都被填满,再添加一个新的任务,则会抛出异常。

           如果设置keepAliveTime =1分钟,假设一个人在额外窗口买票,但是这个买票的工作人员十分的不负责任,一遍卖票一遍嗑瓜子,导致超过1分钟了,买票的人还没拿到票,这时候被站长发现了,一气之下把这个卖票员炒了,并关闭这个额外的买票窗口,咱们火车站不允许出现如此消极工作的员工。如果allowCoreThreadTimeOut()(这个方法即可以读也可以写),如果他是true,那么允许核心线程超时,如果为false,那么不允许,默认是false,在false的情况下如果有一个核心线程的卖票员,一遍卖票一边嗑瓜子,导致买票时间超过1分钟,这时候又被站长发现了,一怒之下把卖票员炒了并关闭了该核心线程,哼!别以为你是核心员工我就不炒你。

    接下来我们需要从线程池的具体实现原理:(该部分借鉴自他人文章,但并非复制粘贴)

    1.线程池的状态 2.任务的执行 3.线程池中任务的初始化 4.任务缓存队列以及排队策略 5.任务拒绝策略 6.线程池关闭 7.线程容量调整

    1.线程池的五种状态 RUNNING SHUTDOWN STOP TIDYING TERMINATED

    RUNNING状态:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 火车站正常运营中,来多少人我收多少人,如果挤爆了,再来人我就拒绝并且抛出RejectedExecuteException异常。 SHUTDOWN状态:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。在执行shutdown()方法后,线程池进入SHUTDOWN,火车站打烊了,正在买票的和在队列继续买你们的票,但是新来的任务就别来了,打烊了,员工还等着回家睡觉呢! STOP状态:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 在执行shutDownNow()方法后,线程池进入STOP状态。火车站正在运行,突然来了一堆恐怖分子,大喊shutDownNow(),我们要占领火车站,想要小命的赶紧走,这时候大家纷纷逃离,命都要没了还买啥票啊!于是陆陆续续的都走了。 TIDYING状态:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。不管是火车站打烊还是被恐怖分子占领,这个时候火车站都空无一人了。 TERMINATED状态:线程池彻底终止,就变成TERMINATED状态。 在经历了TIDYING以后,火车站被拆除。嗯!强行这样理解哈哈!

    2.任务的执行 任务执行的基本策略就是之前解释4个重要参数的策略,具体策略可以翻看ThreadPoolExecutor,我这里只是浅显的解读一下,并无深入地源码解读。重要的部分都在execute()方法中。

    3.线程池中线程初始化 创建线程池后需要提交任务才会创建线程,如果需要线程池创建之后立即创建线程,可执行presetstartCoreThread()或presetstartAllCoreThread(),这两个方法都能创建空闲的线程,直到有新的任务到来。

    4.任务缓存队列及排队策略 队列这一块由于博主还是一个初学者,所以还没看到队列这一块,但是把大概的介绍一下,为以后的学习提供方向 基于数组的先进先出队列ArrayBlockingQueue: ArrayBlockingQueue是基于数组的、有界的、遵循FIFO原则的阻塞队列,队列初始化时必须指定队列的长度。 基于链表的先进先出队列LinkedBlockingQueue:LinkedBlockingQueue是基于链表的、有界的、遵循FIFO原则的阻塞队列,队列默认的最大长度为Integer.MAX_VALUE 不保存提交的任务,而是新建线程来执行任务SynchronousQueue:来火车站的人要么买票,要么抛出RejectedExecuteException异常,想排队?没门

    5.任务拒绝策略 火车站被挤爆的时候,如果还有人来买票,站长提着扫帚出来赶人了,以下是四种拒绝策略: AbortPolicy:丢弃任务并抛出RejectedExecuteException异常。这是线程池默认的拒绝策略 DiscardPolicy:丢弃任务但不抛出异常,使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。 DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。此拒绝策略,是一种喜新厌旧的拒绝策略。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。好惨啊!在火车站排队买票排队最久的那个居然要被赶出去!!! CallerRunsPolicy:如果任务被拒绝了,则由调用线程(提交任务的线程)直接执行此任务。站长:小伙子看你骨骼惊奇,给你开个后门买票,不用排队啦!

    6.线程池的关闭 shutdown()和shutDownNow():这两种方法前面已说不再赘述,至于是火车站打烊还是被恐怖分子打劫,具体的需要去看源码。

    7.线程池容量动态调整 setCorePoolSize()和SetMaximumPoolSize():虽然火车站开张了,但是来买票的人寥寥无几,为节省火车站开销,减少这两个值,或者火车站开张了但是门庭若市,站长为提升买票效率,增加这两个值,这是在线程池创建以后为优化CPU性能,合理利用资源而需要动态去改变的两个值,来应对不同场景下的任务。

    一般情况下我们不需要直接去调用ThreadPoolExecutor来创建线程池而是利用Executor接口去创建: FixedThreadPool(nThreads):创建缓冲池容量为nThreads的线程池,corePoolSize=maximumPoolSize=nThreads,队列无限,直到内存溢出为止,站长说:来吧!我就开放这么nThreads个窗口,你来再多的人我也不开放额外卖票窗口,我大厅大得很,来的人都给我排队 SingleThreadExecutor():corePoolSize=0,maximumPoolSize=1,SingleThreadExecutor就像线程数为1的FixedThreadPool CachedThreadPool():corePoolSize=0,maximumPoolSize=最大int值,使劲利用CPU资源,站长说:咱有的是钱,招2147483647个卖票员工,让春运不再拥挤,让每个人都能安心回家。

    Processed: 0.019, SQL: 9