线程池:ThreadPoolExecutor

    技术2022-07-11  116

    github地址: https://github.com/lishanglei/thread-pool.git

    源码

    public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize,//最大线程数量 long keepAliveTime, //线程存活时间 TimeUnit unit, //线程存活时间单位 BlockingQueue<Runnable> workQueue,//消息队列 RejectedExecutionHandler handler,//拒绝策略 ThreadFactory threadFactory //线程工厂 ) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

    workQueue任务队列:

    直接提交队列有界任务队列无界任务队列优先任务队列
    直接提交队列:

    ​ 设置为synchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue

    ​ 它没有容量,每执行一个插入操作就会堵塞,需要再执行一个删除操作才会被唤醒

    ​ 反之,每一个删除操作也要等待对应的插入操作

    public class SynchronousQueue { private static ExecutorService pool; public static void main(String[] args) { /** * 当任务队列为synchronousQueue, * 创建的线程数大于maximumPoolSize时,直接执行了拒绝策列抛出异常 * 使用synchronousQueue队列,提交的任务不会被保存,总是会马上提交执行, * 如果用于执行任务的线程数少于maximumPoolSize时 * 则尝试创建新的线程,如果达到maximumPoolSize设置的最大值, * 则根据你设置的handler执行拒绝策略 * 因此这种方式提交的任务不会被缓存起来,而是会被马上执行,在这何种情况下, * 你需要对你程序的并发量有一个准确的评估 * 才能设置适合的maximumPoolSize数量 */ pool = new ThreadPoolExecutor( 1, //核心线程数 2, //最大线程数 1000, //线程存活时间 TimeUnit.MILLISECONDS, //线程存活时间单位 new java.util.concurrent.SynchronousQueue<>(), //直接提交队列 Executors.defaultThreadFactory(), //线程工厂 new ThreadPoolExecutor.AbortPolicy()); //拒绝策略:直接抛出异常 for (int i = 0; i < 3; i++) { pool.execute(new Thread(() -> { System.out.println(Thread.currentThread().getName()); })); } } }
    有界任务队列:
    public class ArrayBlockingQueue { private static ExecutorService pool; public static void main(String[] args) { /** * 使用ArraysBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程 * 知道创建数量达到最大corePoolSize,则会将新的任务加入到等待队列中,若等待队列已满,则超过 * ArrayBlockingQueue初始化的数量,则继续创建线程, * 直到线程数达到maximumPoolSize设置的组大线程数 * 若大于maximumPoolSize,则执行拒绝策略,在这种情况下, * 线程数量的上限与有接任务队列的状态有直接关系 * ,如果有界任务队列初始容量比较大或者没有达到超负荷的状态, * 线程数将一直维持在corePoolSize以下, * 反之当任务队列已满时,则会以maximumPoolSize为最大线程数上线 */ pool = new ThreadPoolExecutor( 5, //核心线程数 10, //最大线程数 1000, //线程存活时间 TimeUnit.MILLISECONDS, //线程存活时间单位 new java.util.concurrent.ArrayBlockingQueue<>(3),//有界任务队列 Executors.defaultThreadFactory(), //线程工厂 new ThreadPoolExecutor.AbortPolicy()); //拒绝策略:直接抛出异常 for (int i = 0; i <20; i++) { pool.execute(new Thread(() -> { System.out.println(Thread.currentThread().getName()); })); } } }
    无界任务队列:
    public class LinkedBlockingQueue { private static ExecutorService pool; public static void main(String[] args) { /** * 使用无界任务队列,线程池的任务队列可以无限制的添加新的任务队列, * 而线程池创建的最大线程数量就是你的corePoolSize设置的数量 * ,也就是说在这种情况下maximumPoolSize参数时无效的, * 哪怕你的任务队列中缓存了很多未执行的任务 * 当线程池的线程数达到corePoolSize后,就不会再增加了. * 后续若有新的任务加入,则直接进入等待队列,当使用这种任务队列模式时, * 一定要主义你任务提交与处理之间的协调与控制 * 不然会出现队列中的任务由于无法及时处理导致一直增长,直到资源耗尽 */ pool = new ThreadPoolExecutor( 5, //核心线程数 10, //最大线程数 1000, //线程存活时间 TimeUnit.MILLISECONDS, //线程存活时间单位 new java.util.concurrent.LinkedBlockingQueue<>(),//无界任务队列 Executors.defaultThreadFactory(), //线程工厂 new ThreadPoolExecutor.AbortPolicy()); //拒绝策略:直接抛出异常 for (int i = 0; i <200; i++) { pool.execute(new Thread(() -> { System.out.println(Thread.currentThread().getName()); })); } } }
    优先任务队列:
    public class PriorityBlockingQueue { private static ExecutorService pool; public static void main(String[] args) { /** * 除第一个任务直接创建线程执行外,其它任务都被放入优先任务队列, * 按照优先级进行重新排列执行, * 且线程数一直为corePoolSize. * PriorityBlockingQueue其实是一个特殊的无界队列, * 它其中无论添加了多少个任务,线程池创建的线程数不会 * 超过corePoolSize的数量,只不过其它任务队列一般是按照先进先出的规则处理任务, * 而PriorityBlockingQueue队列可以自定义根据任务的优先级顺序先后执行 * */ pool = new ThreadPoolExecutor( 1, //核心线程数 2, //最大线程数 1000, //线程存活时间 TimeUnit.MILLISECONDS, //线程存活时间单位 new java.util.concurrent.PriorityBlockingQueue<>(), //优先任务队列 Executors.defaultThreadFactory(), //线程工厂 new ThreadPoolExecutor.AbortPolicy()); //拒绝策略:直接抛出异常 for (int i = 0; i <20; i++) { pool.execute(new ThreadTask(i)); } } } @Data class ThreadTask implements Runnable,Comparable<ThreadTask>{ private int priority; public ThreadTask(int priority) { this.priority = priority; } //当前对象和其它对象比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高 @Override public int compareTo(ThreadTask o) { return this.priority>o.priority?-1:1; } @Override public void run() { try { Thread.sleep(1000); System.out.println("priority:"+this.priority+"ThreadName:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }

    ThreadFactory:

    ​ 线程池中的线程就是通过ThreadPoolExecutor中的ThreadFactory线程工厂创建的,

    ​ 那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名,优先级等

    public class ThreadFactoryDemo { private static ThreadPoolExecutor executor; public static void main(String[] args) { //自定义线程工厂 executor = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("线程" + r.hashCode() + "创建"); Thread thread = new Thread(r, "threadPool" + r.hashCode()); return thread; } }); for (int i = 0; i < 10; i++) { executor.execute(new Thread(() -> { System.out.println("ThreadName:" + Thread.currentThread().getName()); })); } } }

    拒绝策略:

    AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在嗲用着线程当中运行DiscardOldestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理,当然使用此策略,业务场景中需要允许任务的丢失 public class RejectedExecutorHandlerDemo { private static ExecutorService pool; //自定义拒绝策略 public static void main(String[] args) { pool=new ThreadPoolExecutor( 1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString()+"执行了拒绝策略"); } } ); for(int i=0;i<10;i++){ pool.execute(new Thread(()->{ try { //让线程阻塞,使后续任务进入到缓存队列 Thread.sleep(1000); System.out.println("ThreadName:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } })); } } }

    线程池拓展:

    ​ 通过这三个接口我们可以监控每个任务的开始和结束时间

    beforeExecute:线程池中任务运行前执行afterExecute:线程池中任务运行完毕后执行terminated:线程池退出后执行 public class ThreadPoolExecutorExpand { private static ThreadPoolExecutor executor ; /** * Nthreads=CPU数量 * Ucpu=目标CPU的使用率,0<=Ucpu<=1 * W/C=任务等待时间与任务计算时间的比率 */ //private static int Nthreads = Ncpu*Ucpu*(1+W/C) public static void main(String[] args) { //实现自定义接口 executor=new ThreadPoolExecutor( 2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); Thread thread =new Thread(r,"threadPool"+r.hashCode()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy() ){ protected void beforeExecute(Thread t,Runnable r){ System.out.println("准备执行: "+((ThreadTask)r).getTaskName()); } protected void afterExecute(Runnable r,Throwable t){ System.out.println("执行完毕: "+((ThreadTask)r).getTaskName()); } protected void terminated() { System.out.println("线程池退出"); } }; for(int i=0;i<10;i++){ executor.execute(new ThreadTask("Task" +i)); } /** * 可以看到通过对beforeExecute()、afterExecute()和terminated()的实现, * 我们对线程池中线程的运行状态进行了监控, * 在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池, * 当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出, * 直到添加到线程池中的任务都已经处理完成,才会退出。 */ executor.shutdown(); } } @Data class ThreadTask implements Runnable{ private String taskName; public ThreadTask(String taskName) { this.taskName = taskName; } @Override public void run() { //输出执行线程的名称 System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName()); } }

    Processed: 0.010, SQL: 9